You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by pr...@apache.org on 2007/09/04 06:47:06 UTC

svn commit: r572516 - in /mina/trunk/core/src: main/java/org/apache/mina/transport/vmpipe/ test/java/org/apache/mina/transport/vmpipe/

Author: proyal
Date: Mon Sep  3 21:47:05 2007
New Revision: 572516

URL: http://svn.apache.org/viewvc?rev=572516&view=rev
Log:
port forward r572034 from 1.1 branch. fixes DIRMINA-431

Added:
    mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java
      - copied, changed from r572034, mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java
Modified:
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?rev=572516&r1=572515&r2=572516&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Mon Sep  3 21:47:05 2007
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.vmpipe;
 
@@ -24,22 +24,22 @@
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.mina.common.AbstractIoFilterChain;
 import org.apache.mina.common.AbstractIoConnector;
+import org.apache.mina.common.AbstractIoFilterChain;
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.DefaultConnectFuture;
 import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IdleStatusChecker;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoFuture;
 import org.apache.mina.common.IoFutureListener;
 import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IdleStatusChecker;
 import org.apache.mina.common.TransportMetadata;
 
 /**
  * Connects to {@link IoHandler}s which is bound on the specified
  * {@link VmPipeAddress}.
- * 
+ *
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$
  */
@@ -63,7 +63,7 @@
 
     @Override
     protected ConnectFuture doConnect(SocketAddress remoteAddress,
-            SocketAddress localAddress) {
+                                      SocketAddress localAddress) {
         VmPipe entry = VmPipeAcceptor.boundHandlers.get(remoteAddress);
         if (entry == null) {
             return DefaultConnectFuture.newFailedFuture(new IOException(
@@ -92,8 +92,7 @@
             this.getFilterChainBuilder().buildFilterChain(filterChain);
 
             // The following sentences don't throw any exceptions.
-            localSession.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE,
-                    future);
+            localSession.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, future);
             getListeners().fireSessionCreated(localSession);
             IdleStatusChecker.getInstance().addSession(localSession);
         } catch (Throwable t) {
@@ -116,6 +115,8 @@
             remoteSession.close();
         }
 
+        // Start chains, and then allow and messages read/written to be processed. This is to ensure that
+        // sessionOpened gets received before a messageReceived
         ((VmPipeFilterChain) localSession.getFilterChain()).start();
         ((VmPipeFilterChain) remoteSession.getFilterChain()).start();
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java?rev=572516&r1=572515&r2=572516&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java Mon Sep  3 21:47:05 2007
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.vmpipe;
 
@@ -36,7 +36,8 @@
 
     private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
 
-    private boolean flushEnabled;
+    private volatile boolean flushEnabled;
+    private volatile boolean sessionOpened;
 
     VmPipeFilterChain(IoSession session) {
         super(session);
@@ -45,6 +46,7 @@
     public void start() {
         flushEnabled = true;
         flushEvents();
+        flushPendingDataQueues((VmPipeSessionImpl) getSession());
     }
 
     private void pushEvent(Event e) {
@@ -68,19 +70,25 @@
 
         if (type == EventType.RECEIVED) {
             VmPipeSessionImpl s = (VmPipeSessionImpl) session;
-            synchronized (s.lock) {
-                if (!s.getTrafficMask().isReadable()) {
-                    s.pendingDataQueue.offer(data);
-                } else {
-                    int byteCount = 1;
-                    if (data instanceof ByteBuffer) {
-                        byteCount = ((ByteBuffer) data).remaining();
-                    }
+            if (sessionOpened && s.getTrafficMask().isReadable() && s.getLock().tryLock()) {
+                try {
+                    if (!s.getTrafficMask().isReadable()) {
+                        s.pendingDataQueue.offer(data);
+                    } else {
+                        int byteCount = 1;
+                        if (data instanceof ByteBuffer) {
+                            byteCount = ((ByteBuffer) data).remaining();
+                        }
 
-                    s.increaseReadBytes(byteCount);
+                        s.increaseReadBytes(byteCount);
 
-                    super.fireMessageReceived(s, data);
+                        super.fireMessageReceived(s, data);
+                    }
+                } finally {
+                    s.getLock().unlock();
                 }
+            } else {
+                s.pendingDataQueue.add(data);
             }
         } else if (type == EventType.WRITE) {
             super.fireFilterWrite(session, (WriteRequest) data);
@@ -92,6 +100,7 @@
             super.fireSessionIdle(session, (IdleStatus) data);
         } else if (type == EventType.OPENED) {
             super.fireSessionOpened(session);
+            sessionOpened = true;
         } else if (type == EventType.CREATED) {
             super.fireSessionCreated(session);
         } else if (type == EventType.CLOSED) {
@@ -101,6 +110,11 @@
         }
     }
 
+    private static void flushPendingDataQueues(VmPipeSessionImpl s) {
+        s.updateTrafficMask();
+        s.getRemoteSession().updateTrafficMask();
+    }
+
     @Override
     public void fireFilterClose(IoSession session) {
         pushEvent(new Event(EventType.CLOSE, null));
@@ -149,12 +163,10 @@
     @Override
     protected void doWrite(IoSession session, WriteRequest writeRequest) {
         VmPipeSessionImpl s = (VmPipeSessionImpl) session;
-        synchronized (s.lock) {
-            if (s.isConnected()) {
+        if (s.isConnected()) {
 
-                if (!s.getTrafficMask().isWritable()) {
-                    s.pendingDataQueue.offer(writeRequest);
-                } else {
+            if (s.getTrafficMask().isWritable() && s.getLock().tryLock()) {
+                try {
                     Object message = writeRequest.getMessage();
 
                     int byteCount = 1;
@@ -176,21 +188,30 @@
                     s.getRemoteSession().getFilterChain().fireMessageReceived(
                             s.getRemoteSession(), messageCopy);
                     s.getFilterChain().fireMessageSent(s, writeRequest);
+                } finally {
+                    s.getLock().unlock();
                 }
+
+                flushPendingDataQueues(s);
             } else {
-                writeRequest.getFuture().setWritten(false);
+                s.pendingDataQueue.offer(writeRequest);
             }
+        } else {
+            writeRequest.getFuture().setWritten(false);
         }
     }
 
     @Override
     protected void doClose(IoSession session) {
         VmPipeSessionImpl s = (VmPipeSessionImpl) session;
-        synchronized (s.lock) {
+        try {
+            s.getLock().lock();
             if (!session.getCloseFuture().isClosed()) {
                 s.getServiceListeners().fireSessionDestroyed(s);
                 s.getRemoteSession().close();
             }
+        } finally {
+            s.getLock().unlock();
         }
     }
 
@@ -231,7 +252,7 @@
 
         private final Object data;
 
-        public Event(EventType type, Object data) {
+        private Event(EventType type, Object data) {
             this.type = type;
             this.data = data;
         }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java?rev=572516&r1=572515&r2=572516&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java Mon Sep  3 21:47:05 2007
@@ -23,6 +23,8 @@
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.mina.common.AbstractIoSession;
 import org.apache.mina.common.DefaultTransportMetadata;
@@ -67,7 +69,7 @@
 
     private final VmPipeSessionImpl remoteSession;
 
-    final Object lock;
+    private final Lock lock;
 
     final BlockingQueue<Object> pendingDataQueue;
 
@@ -79,7 +81,7 @@
                       VmPipeAddress localAddress, IoHandler handler, VmPipe remoteEntry) {
         this.service = service;
         this.serviceListeners = serviceListeners;
-        this.lock = new Object();
+        this.lock = new ReentrantLock();
         this.localAddress = localAddress;
         this.remoteAddress = this.serviceAddress = remoteEntry.getAddress();
         this.handler = handler;
@@ -185,5 +187,9 @@
                 }
             }
         }
+    }
+
+    Lock getLock() {
+        return lock;
     }
 }

Copied: mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java (from r572034, mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java)
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java?p2=mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java&p1=mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java&r1=572034&r2=572516&rev=572516&view=diff
==============================================================================
--- mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java Mon Sep  3 21:47:05 2007
@@ -25,15 +25,13 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import junit.framework.TestCase;
+
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoAcceptorConfig;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.common.ThreadModel;
-
-import junit.framework.TestCase;
 
 /**
  * @author Apache Mina Project (dev@mina.apache.org)
@@ -41,105 +39,110 @@
  */
 public class VmPipeSessionCrossCommunicationTest extends TestCase {
     public void testOneSessionTalkingBackAndForthDoesNotDeadlock() throws Exception {
-        final VmPipeAddress address = new VmPipeAddress( 1 );
+        final VmPipeAddress address = new VmPipeAddress(1);
         final IoConnector connector = new VmPipeConnector();
         final AtomicReference<IoSession> c1 = new AtomicReference<IoSession>();
-        final CountDownLatch latch = new CountDownLatch( 1 );
-        final CountDownLatch messageCount = new CountDownLatch( 2 );
+        final CountDownLatch latch = new CountDownLatch(1);
+        final CountDownLatch messageCount = new CountDownLatch(2);
         IoAcceptor acceptor = new VmPipeAcceptor();
 
-        acceptor.bind( address, new IoHandlerAdapter() {
+        acceptor.setHandler(new IoHandlerAdapter() {
             @Override
-            public void messageReceived( IoSession session, Object message ) throws Exception {
-                System.out.println( Thread.currentThread().getName() + ": " + message );
+            public void messageReceived(IoSession session, Object message) throws Exception {
+                System.out.println(Thread.currentThread().getName() + ": " + message);
 
-                if ( "start".equals( message ) ) {
-                    session.write( "open new" );
-                } else if ( "re-use c1".equals( message ) ) {
-                    session.write( "tell me something on c1 now" );
-                } else if ( ( (String) message ).startsWith( "please don't deadlock" ) ) {
+                if ("start".equals(message)) {
+                    session.write("open new");
+                } else if ("re-use c1".equals(message)) {
+                    session.write("tell me something on c1 now");
+                } else if (((String) message).startsWith("please don't deadlock")) {
                     messageCount.countDown();
                 } else {
-                    fail( "unexpected message received " + message );
+                    fail("unexpected message received " + message);
                 }
             }
-        } );
+        });
+        acceptor.setLocalAddress(address);
+        acceptor.bind();
 
-        connector.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
-
-        ConnectFuture future = connector.connect( address, new IoHandlerAdapter() {
+        connector.setHandler(new IoHandlerAdapter() {
             @Override
-            public void messageReceived( IoSession session, Object message ) throws Exception {
-                System.out.println( Thread.currentThread().getName() + ": " + message );
-                
-                if ( "open new".equals( message ) ) {
-                    System.out.println( "opening c2 from " + Thread.currentThread().getName() );
+            public void messageReceived(IoSession session, Object message) throws Exception {
+                System.out.println(Thread.currentThread().getName() + ": " + message);
+
+                if ("open new".equals(message)) {
+                    System.out.println("opening c2 from " + Thread.currentThread().getName());
 
-                    ConnectFuture c2Future = connector.connect( address, new IoHandlerAdapter() {
+                    IoConnector c2 = new VmPipeConnector();
+                    c2.setHandler(new IoHandlerAdapter() {
                         @Override
-                        public void sessionOpened( IoSession session ) throws Exception {
-                            session.write( "re-use c1" );
+                        public void sessionOpened(IoSession session) throws Exception {
+                            session.write("re-use c1");
                         }
 
                         @Override
-                        public void messageReceived( IoSession session, Object message ) throws Exception {
-                            System.out.println( Thread.currentThread().getName() + ": " + message );
+                        public void messageReceived(IoSession session, Object message) throws Exception {
+                            System.out.println(Thread.currentThread().getName() + ": " + message);
 
-                            if ( "tell me something on c1 now".equals( message ) ) {
+                            if ("tell me something on c1 now".equals(message)) {
                                 latch.countDown();
-                                c1.get().write( "please don't deadlock via c1" );
+                                c1.get().write("please don't deadlock via c1");
                             } else {
-                                fail( "unexpected message received " + message );
+                                fail("unexpected message received " + message);
                             }
                         }
-                    } );
+                    });
 
-                    c2Future.join();
+                    ConnectFuture c2Future = c2.connect(address);
+
+                    c2Future.await();
 
                     latch.await();
 
-                    c2Future.getSession().write( "please don't deadlock via c2" );
+                    c2Future.getSession().write("please don't deadlock via c2");
                 } else {
-                    fail( "unexpeced message received " + message );
+                    fail("unexpeced message received " + message);
                 }
             }
-        } );
+        });
+
+        ConnectFuture future = connector.connect(address);
 
-        future.join();
+        future.await();
 
-        c1.set( future.getSession() );
-        c1.get().write( "start" );
+        c1.set(future.getSession());
+        c1.get().write("start");
 
         ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
 
-        while ( !messageCount.await( 100, TimeUnit.MILLISECONDS ) ) {
+        while (!messageCount.await(100, TimeUnit.MILLISECONDS)) {
             long[] threads = threadMXBean.findMonitorDeadlockedThreads();
 
-            if ( null != threads ) {
-                StringBuffer sb = new StringBuffer( 256 );
-                ThreadInfo[] infos = threadMXBean.getThreadInfo( threads, Integer.MAX_VALUE );
-
-                for ( ThreadInfo info : infos ) {
-                    sb.append( info.getThreadName() )
-                        .append( " blocked on " )
-                        .append( info.getLockName() )
-                        .append( " owned by " )
-                        .append( info.getLockOwnerName() )
-                        .append( "\n" );
+            if (null != threads) {
+                StringBuffer sb = new StringBuffer(256);
+                ThreadInfo[] infos = threadMXBean.getThreadInfo(threads, Integer.MAX_VALUE);
+
+                for (ThreadInfo info : infos) {
+                    sb.append(info.getThreadName())
+                            .append(" blocked on ")
+                            .append(info.getLockName())
+                            .append(" owned by ")
+                            .append(info.getLockOwnerName())
+                            .append("\n");
                 }
 
-                for ( ThreadInfo info : infos ) {
-                    sb.append( "\nStack for " ).append( info.getThreadName() ).append( "\n" );
-                    for ( StackTraceElement element : info.getStackTrace() ) {
-                        sb.append( "\t" ).append( element ).append( "\n" );
+                for (ThreadInfo info : infos) {
+                    sb.append("\nStack for ").append(info.getThreadName()).append("\n");
+                    for (StackTraceElement element : info.getStackTrace()) {
+                        sb.append("\t").append(element).append("\n");
                     }
                 }
 
-                fail( "deadlocked! \n" + sb );
+                fail("deadlocked! \n" + sb);
             }
         }
 
-        ( (IoAcceptorConfig) acceptor.getDefaultConfig() ).setDisconnectOnUnbind( false );
-        acceptor.unbindAll();
+        acceptor.setDisconnectOnUnbind(false);
+        acceptor.unbind();
     }
 }