You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/01/30 08:05:30 UTC

svn commit: r501329 - in /mina: branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/ branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/ branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/ branches/1.1/c...

Author: trustin
Date: Mon Jan 29 23:05:29 2007
New Revision: 501329

URL: http://svn.apache.org/viewvc?view=rev&rev=501329
Log:
Resolving DIRMINA-306 (Invalid sequence of events with VmPipe Transport)
* Added a test case for the issue
* Changed VmPipeFilterChain to use event queue to order events


Added:
    mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java   (with props)
    mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java   (with props)
    mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java   (with props)
Modified:
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Mon Jan 29 23:05:29 2007
@@ -117,6 +117,7 @@
         catch( Throwable t )
         {
             future.setException( t );
+            return future;
         }
         
         // initialize acceptor session

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Mon Jan 29 23:05:29 2007
@@ -20,46 +20,151 @@
 package org.apache.mina.transport.vmpipe.support;
 
 import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.AbstractIoFilterChain;
 
+import edu.emory.mathcs.backport.java.util.Queue;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentLinkedQueue;
+
 /**
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
  * @version $Rev$, $Date$
  */
 public class VmPipeFilterChain extends AbstractIoFilterChain {
 
+    private final Queue eventQueue = new ConcurrentLinkedQueue();
+    
     public VmPipeFilterChain( IoSession session )
     {
         super( session );
     }
-
-    public void fireMessageReceived( IoSession session, Object message )
+    
+    private void pushEvent( Event e )
     {
-        VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
-        synchronized( s.lock )
+        eventQueue.offer( e );
+        flushEvents();
+    }
+    
+    private void flushEvents()
+    {
+        Event e;
+        while( ( e = ( Event ) eventQueue.poll() ) != null )
         {
-            if( !s.getTrafficMask().isReadable() )
+            fireEvent( e );
+        }
+    }
+    
+    private void fireEvent( Event e )
+    {
+        IoSession session = getSession();
+        EventType type = e.getType();
+        Object data = e.getData();
+
+        if( type == EventType.RECEIVED )
+        {
+            VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
+            synchronized( s.lock )
             {
-                synchronized( s.pendingDataQueue )
+                if( !s.getTrafficMask().isReadable() )
                 {
-                    s.pendingDataQueue.push( message );
+                    synchronized( s.pendingDataQueue )
+                    {
+                        s.pendingDataQueue.push( data );
+                    }
                 }
-            }
-            else
-            {
-                int byteCount = 1;
-                if( message instanceof ByteBuffer )
+                else
                 {
-                    byteCount = ( ( ByteBuffer ) message ).remaining();
+                    int byteCount = 1;
+                    if( data instanceof ByteBuffer )
+                    {
+                        byteCount = ( ( ByteBuffer ) data ).remaining();
+                    }
+                    
+                    s.increaseReadBytes( byteCount );
+                    
+                    super.fireMessageReceived( s, data );
                 }
-                
-                s.increaseReadBytes( byteCount );
-                
-                super.fireMessageReceived( s, message );
             }
         }
+        else if( type == EventType.WRITE )
+        {
+            super.fireFilterWrite( session, ( WriteRequest ) data );
+        }
+        else if( type == EventType.SENT )
+        {
+            super.fireMessageSent( session, ( WriteRequest ) data );
+        }
+        else if( type == EventType.EXCEPTION )
+        {
+            super.fireExceptionCaught( session, ( Throwable ) data );
+        }
+        else if( type == EventType.IDLE )
+        {
+            super.fireSessionIdle( session, ( IdleStatus ) data );
+        }
+        else if( type == EventType.OPENED )
+        {
+            super.fireSessionOpened( session );
+        }
+        else if( type == EventType.CREATED )
+        {
+            super.fireSessionCreated( session );
+        }
+        else if( type == EventType.CLOSED )
+        {
+            super.fireSessionClosed( session );
+        }
+        else if( type == EventType.CLOSE )
+        {
+            super.fireFilterClose( session );
+        }
+    }
+
+    public void fireFilterClose( IoSession session )
+    {
+        pushEvent( new Event( EventType.CLOSE, null ) );
+    }
+
+    public void fireFilterWrite( IoSession session, WriteRequest writeRequest )
+    {
+        pushEvent( new Event( EventType.WRITE, writeRequest ) );
+    }
+
+    public void fireExceptionCaught( IoSession session, Throwable cause )
+    {
+        pushEvent( new Event( EventType.EXCEPTION, cause ) );
+    }
+
+    public void fireMessageSent( IoSession session, WriteRequest request )
+    {
+        pushEvent( new Event( EventType.SENT, request ) );
+    }
+
+    public void fireSessionClosed( IoSession session )
+    {
+        pushEvent( new Event( EventType.CLOSED, null ) );
+    }
+
+    public void fireSessionCreated( IoSession session )
+    {
+        pushEvent( new Event( EventType.CREATED, null ) );
+    }
+
+    public void fireSessionIdle( IoSession session, IdleStatus status )
+    {
+        pushEvent( new Event( EventType.IDLE, status ) );
+    }
+
+    public void fireSessionOpened( IoSession session )
+    {
+        pushEvent( new Event( EventType.OPENED, null ) );
+    }
+
+    public void fireMessageReceived( IoSession session, Object message )
+    {
+        pushEvent( new Event( EventType.RECEIVED, message ) );
     }
 
     protected void doWrite( IoSession session, WriteRequest writeRequest )
@@ -79,7 +184,6 @@
                 }
                 else
                 {
-                
                     Object message = writeRequest.getMessage();
                     
                     int byteCount = 1;
@@ -99,9 +203,9 @@
                     s.increaseWrittenBytes( byteCount );
                     s.increaseWrittenMessages();
     
-                    s.getFilterChain().fireMessageSent( s, writeRequest );
                     s.getRemoteSession().getFilterChain()
-                                .fireMessageReceived( s.getRemoteSession(), messageCopy );
+                            .fireMessageReceived( s.getRemoteSession(), messageCopy );
+                    s.getFilterChain().fireMessageSent( s, writeRequest );
                 }
             }
             else 
@@ -124,4 +228,52 @@
         }
     }
     
+    // FIXME Copied and pasted from {@link ExecutorFilter}.
+    private static class EventType
+    {
+        public static final EventType CREATED = new EventType( "CREATED" );
+        public static final EventType OPENED = new EventType( "OPENED" );
+        public static final EventType CLOSED = new EventType( "CLOSED" );
+        public static final EventType RECEIVED = new EventType( "RECEIVED" );
+        public static final EventType SENT = new EventType( "SENT" );
+        public static final EventType IDLE = new EventType( "IDLE" );
+        public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
+        
+        public static final EventType WRITE = new EventType( "WRITE" );
+        public static final EventType CLOSE = new EventType( "CLOSE" );
+
+        private final String value;
+
+        private EventType( String value )
+        {
+            this.value = value;
+        }
+
+        public String toString()
+        {
+            return value;
+        }
+    }
+
+    private static class Event
+    {
+        private final EventType type;
+        private final Object data;
+
+        public Event( EventType type, Object data )
+        {
+            this.type = type;
+            this.data = data;
+        }
+
+        public Object getData()
+        {
+            return data;
+        }
+
+        public EventType getType()
+        {
+            return type;
+        }
+    }
 }

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Mon Jan 29 23:05:29 2007
@@ -184,11 +184,17 @@
             {
                 if( data[ i ] instanceof WriteRequest )
                 {
+                    // TODO Optimize inefficient data transfer.
+                    // Data will be returned to pendingDataQueue
+                    // if getTrafficMask().isWritable() is false.
                     WriteRequest wr = ( WriteRequest ) data[ i ];
                     filterChain.doWrite( this, wr );
                 }
                 else
                 {
+                    // TODO Optimize inefficient data transfer.
+                    // Data will be returned to pendingDataQueue
+                    // if getTrafficMask().isReadable() is false.
                     filterChain.fireMessageReceived( this, data[ i ] );
                 }
             }

Added: mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java?view=auto&rev=501329
==============================================================================
--- mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java (added)
+++ mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java Mon Jan 29 23:05:29 2007
@@ -0,0 +1,159 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.transport.vmpipe;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.ThreadModel;
+
+/**
+ * Makes sure that the order of events is correct.
+ * 
+ * @author The Apache MINA Project Team (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class VmPipeEventOrderTest extends TestCase
+{
+    public void testServerToClient() throws Exception
+    {
+        IoAcceptor acceptor = new VmPipeAcceptor();
+        acceptor.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+        //acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+        IoConnector connector = new VmPipeConnector();
+        connector.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+        //connector.getFilterChain().addLast( "logger", new LoggingFilter() );
+        
+        acceptor.bind(
+                new VmPipeAddress( 1 ),
+                new IoHandlerAdapter() {
+                    public void sessionOpened( IoSession session ) throws Exception
+                    {
+                        session.write("B");
+                    }
+                    
+                    public void messageSent( IoSession session, Object message ) throws Exception
+                    {
+                        session.close();
+                    }
+                });
+        
+        final StringBuffer actual = new StringBuffer();
+        
+        ConnectFuture future = connector.connect(
+                new VmPipeAddress( 1 ),
+                new IoHandlerAdapter() {
+
+                    public void messageReceived( IoSession session, Object message ) throws Exception
+                    {
+                        actual.append( message );
+                    }
+
+                    public void sessionClosed( IoSession session ) throws Exception
+                    {
+                        actual.append( "C" );
+                    }
+
+                    public void sessionOpened( IoSession session ) throws Exception {
+                        actual.append( "A" );
+                    }
+                    
+                });
+
+        future.join();
+        future.getSession().getCloseFuture().join();
+        acceptor.unbindAll();
+        
+        // sessionClosed() might not be invoked yet
+        // even if the connection is closed.
+        while( actual.indexOf("C") < 0 )
+        {
+            Thread.yield();
+        }
+        
+        Assert.assertEquals( "ABC", actual.toString() );
+    }
+
+    public void testClientToServer() throws Exception
+    {
+        IoAcceptor acceptor = new VmPipeAcceptor();
+        acceptor.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+        //acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+        IoConnector connector = new VmPipeConnector();
+        connector.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+        //connector.getFilterChain().addLast( "logger", new LoggingFilter() );
+        
+        final StringBuffer actual = new StringBuffer();
+        
+        acceptor.bind(
+                new VmPipeAddress( 1 ),
+                new IoHandlerAdapter() {
+
+                    public void messageReceived( IoSession session, Object message ) throws Exception
+                    {
+                        actual.append( message );
+                    }
+
+                    public void sessionClosed( IoSession session ) throws Exception
+                    {
+                        actual.append( "C" );
+                    }
+
+                    public void sessionOpened( IoSession session ) throws Exception {
+                        actual.append( "A" );
+                    }
+                    
+                });
+        
+        ConnectFuture future = connector.connect(
+                new VmPipeAddress( 1 ),
+                new IoHandlerAdapter() {
+                    public void sessionOpened( IoSession session ) throws Exception
+                    {
+                        session.write("B");
+                    }
+                    
+                    public void messageSent( IoSession session, Object message ) throws Exception
+                    {
+                        session.close();
+                    }
+                });
+
+        future.join();
+        future.getSession().getCloseFuture().join();
+        acceptor.unbindAll();
+        
+        // sessionClosed() might not be invoked yet
+        // even if the connection is closed.
+        while( actual.indexOf("C") < 0 )
+        {
+            Thread.yield();
+        }
+        
+        Assert.assertEquals( "ABC", actual.toString() );
+    }
+}

Propchange: mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Mon Jan 29 23:05:29 2007
@@ -117,6 +117,7 @@
         catch( Throwable t )
         {
             future.setException( t );
+            return future;
         }
 
         // initialize acceptor session

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Mon Jan 29 23:05:29 2007
@@ -6,20 +6,24 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *
+ *  
  *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ *  
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.
- *
+ *  under the License. 
+ *  
  */
 package org.apache.mina.transport.vmpipe.support;
 
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
 import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.AbstractIoFilterChain;
@@ -30,34 +34,143 @@
  */
 public class VmPipeFilterChain extends AbstractIoFilterChain {
 
+    private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
+    
     public VmPipeFilterChain( IoSession session )
     {
         super( session );
     }
-
-    @Override
-    public void fireMessageReceived( IoSession session, Object message )
+    
+    private void pushEvent( Event e )
     {
-        VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
-        synchronized( s.lock )
+        eventQueue.offer( e );
+        flushEvents();
+    }
+    
+    private void flushEvents()
+    {
+        Event e;
+        while( ( e = eventQueue.poll() ) != null )
         {
-            if( !s.getTrafficMask().isReadable() )
-            {
-                s.pendingDataQueue.add( message );
-            }
-            else
+            fireEvent( e );
+        }
+    }
+    
+    private void fireEvent( Event e )
+    {
+        IoSession session = getSession();
+        EventType type = e.getType();
+        Object data = e.getData();
+
+        if( type == EventType.RECEIVED )
+        {
+            VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
+            synchronized( s.lock )
             {
-                int byteCount = 1;
-                if( message instanceof ByteBuffer )
+                if( !s.getTrafficMask().isReadable() )
                 {
-                    byteCount = ( ( ByteBuffer ) message ).remaining();
+                    s.pendingDataQueue.offer( data );
+                }
+                else
+                {
+                    int byteCount = 1;
+                    if( data instanceof ByteBuffer )
+                    {
+                        byteCount = ( ( ByteBuffer ) data ).remaining();
+                    }
+                    
+                    s.increaseReadBytes( byteCount );
+                    
+                    super.fireMessageReceived( s, data );
                 }
-
-                s.increaseReadBytes( byteCount );
-
-                super.fireMessageReceived( s, message );
             }
         }
+        else if( type == EventType.WRITE )
+        {
+            super.fireFilterWrite( session, ( WriteRequest ) data );
+        }
+        else if( type == EventType.SENT )
+        {
+            super.fireMessageSent( session, ( WriteRequest ) data );
+        }
+        else if( type == EventType.EXCEPTION )
+        {
+            super.fireExceptionCaught( session, ( Throwable ) data );
+        }
+        else if( type == EventType.IDLE )
+        {
+            super.fireSessionIdle( session, ( IdleStatus ) data );
+        }
+        else if( type == EventType.OPENED )
+        {
+            super.fireSessionOpened( session );
+        }
+        else if( type == EventType.CREATED )
+        {
+            super.fireSessionCreated( session );
+        }
+        else if( type == EventType.CLOSED )
+        {
+            super.fireSessionClosed( session );
+        }
+        else if( type == EventType.CLOSE )
+        {
+            super.fireFilterClose( session );
+        }
+    }
+
+    @Override
+    public void fireFilterClose( IoSession session )
+    {
+        pushEvent( new Event( EventType.CLOSE, null ) );
+    }
+
+    @Override
+    public void fireFilterWrite( IoSession session, WriteRequest writeRequest )
+    {
+        pushEvent( new Event( EventType.WRITE, writeRequest ) );
+    }
+
+    @Override
+    public void fireExceptionCaught( IoSession session, Throwable cause )
+    {
+        pushEvent( new Event( EventType.EXCEPTION, cause ) );
+    }
+
+    @Override
+    public void fireMessageSent( IoSession session, WriteRequest request )
+    {
+        pushEvent( new Event( EventType.SENT, request ) );
+    }
+
+    @Override
+    public void fireSessionClosed( IoSession session )
+    {
+        pushEvent( new Event( EventType.CLOSED, null ) );
+    }
+
+    @Override
+    public void fireSessionCreated( IoSession session )
+    {
+        pushEvent( new Event( EventType.CREATED, null ) );
+    }
+
+    @Override
+    public void fireSessionIdle( IoSession session, IdleStatus status )
+    {
+        pushEvent( new Event( EventType.IDLE, status ) );
+    }
+
+    @Override
+    public void fireSessionOpened( IoSession session )
+    {
+        pushEvent( new Event( EventType.OPENED, null ) );
+    }
+
+    @Override
+    public void fireMessageReceived( IoSession session, Object message )
+    {
+        pushEvent( new Event( EventType.RECEIVED, message ) );
     }
 
     @Override
@@ -68,15 +181,15 @@
         {
             if( s.isConnected() )
             {
+                
                 if( !s.getTrafficMask().isWritable() )
                 {
-                    s.pendingDataQueue.add( writeRequest );
+                    s.pendingDataQueue.offer( writeRequest );
                 }
                 else
                 {
-
                     Object message = writeRequest.getMessage();
-
+                    
                     int byteCount = 1;
                     Object messageCopy = message;
                     if( message instanceof ByteBuffer )
@@ -90,16 +203,16 @@
                         rb.reset();
                         messageCopy = wb;
                     }
-
+                    
                     s.increaseWrittenBytes( byteCount );
                     s.increaseWrittenMessages();
-
-                    s.getFilterChain().fireMessageSent( s, writeRequest );
+    
                     s.getRemoteSession().getFilterChain()
-                                .fireMessageReceived( s.getRemoteSession(), messageCopy );
+                            .fireMessageReceived( s.getRemoteSession(), messageCopy );
+                    s.getFilterChain().fireMessageSent( s, writeRequest );
                 }
             }
-            else
+            else 
             {
                 writeRequest.getFuture().setWritten( false );
             }
@@ -119,5 +232,53 @@
             }
         }
     }
+    
+    // FIXME Copied and pasted from {@link ExecutorFilter}.
+    private static class EventType
+    {
+        public static final EventType CREATED = new EventType( "CREATED" );
+        public static final EventType OPENED = new EventType( "OPENED" );
+        public static final EventType CLOSED = new EventType( "CLOSED" );
+        public static final EventType RECEIVED = new EventType( "RECEIVED" );
+        public static final EventType SENT = new EventType( "SENT" );
+        public static final EventType IDLE = new EventType( "IDLE" );
+        public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
+        
+        public static final EventType WRITE = new EventType( "WRITE" );
+        public static final EventType CLOSE = new EventType( "CLOSE" );
+
+        private final String value;
 
+        private EventType( String value )
+        {
+            this.value = value;
+        }
+
+        public String toString()
+        {
+            return value;
+        }
+    }
+
+    private static class Event
+    {
+        private final EventType type;
+        private final Object data;
+
+        public Event( EventType type, Object data )
+        {
+            this.type = type;
+            this.data = data;
+        }
+
+        public Object getData()
+        {
+            return data;
+        }
+
+        public EventType getType()
+        {
+            return type;
+        }
+    }
 }

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Mon Jan 29 23:05:29 2007
@@ -189,11 +189,17 @@
             {
                 if( aData instanceof WriteRequest )
                 {
+                    // TODO Optimize inefficient data transfer.
+                    // Data will be returned to pendingDataQueue
+                    // if getTrafficMask().isWritable() is false.
                     WriteRequest wr = ( WriteRequest ) aData;
                     filterChain.doWrite( this, wr );
                 }
                 else
                 {
+                    // TODO Optimize inefficient data transfer.
+                    // Data will be returned to pendingDataQueue
+                    // if getTrafficMask().isReadable() is false.
                     filterChain.fireMessageReceived( this, aData );
                 }
             }

Added: mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java?view=auto&rev=501329
==============================================================================
--- mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java (added)
+++ mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java Mon Jan 29 23:05:29 2007
@@ -0,0 +1,159 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.transport.vmpipe;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.ThreadModel;
+
+/**
+ * Makes sure that events are fired in the correct order.
+ * 
+ * @author The Apache MINA Project Team (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class VmPipeEventOrderTest extends TestCase
+{
+    public void testServerToClient() throws Exception
+    {
+        IoAcceptor acceptor = new VmPipeAcceptor();
+        acceptor.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+        //acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+        IoConnector connector = new VmPipeConnector();
+        connector.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+        //connector.getFilterChain().addLast( "logger", new LoggingFilter() );
+        
+        acceptor.bind(
+                new VmPipeAddress( 1 ),
+                new IoHandlerAdapter() {
+                    public void sessionOpened( IoSession session ) throws Exception
+                    {
+                        session.write("B");
+                    }
+                    
+                    public void messageSent( IoSession session, Object message ) throws Exception
+                    {
+                        session.close();
+                    }
+                });
+        
+        final StringBuffer actual = new StringBuffer();
+        
+        ConnectFuture future = connector.connect(
+                new VmPipeAddress( 1 ),
+                new IoHandlerAdapter() {
+
+                    public void messageReceived( IoSession session, Object message ) throws Exception
+                    {
+                        actual.append( message );
+                    }
+
+                    public void sessionClosed( IoSession session ) throws Exception
+                    {
+                        actual.append( "C" );
+                    }
+
+                    public void sessionOpened( IoSession session ) throws Exception {
+                        actual.append( "A" );
+                    }
+                    
+                });
+
+        future.join();
+        future.getSession().getCloseFuture().join();
+        acceptor.unbindAll();
+        
+        // sessionClosed() might not be invoked yet
+        // even if the connection is closed.
+        while( actual.indexOf("C") < 0 )
+        {
+            Thread.yield();
+        }
+        
+        Assert.assertEquals( "ABC", actual.toString() );
+    }
+
+    public void testClientToServer() throws Exception
+    {
+        IoAcceptor acceptor = new VmPipeAcceptor();
+        acceptor.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+        //acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+        IoConnector connector = new VmPipeConnector();
+        connector.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+        //connector.getFilterChain().addLast( "logger", new LoggingFilter() );
+        
+        final StringBuffer actual = new StringBuffer();
+        
+        acceptor.bind(
+                new VmPipeAddress( 1 ),
+                new IoHandlerAdapter() {
+
+                    public void messageReceived( IoSession session, Object message ) throws Exception
+                    {
+                        actual.append( message );
+                    }
+
+                    public void sessionClosed( IoSession session ) throws Exception
+                    {
+                        actual.append( "C" );
+                    }
+
+                    public void sessionOpened( IoSession session ) throws Exception {
+                        actual.append( "A" );
+                    }
+                    
+                });
+        
+        ConnectFuture future = connector.connect(
+                new VmPipeAddress( 1 ),
+                new IoHandlerAdapter() {
+                    public void sessionOpened( IoSession session ) throws Exception
+                    {
+                        session.write("B");
+                    }
+                    
+                    public void messageSent( IoSession session, Object message ) throws Exception
+                    {
+                        session.close();
+                    }
+                });
+
+        future.join();
+        future.getSession().getCloseFuture().join();
+        acceptor.unbindAll();
+        
+        // sessionClosed() might not be invoked yet
+        // even if the connection is closed.
+        while( actual.indexOf("C") < 0 )
+        {
+            Thread.yield();
+        }
+        
+        Assert.assertEquals( "ABC", actual.toString() );
+    }
+}

Propchange: mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Mon Jan 29 23:05:29 2007
@@ -90,6 +90,7 @@
         catch( Throwable t )
         {
             future.setException( t );
+            return future;
         }
         
         // initialize acceptor session

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Mon Jan 29 23:05:29 2007
@@ -19,49 +19,161 @@
  */
 package org.apache.mina.transport.vmpipe.support;
 
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
 import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.AbstractIoFilterChain;
 
 /**
- * @author The Apache MINA Project (dev@mina.apache.org)
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
  * @version $Rev$, $Date$
  */
 public class VmPipeFilterChain extends AbstractIoFilterChain {
 
+    private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
+    
     public VmPipeFilterChain( IoSession session )
     {
         super( session );
     }
-
-    public void fireMessageReceived( IoSession session, Object message )
+    
+    private void pushEvent( Event e )
     {
-        VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
-        synchronized( s.lock )
+        eventQueue.offer( e );
+        flushEvents();
+    }
+    
+    private void flushEvents()
+    {
+        Event e;
+        while( ( e = eventQueue.poll() ) != null )
+        {
+            fireEvent( e );
+        }
+    }
+    
+    private void fireEvent( Event e )
+    {
+        IoSession session = getSession();
+        EventType type = e.getType();
+        Object data = e.getData();
+
+        if( type == EventType.RECEIVED )
         {
-            if( !s.getTrafficMask().isReadable() )
+            VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
+            synchronized( s.lock )
             {
-                synchronized( s.pendingDataQueue )
+                if( !s.getTrafficMask().isReadable() )
                 {
-                    s.pendingDataQueue.offer( message );
+                    s.pendingDataQueue.offer( data );
                 }
-            }
-            else
-            {
-                int byteCount = 1;
-                if( message instanceof ByteBuffer )
+                else
                 {
-                    byteCount = ( ( ByteBuffer ) message ).remaining();
+                    int byteCount = 1;
+                    if( data instanceof ByteBuffer )
+                    {
+                        byteCount = ( ( ByteBuffer ) data ).remaining();
+                    }
+                    
+                    s.increaseReadBytes( byteCount );
+                    
+                    super.fireMessageReceived( s, data );
                 }
-                
-                s.increaseReadBytes( byteCount );
-                
-                super.fireMessageReceived( s, message );
             }
         }
+        else if( type == EventType.WRITE )
+        {
+            super.fireFilterWrite( session, ( WriteRequest ) data );
+        }
+        else if( type == EventType.SENT )
+        {
+            super.fireMessageSent( session, ( WriteRequest ) data );
+        }
+        else if( type == EventType.EXCEPTION )
+        {
+            super.fireExceptionCaught( session, ( Throwable ) data );
+        }
+        else if( type == EventType.IDLE )
+        {
+            super.fireSessionIdle( session, ( IdleStatus ) data );
+        }
+        else if( type == EventType.OPENED )
+        {
+            super.fireSessionOpened( session );
+        }
+        else if( type == EventType.CREATED )
+        {
+            super.fireSessionCreated( session );
+        }
+        else if( type == EventType.CLOSED )
+        {
+            super.fireSessionClosed( session );
+        }
+        else if( type == EventType.CLOSE )
+        {
+            super.fireFilterClose( session );
+        }
+    }
+
+    @Override
+    public void fireFilterClose( IoSession session )
+    {
+        pushEvent( new Event( EventType.CLOSE, null ) );
+    }
+
+    @Override
+    public void fireFilterWrite( IoSession session, WriteRequest writeRequest )
+    {
+        pushEvent( new Event( EventType.WRITE, writeRequest ) );
     }
 
+    @Override
+    public void fireExceptionCaught( IoSession session, Throwable cause )
+    {
+        pushEvent( new Event( EventType.EXCEPTION, cause ) );
+    }
+
+    @Override
+    public void fireMessageSent( IoSession session, WriteRequest request )
+    {
+        pushEvent( new Event( EventType.SENT, request ) );
+    }
+
+    @Override
+    public void fireSessionClosed( IoSession session )
+    {
+        pushEvent( new Event( EventType.CLOSED, null ) );
+    }
+
+    @Override
+    public void fireSessionCreated( IoSession session )
+    {
+        pushEvent( new Event( EventType.CREATED, null ) );
+    }
+
+    @Override
+    public void fireSessionIdle( IoSession session, IdleStatus status )
+    {
+        pushEvent( new Event( EventType.IDLE, status ) );
+    }
+
+    @Override
+    public void fireSessionOpened( IoSession session )
+    {
+        pushEvent( new Event( EventType.OPENED, null ) );
+    }
+
+    @Override
+    public void fireMessageReceived( IoSession session, Object message )
+    {
+        pushEvent( new Event( EventType.RECEIVED, message ) );
+    }
+
+    @Override
     protected void doWrite( IoSession session, WriteRequest writeRequest )
     {
         VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
@@ -72,14 +184,10 @@
                 
                 if( !s.getTrafficMask().isWritable() )
                 {
-                    synchronized( s.pendingDataQueue )
-                    {
-                        s.pendingDataQueue.offer( writeRequest );
-                    }
+                    s.pendingDataQueue.offer( writeRequest );
                 }
                 else
                 {
-                
                     Object message = writeRequest.getMessage();
                     
                     int byteCount = 1;
@@ -99,9 +207,9 @@
                     s.increaseWrittenBytes( byteCount );
                     s.increaseWrittenMessages();
     
-                    s.getFilterChain().fireMessageSent( s, writeRequest );
                     s.getRemoteSession().getFilterChain()
-                                .fireMessageReceived( s.getRemoteSession(), messageCopy );
+                            .fireMessageReceived( s.getRemoteSession(), messageCopy );
+                    s.getFilterChain().fireMessageSent( s, writeRequest );
                 }
             }
             else 
@@ -111,6 +219,7 @@
         }
     }
 
+    @Override
     protected void doClose( IoSession session )
     {
         VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
@@ -124,4 +233,52 @@
         }
     }
     
+    // FIXME Copied and pasted from {@link ExecutorFilter}.
+    private static class EventType
+    {
+        public static final EventType CREATED = new EventType( "CREATED" );
+        public static final EventType OPENED = new EventType( "OPENED" );
+        public static final EventType CLOSED = new EventType( "CLOSED" );
+        public static final EventType RECEIVED = new EventType( "RECEIVED" );
+        public static final EventType SENT = new EventType( "SENT" );
+        public static final EventType IDLE = new EventType( "IDLE" );
+        public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
+        
+        public static final EventType WRITE = new EventType( "WRITE" );
+        public static final EventType CLOSE = new EventType( "CLOSE" );
+
+        private final String value;
+
+        private EventType( String value )
+        {
+            this.value = value;
+        }
+
+        public String toString()
+        {
+            return value;
+        }
+    }
+
+    private static class Event
+    {
+        private final EventType type;
+        private final Object data;
+
+        public Event( EventType type, Object data )
+        {
+            this.type = type;
+            this.data = data;
+        }
+
+        public Object getData()
+        {
+            return data;
+        }
+
+        public EventType getType()
+        {
+            return type;
+        }
+    }
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Mon Jan 29 23:05:29 2007
@@ -6,22 +6,24 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.vmpipe.support;
 
 import java.net.SocketAddress;
-import java.util.LinkedList;
-import java.util.Queue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
@@ -36,14 +38,16 @@
 
 /**
  * A {@link IoSession} for in-VM transport (VM_PIPE).
- * 
- * @author The Apache MINA Project (dev@mina.apache.org)
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
  * @version $Rev$, $Date$
  */
 public class VmPipeSessionImpl extends BaseIoSession
 {
-    private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {};
-    
+    private static final IoSessionConfig CONFIG = new BaseIoSessionConfig()
+    {
+    };
+
     private final IoService service;
     private final IoServiceListenerSupport serviceListeners;
     private final SocketAddress localAddress;
@@ -53,7 +57,7 @@
     private final VmPipeFilterChain filterChain;
     private final VmPipeSessionImpl remoteSession;
     final Object lock;
-    final Queue<Object> pendingDataQueue;
+    final BlockingQueue<Object> pendingDataQueue;
 
     /**
      * Constructor for client-side session.
@@ -70,7 +74,7 @@
         this.remoteAddress = this.serviceAddress = remoteEntry.getAddress();
         this.handler = handler;
         this.filterChain = new VmPipeFilterChain( this );
-        this.pendingDataQueue = new LinkedList<Object>();
+        this.pendingDataQueue = new LinkedBlockingQueue<Object>();
 
         remoteSession = new VmPipeSessionImpl( this, remoteEntry );
     }
@@ -88,19 +92,19 @@
         this.handler = entry.getHandler();
         this.filterChain = new VmPipeFilterChain( this );
         this.remoteSession = remoteSession;
-        this.pendingDataQueue = new LinkedList<Object>();
+        this.pendingDataQueue = new LinkedBlockingQueue<Object>();
     }
-    
+
     public IoService getService()
     {
         return service;
     }
-    
+
     IoServiceListenerSupport getServiceListeners()
     {
         return serviceListeners;
     }
-    
+
     public IoSessionConfig getConfig()
     {
         return CONFIG;
@@ -110,7 +114,7 @@
     {
         return filterChain;
     }
-    
+
     public VmPipeSessionImpl getRemoteSession()
     {
         return remoteSession;
@@ -121,11 +125,13 @@
         return handler;
     }
 
+    @Override
     protected void close0()
     {
         filterChain.fireFilterClose( this );
     }
-    
+
+    @Override
     protected void write0( WriteRequest writeRequest )
     {
         this.filterChain.fireFilterWrite( this, writeRequest );
@@ -140,7 +146,7 @@
     {
         return 0;
     }
-    
+
     public TransportType getTransportType()
     {
         return TransportType.VM_PIPE;
@@ -155,33 +161,37 @@
     {
         return localAddress;
     }
-    
+
     public SocketAddress getServiceAddress()
     {
         return serviceAddress;
     }
 
+    @Override
     protected void updateTrafficMask()
     {
-        if( getTrafficMask().isReadable() || getTrafficMask().isWritable())
+        if( getTrafficMask().isReadable() || getTrafficMask().isWritable() )
         {
-            Object[] data;
-            synchronized( pendingDataQueue )
-            {
-                data = pendingDataQueue.toArray();
-                pendingDataQueue.clear();
-            }
-            
-            for( int i = 0; i < data.length; i++ )
+            List<Object> data = new ArrayList<Object>();
+
+            pendingDataQueue.drainTo( data );
+
+            for( Object aData : data )
             {
-                if( data[ i ] instanceof WriteRequest )
+                if( aData instanceof WriteRequest )
                 {
-                    WriteRequest wr = ( WriteRequest ) data[ i ];
+                    // TODO Optimize inefficient data transfer.
+                    // Data will be returned to pendingDataQueue
+                    // if getTrafficMask().isWritable() is false.
+                    WriteRequest wr = ( WriteRequest ) aData;
                     filterChain.doWrite( this, wr );
                 }
                 else
                 {
-                    filterChain.fireMessageReceived( this, data[ i ] );
+                    // TODO Optimize inefficient data transfer.
+                    // Data will be returned to pendingDataQueue
+                    // if getTrafficMask().isReadable() is false.
+                    filterChain.fireMessageReceived( this, aData );
                 }
             }
         }

Added: mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java?view=auto&rev=501329
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java (added)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java Mon Jan 29 23:05:29 2007
@@ -0,0 +1,161 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.transport.vmpipe;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.LoggingFilter;
+
+/**
+ * Makes sure that events are fired in the correct order.
+ * 
+ * @author The Apache MINA Project Team (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class VmPipeEventOrderTest extends TestCase
+{
+    public void testServerToClient() throws Exception
+    {
+        IoAcceptor acceptor = new VmPipeAcceptor();
+        //acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+        IoConnector connector = new VmPipeConnector();
+        //connector.getFilterChain().addLast( "logger", new LoggingFilter() );
+        
+        acceptor.setLocalAddress( new VmPipeAddress( 1 ) );
+        acceptor.setHandler(
+                new IoHandlerAdapter() {
+                    public void sessionOpened( IoSession session ) throws Exception
+                    {
+                        session.write("B");
+                    }
+                    
+                    public void messageSent( IoSession session, Object message ) throws Exception
+                    {
+                        session.close();
+                    }
+                });
+
+        acceptor.bind();
+        
+        final StringBuffer actual = new StringBuffer();
+        
+        connector.setHandler(
+                new IoHandlerAdapter() {
+
+                    public void messageReceived( IoSession session, Object message ) throws Exception
+                    {
+                        actual.append( message );
+                    }
+
+                    public void sessionClosed( IoSession session ) throws Exception
+                    {
+                        actual.append( "C" );
+                    }
+
+                    public void sessionOpened( IoSession session ) throws Exception {
+                        actual.append( "A" );
+                    }
+                    
+                });
+
+        ConnectFuture future = connector.connect( new VmPipeAddress( 1 ) );
+
+        future.join();
+        future.getSession().getCloseFuture().join();
+        acceptor.unbind();
+        
+        // sessionClosed() might not be invoked yet
+        // even if the connection is closed.
+        while( actual.indexOf("C") < 0 )
+        {
+            Thread.yield();
+        }
+        
+        Assert.assertEquals( "ABC", actual.toString() );
+    }
+
+    public void testClientToServer() throws Exception
+    {
+        IoAcceptor acceptor = new VmPipeAcceptor();
+        acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+        IoConnector connector = new VmPipeConnector();
+        connector.getFilterChain().addLast( "logger", new LoggingFilter() );
+        
+        final StringBuffer actual = new StringBuffer();
+        
+        acceptor.setLocalAddress( new VmPipeAddress( 1 ) );
+        acceptor.setHandler(
+                new IoHandlerAdapter() {
+
+                    public void messageReceived( IoSession session, Object message ) throws Exception
+                    {
+                        actual.append( message );
+                    }
+
+                    public void sessionClosed( IoSession session ) throws Exception
+                    {
+                        actual.append( "C" );
+                    }
+
+                    public void sessionOpened( IoSession session ) throws Exception {
+                        actual.append( "A" );
+                    }
+                    
+                });
+
+        acceptor.bind();
+        
+        connector.setHandler(
+                new IoHandlerAdapter() {
+                    public void sessionOpened( IoSession session ) throws Exception
+                    {
+                        session.write("B");
+                    }
+                    
+                    public void messageSent( IoSession session, Object message ) throws Exception
+                    {
+                        session.close();
+                    }
+                });
+
+        ConnectFuture future = connector.connect( new VmPipeAddress( 1 ) );
+
+        future.join();
+        future.getSession().getCloseFuture().join();
+        acceptor.unbind();
+        
+        // sessionClosed() might not be invoked yet
+        // even if the connection is closed.
+        while( actual.indexOf("C") < 0 )
+        {
+            Thread.yield();
+        }
+        
+        Assert.assertEquals( "ABC", actual.toString() );
+    }
+}

Propchange: mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date