You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by pr...@apache.org on 2007/09/02 19:01:53 UTC

svn commit: r572034 - in /mina/branches/1.1/core/src: main/java/org/apache/mina/transport/vmpipe/ main/java/org/apache/mina/transport/vmpipe/support/ test/java/org/apache/mina/transport/vmpipe/

Author: proyal
Date: Sun Sep  2 10:01:53 2007
New Revision: 572034

URL: http://svn.apache.org/viewvc?rev=572034&view=rev
Log:
DIRMINA-431 - Test and fixes. Keep track of when the session is opened in VmPipeFC to ensure proper event ordering. Use a re-entrant lock, so we can try and lock, and put in data queue if we can't.

Added:
    mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java   (with props)
Modified:
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?rev=572034&r1=572033&r2=572034&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Sun Sep  2 10:01:53 2007
@@ -88,9 +88,11 @@
         }
 
         DefaultConnectFuture future = new DefaultConnectFuture();
-        VmPipeSessionImpl localSession = new VmPipeSessionImpl(this, config,
-                getListeners(), new Object(), // lock
-                new AnonymousSocketAddress(), handler, entry);
+        VmPipeSessionImpl localSession = new VmPipeSessionImpl(this,
+                                                               config,getListeners(),
+                                                               new AnonymousSocketAddress(),
+                                                               handler,
+                                                               entry);
 
         // initialize connector session
         try {
@@ -100,8 +102,7 @@
             config.getThreadModel().buildFilterChain(filterChain);
 
             // The following sentences don't throw any exceptions.
-            localSession.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE,
-                    future);
+            localSession.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, future);
             getListeners().fireSessionCreated(localSession);
             VmPipeIdleStatusChecker.getInstance().addSession(localSession);
         } catch (Throwable t) {
@@ -127,6 +128,9 @@
             remoteSession.close();
         }
 
+
+        // Start chains, and then allow and messages read/written to be processed. This is to ensure that
+        // sessionOpened gets received before a messageReceived
         ((VmPipeFilterChain) localSession.getFilterChain()).start();
         ((VmPipeFilterChain) remoteSession.getFilterChain()).start();
 

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?rev=572034&r1=572033&r2=572034&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Sun Sep  2 10:01:53 2007
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.vmpipe.support;
 
@@ -36,7 +36,8 @@
 
     private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
 
-    private boolean flushEnabled;
+    private volatile boolean flushEnabled;
+    private volatile boolean sessionOpened;
 
     public VmPipeFilterChain(IoSession session) {
         super(session);
@@ -45,11 +46,12 @@
     public void start() {
         flushEnabled = true;
         flushEvents();
+        flushPendingDataQueues( (VmPipeSessionImpl) getSession() );
     }
 
     private void pushEvent(Event e) {
         eventQueue.offer(e);
-        if (flushEnabled) {
+        if ( flushEnabled ) {
             flushEvents();
         }
     }
@@ -68,10 +70,9 @@
 
         if (type == EventType.RECEIVED) {
             VmPipeSessionImpl s = (VmPipeSessionImpl) session;
-            synchronized (s.lock) {
-                if (!s.getTrafficMask().isReadable()) {
-                    s.pendingDataQueue.offer(data);
-                } else {
+
+            if( sessionOpened && s.getTrafficMask().isReadable() && s.getLock().tryLock()) {
+                try {
                     int byteCount = 1;
                     if (data instanceof ByteBuffer) {
                         byteCount = ((ByteBuffer) data).remaining();
@@ -80,7 +81,13 @@
                     s.increaseReadBytes(byteCount);
 
                     super.fireMessageReceived(s, data);
+                } finally {
+                    s.getLock().unlock();
                 }
+
+                flushPendingDataQueues( s );
+            } else {
+                s.pendingDataQueue.add(data);
             }
         } else if (type == EventType.WRITE) {
             super.fireFilterWrite(session, (WriteRequest) data);
@@ -92,6 +99,7 @@
             super.fireSessionIdle(session, (IdleStatus) data);
         } else if (type == EventType.OPENED) {
             super.fireSessionOpened(session);
+            sessionOpened = true;
         } else if (type == EventType.CREATED) {
             super.fireSessionCreated(session);
         } else if (type == EventType.CLOSED) {
@@ -101,6 +109,11 @@
         }
     }
 
+    private static void flushPendingDataQueues( VmPipeSessionImpl s ) {
+        s.updateTrafficMask();
+        s.getRemoteSession().updateTrafficMask();
+    }
+
     @Override
     public void fireFilterClose(IoSession session) {
         pushEvent(new Event(EventType.CLOSE, null));
@@ -149,12 +162,9 @@
     @Override
     protected void doWrite(IoSession session, WriteRequest writeRequest) {
         VmPipeSessionImpl s = (VmPipeSessionImpl) session;
-        synchronized (s.lock) {
-            if (s.isConnected()) {
-
-                if (!s.getTrafficMask().isWritable()) {
-                    s.pendingDataQueue.offer(writeRequest);
-                } else {
+        if (s.isConnected()) {
+            if ( s.getTrafficMask().isWritable() && s.getLock().tryLock()) {
+                try {
                     Object message = writeRequest.getMessage();
 
                     int byteCount = 1;
@@ -174,23 +184,34 @@
                     s.increaseWrittenMessages();
 
                     s.getRemoteSession().getFilterChain().fireMessageReceived(
-                            s.getRemoteSession(), messageCopy);
+                        s.getRemoteSession(), messageCopy);
                     s.getFilterChain().fireMessageSent(s, writeRequest);
+                } finally {
+                    s.getLock().unlock();
                 }
+
+                flushPendingDataQueues( s );
             } else {
-                writeRequest.getFuture().setWritten(false);
+                s.pendingDataQueue.add(writeRequest);
             }
+        } else {
+            writeRequest.getFuture().setWritten(false);
         }
     }
 
     @Override
     protected void doClose(IoSession session) {
         VmPipeSessionImpl s = (VmPipeSessionImpl) session;
-        synchronized (s.lock) {
+
+        try {
+            s.getLock().lock();
+
             if (!session.getCloseFuture().isClosed()) {
                 s.getServiceListeners().fireSessionDestroyed(s);
                 s.getRemoteSession().close();
             }
+        } finally {
+            s.getLock().unlock();
         }
     }
 
@@ -220,6 +241,7 @@
             this.value = value;
         }
 
+        @Override
         public String toString() {
             return value;
         }
@@ -230,7 +252,7 @@
 
         private final Object data;
 
-        public Event(EventType type, Object data) {
+        private Event(EventType type, Object data) {
             this.type = type;
             this.data = data;
         }

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?rev=572034&r1=572033&r2=572034&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Sun Sep  2 10:01:53 2007
@@ -24,6 +24,8 @@
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.IoFilterChain;
@@ -65,20 +67,20 @@
 
     private final VmPipeSessionImpl remoteSession;
 
-    final Object lock;
+    private final Lock lock;
 
     final BlockingQueue<Object> pendingDataQueue;
 
-    /**
+    /*
      * Constructor for client-side session.
      */
-    public VmPipeSessionImpl(IoService service, IoServiceConfig serviceConfig,
-            IoServiceListenerSupport serviceListeners, Object lock,
-            SocketAddress localAddress, IoHandler handler, VmPipe remoteEntry) {
+    public VmPipeSessionImpl( IoService service, IoServiceConfig serviceConfig,
+                              IoServiceListenerSupport serviceListeners,
+                              SocketAddress localAddress, IoHandler handler, VmPipe remoteEntry ) {
         this.service = service;
         this.serviceConfig = serviceConfig;
         this.serviceListeners = serviceListeners;
-        this.lock = lock;
+        this.lock = new ReentrantLock();
         this.localAddress = localAddress;
         this.remoteAddress = this.serviceAddress = remoteEntry.getAddress();
         this.handler = handler;
@@ -88,7 +90,7 @@
         remoteSession = new VmPipeSessionImpl(this, remoteEntry);
     }
 
-    /**
+    /*
      * Constructor for server-side session.
      */
     private VmPipeSessionImpl(VmPipeSessionImpl remoteSession, VmPipe entry) {
@@ -188,5 +190,9 @@
                 }
             }
         }
+    }
+
+    Lock getLock() {
+        return lock;
     }
 }

Added: mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java?rev=572034&view=auto
==============================================================================
--- mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java (added)
+++ mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java Sun Sep  2 10:01:53 2007
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.transport.vmpipe;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoAcceptorConfig;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.ThreadModel;
+
+import junit.framework.TestCase;
+
+/**
+ * @author Apache Mina Project (dev@mina.apache.org)
+ * @version $Rev: $, $Date:  $
+ */
+public class VmPipeSessionCrossCommunicationTest extends TestCase {
+    public void testOneSessionTalkingBackAndForthDoesNotDeadlock() throws Exception {
+        final VmPipeAddress address = new VmPipeAddress( 1 );
+        final IoConnector connector = new VmPipeConnector();
+        final AtomicReference<IoSession> c1 = new AtomicReference<IoSession>();
+        final CountDownLatch latch = new CountDownLatch( 1 );
+        final CountDownLatch messageCount = new CountDownLatch( 2 );
+        IoAcceptor acceptor = new VmPipeAcceptor();
+
+        acceptor.bind( address, new IoHandlerAdapter() {
+            @Override
+            public void messageReceived( IoSession session, Object message ) throws Exception {
+                System.out.println( Thread.currentThread().getName() + ": " + message );
+
+                if ( "start".equals( message ) ) {
+                    session.write( "open new" );
+                } else if ( "re-use c1".equals( message ) ) {
+                    session.write( "tell me something on c1 now" );
+                } else if ( ( (String) message ).startsWith( "please don't deadlock" ) ) {
+                    messageCount.countDown();
+                } else {
+                    fail( "unexpected message received " + message );
+                }
+            }
+        } );
+
+        connector.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+
+        ConnectFuture future = connector.connect( address, new IoHandlerAdapter() {
+            @Override
+            public void messageReceived( IoSession session, Object message ) throws Exception {
+                System.out.println( Thread.currentThread().getName() + ": " + message );
+                
+                if ( "open new".equals( message ) ) {
+                    System.out.println( "opening c2 from " + Thread.currentThread().getName() );
+
+                    ConnectFuture c2Future = connector.connect( address, new IoHandlerAdapter() {
+                        @Override
+                        public void sessionOpened( IoSession session ) throws Exception {
+                            session.write( "re-use c1" );
+                        }
+
+                        @Override
+                        public void messageReceived( IoSession session, Object message ) throws Exception {
+                            System.out.println( Thread.currentThread().getName() + ": " + message );
+
+                            if ( "tell me something on c1 now".equals( message ) ) {
+                                latch.countDown();
+                                c1.get().write( "please don't deadlock via c1" );
+                            } else {
+                                fail( "unexpected message received " + message );
+                            }
+                        }
+                    } );
+
+                    c2Future.join();
+
+                    latch.await();
+
+                    c2Future.getSession().write( "please don't deadlock via c2" );
+                } else {
+                    fail( "unexpeced message received " + message );
+                }
+            }
+        } );
+
+        future.join();
+
+        c1.set( future.getSession() );
+        c1.get().write( "start" );
+
+        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+
+        while ( !messageCount.await( 100, TimeUnit.MILLISECONDS ) ) {
+            long[] threads = threadMXBean.findMonitorDeadlockedThreads();
+
+            if ( null != threads ) {
+                StringBuffer sb = new StringBuffer( 256 );
+                ThreadInfo[] infos = threadMXBean.getThreadInfo( threads, Integer.MAX_VALUE );
+
+                for ( ThreadInfo info : infos ) {
+                    sb.append( info.getThreadName() )
+                        .append( " blocked on " )
+                        .append( info.getLockName() )
+                        .append( " owned by " )
+                        .append( info.getLockOwnerName() )
+                        .append( "\n" );
+                }
+
+                for ( ThreadInfo info : infos ) {
+                    sb.append( "\nStack for " ).append( info.getThreadName() ).append( "\n" );
+                    for ( StackTraceElement element : info.getStackTrace() ) {
+                        sb.append( "\t" ).append( element ).append( "\n" );
+                    }
+                }
+
+                fail( "deadlocked! \n" + sb );
+            }
+        }
+
+        ( (IoAcceptorConfig) acceptor.getDefaultConfig() ).setDisconnectOnUnbind( false );
+        acceptor.unbindAll();
+    }
+}

Propchange: mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java
------------------------------------------------------------------------------
    svn:eol-style = native