You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/01/30 08:05:30 UTC
svn commit: r501329 - in /mina:
branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/
branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/
branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/
branches/1.1/c...
Author: trustin
Date: Mon Jan 29 23:05:29 2007
New Revision: 501329
URL: http://svn.apache.org/viewvc?view=rev&rev=501329
Log:
Resolving DIRMINA-306 (Invalid sequence of events with VmPipe Transport)
* Added a test case for the issue
* Changed VmPipeFilterChain to use event queue to order events
Added:
mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java (with props)
mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java (with props)
mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java (with props)
Modified:
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Mon Jan 29 23:05:29 2007
@@ -117,6 +117,7 @@
catch( Throwable t )
{
future.setException( t );
+ return future;
}
// initialize acceptor session
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Mon Jan 29 23:05:29 2007
@@ -20,46 +20,151 @@
package org.apache.mina.transport.vmpipe.support;
import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.AbstractIoFilterChain;
+import edu.emory.mathcs.backport.java.util.Queue;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentLinkedQueue;
+
/**
* @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev$, $Date$
*/
public class VmPipeFilterChain extends AbstractIoFilterChain {
+ private final Queue eventQueue = new ConcurrentLinkedQueue();
+
public VmPipeFilterChain( IoSession session )
{
super( session );
}
-
- public void fireMessageReceived( IoSession session, Object message )
+
+ private void pushEvent( Event e )
{
- VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
- synchronized( s.lock )
+ eventQueue.offer( e );
+ flushEvents();
+ }
+
+ private void flushEvents()
+ {
+ Event e;
+ while( ( e = ( Event ) eventQueue.poll() ) != null )
{
- if( !s.getTrafficMask().isReadable() )
+ fireEvent( e );
+ }
+ }
+
+ private void fireEvent( Event e )
+ {
+ IoSession session = getSession();
+ EventType type = e.getType();
+ Object data = e.getData();
+
+ if( type == EventType.RECEIVED )
+ {
+ VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
+ synchronized( s.lock )
{
- synchronized( s.pendingDataQueue )
+ if( !s.getTrafficMask().isReadable() )
{
- s.pendingDataQueue.push( message );
+ synchronized( s.pendingDataQueue )
+ {
+ s.pendingDataQueue.push( data );
+ }
}
- }
- else
- {
- int byteCount = 1;
- if( message instanceof ByteBuffer )
+ else
{
- byteCount = ( ( ByteBuffer ) message ).remaining();
+ int byteCount = 1;
+ if( data instanceof ByteBuffer )
+ {
+ byteCount = ( ( ByteBuffer ) data ).remaining();
+ }
+
+ s.increaseReadBytes( byteCount );
+
+ super.fireMessageReceived( s, data );
}
-
- s.increaseReadBytes( byteCount );
-
- super.fireMessageReceived( s, message );
}
}
+ else if( type == EventType.WRITE )
+ {
+ super.fireFilterWrite( session, ( WriteRequest ) data );
+ }
+ else if( type == EventType.SENT )
+ {
+ super.fireMessageSent( session, ( WriteRequest ) data );
+ }
+ else if( type == EventType.EXCEPTION )
+ {
+ super.fireExceptionCaught( session, ( Throwable ) data );
+ }
+ else if( type == EventType.IDLE )
+ {
+ super.fireSessionIdle( session, ( IdleStatus ) data );
+ }
+ else if( type == EventType.OPENED )
+ {
+ super.fireSessionOpened( session );
+ }
+ else if( type == EventType.CREATED )
+ {
+ super.fireSessionCreated( session );
+ }
+ else if( type == EventType.CLOSED )
+ {
+ super.fireSessionClosed( session );
+ }
+ else if( type == EventType.CLOSE )
+ {
+ super.fireFilterClose( session );
+ }
+ }
+
+ public void fireFilterClose( IoSession session )
+ {
+ pushEvent( new Event( EventType.CLOSE, null ) );
+ }
+
+ public void fireFilterWrite( IoSession session, WriteRequest writeRequest )
+ {
+ pushEvent( new Event( EventType.WRITE, writeRequest ) );
+ }
+
+ public void fireExceptionCaught( IoSession session, Throwable cause )
+ {
+ pushEvent( new Event( EventType.EXCEPTION, cause ) );
+ }
+
+ public void fireMessageSent( IoSession session, WriteRequest request )
+ {
+ pushEvent( new Event( EventType.SENT, request ) );
+ }
+
+ public void fireSessionClosed( IoSession session )
+ {
+ pushEvent( new Event( EventType.CLOSED, null ) );
+ }
+
+ public void fireSessionCreated( IoSession session )
+ {
+ pushEvent( new Event( EventType.CREATED, null ) );
+ }
+
+ public void fireSessionIdle( IoSession session, IdleStatus status )
+ {
+ pushEvent( new Event( EventType.IDLE, status ) );
+ }
+
+ public void fireSessionOpened( IoSession session )
+ {
+ pushEvent( new Event( EventType.OPENED, null ) );
+ }
+
+ public void fireMessageReceived( IoSession session, Object message )
+ {
+ pushEvent( new Event( EventType.RECEIVED, message ) );
}
protected void doWrite( IoSession session, WriteRequest writeRequest )
@@ -79,7 +184,6 @@
}
else
{
-
Object message = writeRequest.getMessage();
int byteCount = 1;
@@ -99,9 +203,9 @@
s.increaseWrittenBytes( byteCount );
s.increaseWrittenMessages();
- s.getFilterChain().fireMessageSent( s, writeRequest );
s.getRemoteSession().getFilterChain()
- .fireMessageReceived( s.getRemoteSession(), messageCopy );
+ .fireMessageReceived( s.getRemoteSession(), messageCopy );
+ s.getFilterChain().fireMessageSent( s, writeRequest );
}
}
else
@@ -124,4 +228,52 @@
}
}
+ // FIXME Copied and pasted from {@link ExecutorFilter}.
+ private static class EventType
+ {
+ public static final EventType CREATED = new EventType( "CREATED" );
+ public static final EventType OPENED = new EventType( "OPENED" );
+ public static final EventType CLOSED = new EventType( "CLOSED" );
+ public static final EventType RECEIVED = new EventType( "RECEIVED" );
+ public static final EventType SENT = new EventType( "SENT" );
+ public static final EventType IDLE = new EventType( "IDLE" );
+ public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
+
+ public static final EventType WRITE = new EventType( "WRITE" );
+ public static final EventType CLOSE = new EventType( "CLOSE" );
+
+ private final String value;
+
+ private EventType( String value )
+ {
+ this.value = value;
+ }
+
+ public String toString()
+ {
+ return value;
+ }
+ }
+
+ private static class Event
+ {
+ private final EventType type;
+ private final Object data;
+
+ public Event( EventType type, Object data )
+ {
+ this.type = type;
+ this.data = data;
+ }
+
+ public Object getData()
+ {
+ return data;
+ }
+
+ public EventType getType()
+ {
+ return type;
+ }
+ }
}
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Mon Jan 29 23:05:29 2007
@@ -184,11 +184,17 @@
{
if( data[ i ] instanceof WriteRequest )
{
+ // TODO Optimize inefficient data transfer.
+ // Data will be returned to pendingDataQueue
+ // if getTrafficMask().isWritable() is false.
WriteRequest wr = ( WriteRequest ) data[ i ];
filterChain.doWrite( this, wr );
}
else
{
+ // TODO Optimize inefficient data transfer.
+ // Data will be returned to pendingDataQueue
+ // if getTrafficMask().isReadable() is false.
filterChain.fireMessageReceived( this, data[ i ] );
}
}
Added: mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java?view=auto&rev=501329
==============================================================================
--- mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java (added)
+++ mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java Mon Jan 29 23:05:29 2007
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.vmpipe;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.ThreadModel;
+
+/**
+ * Makes sure that the order of events is correct.
+ *
+ * @author The Apache MINA Project Team (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class VmPipeEventOrderTest extends TestCase
+{
+ public void testServerToClient() throws Exception
+ {
+ IoAcceptor acceptor = new VmPipeAcceptor();
+ acceptor.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+ //acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+ IoConnector connector = new VmPipeConnector();
+ connector.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+ //connector.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+ acceptor.bind(
+ new VmPipeAddress( 1 ),
+ new IoHandlerAdapter() {
+ public void sessionOpened( IoSession session ) throws Exception
+ {
+ session.write("B");
+ }
+
+ public void messageSent( IoSession session, Object message ) throws Exception
+ {
+ session.close();
+ }
+ });
+
+ final StringBuffer actual = new StringBuffer();
+
+ ConnectFuture future = connector.connect(
+ new VmPipeAddress( 1 ),
+ new IoHandlerAdapter() {
+
+ public void messageReceived( IoSession session, Object message ) throws Exception
+ {
+ actual.append( message );
+ }
+
+ public void sessionClosed( IoSession session ) throws Exception
+ {
+ actual.append( "C" );
+ }
+
+ public void sessionOpened( IoSession session ) throws Exception {
+ actual.append( "A" );
+ }
+
+ });
+
+ future.join();
+ future.getSession().getCloseFuture().join();
+ acceptor.unbindAll();
+
+ // sessionClosed() might not be invoked yet
+ // even if the connection is closed.
+ while( actual.indexOf("C") < 0 )
+ {
+ Thread.yield();
+ }
+
+ Assert.assertEquals( "ABC", actual.toString() );
+ }
+
+ public void testClientToServer() throws Exception
+ {
+ IoAcceptor acceptor = new VmPipeAcceptor();
+ acceptor.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+ //acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+ IoConnector connector = new VmPipeConnector();
+ connector.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+ //connector.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+ final StringBuffer actual = new StringBuffer();
+
+ acceptor.bind(
+ new VmPipeAddress( 1 ),
+ new IoHandlerAdapter() {
+
+ public void messageReceived( IoSession session, Object message ) throws Exception
+ {
+ actual.append( message );
+ }
+
+ public void sessionClosed( IoSession session ) throws Exception
+ {
+ actual.append( "C" );
+ }
+
+ public void sessionOpened( IoSession session ) throws Exception {
+ actual.append( "A" );
+ }
+
+ });
+
+ ConnectFuture future = connector.connect(
+ new VmPipeAddress( 1 ),
+ new IoHandlerAdapter() {
+ public void sessionOpened( IoSession session ) throws Exception
+ {
+ session.write("B");
+ }
+
+ public void messageSent( IoSession session, Object message ) throws Exception
+ {
+ session.close();
+ }
+ });
+
+ future.join();
+ future.getSession().getCloseFuture().join();
+ acceptor.unbindAll();
+
+ // sessionClosed() might not be invoked yet
+ // even if the connection is closed.
+ while( actual.indexOf("C") < 0 )
+ {
+ Thread.yield();
+ }
+
+ Assert.assertEquals( "ABC", actual.toString() );
+ }
+}
Propchange: mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Mon Jan 29 23:05:29 2007
@@ -117,6 +117,7 @@
catch( Throwable t )
{
future.setException( t );
+ return future;
}
// initialize acceptor session
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Mon Jan 29 23:05:29 2007
@@ -6,20 +6,24 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.vmpipe.support;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.AbstractIoFilterChain;
@@ -30,34 +34,143 @@
*/
public class VmPipeFilterChain extends AbstractIoFilterChain {
+ private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
+
public VmPipeFilterChain( IoSession session )
{
super( session );
}
-
- @Override
- public void fireMessageReceived( IoSession session, Object message )
+
+ private void pushEvent( Event e )
{
- VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
- synchronized( s.lock )
+ eventQueue.offer( e );
+ flushEvents();
+ }
+
+ private void flushEvents()
+ {
+ Event e;
+ while( ( e = eventQueue.poll() ) != null )
{
- if( !s.getTrafficMask().isReadable() )
- {
- s.pendingDataQueue.add( message );
- }
- else
+ fireEvent( e );
+ }
+ }
+
+ private void fireEvent( Event e )
+ {
+ IoSession session = getSession();
+ EventType type = e.getType();
+ Object data = e.getData();
+
+ if( type == EventType.RECEIVED )
+ {
+ VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
+ synchronized( s.lock )
{
- int byteCount = 1;
- if( message instanceof ByteBuffer )
+ if( !s.getTrafficMask().isReadable() )
{
- byteCount = ( ( ByteBuffer ) message ).remaining();
+ s.pendingDataQueue.offer( data );
+ }
+ else
+ {
+ int byteCount = 1;
+ if( data instanceof ByteBuffer )
+ {
+ byteCount = ( ( ByteBuffer ) data ).remaining();
+ }
+
+ s.increaseReadBytes( byteCount );
+
+ super.fireMessageReceived( s, data );
}
-
- s.increaseReadBytes( byteCount );
-
- super.fireMessageReceived( s, message );
}
}
+ else if( type == EventType.WRITE )
+ {
+ super.fireFilterWrite( session, ( WriteRequest ) data );
+ }
+ else if( type == EventType.SENT )
+ {
+ super.fireMessageSent( session, ( WriteRequest ) data );
+ }
+ else if( type == EventType.EXCEPTION )
+ {
+ super.fireExceptionCaught( session, ( Throwable ) data );
+ }
+ else if( type == EventType.IDLE )
+ {
+ super.fireSessionIdle( session, ( IdleStatus ) data );
+ }
+ else if( type == EventType.OPENED )
+ {
+ super.fireSessionOpened( session );
+ }
+ else if( type == EventType.CREATED )
+ {
+ super.fireSessionCreated( session );
+ }
+ else if( type == EventType.CLOSED )
+ {
+ super.fireSessionClosed( session );
+ }
+ else if( type == EventType.CLOSE )
+ {
+ super.fireFilterClose( session );
+ }
+ }
+
+ @Override
+ public void fireFilterClose( IoSession session )
+ {
+ pushEvent( new Event( EventType.CLOSE, null ) );
+ }
+
+ @Override
+ public void fireFilterWrite( IoSession session, WriteRequest writeRequest )
+ {
+ pushEvent( new Event( EventType.WRITE, writeRequest ) );
+ }
+
+ @Override
+ public void fireExceptionCaught( IoSession session, Throwable cause )
+ {
+ pushEvent( new Event( EventType.EXCEPTION, cause ) );
+ }
+
+ @Override
+ public void fireMessageSent( IoSession session, WriteRequest request )
+ {
+ pushEvent( new Event( EventType.SENT, request ) );
+ }
+
+ @Override
+ public void fireSessionClosed( IoSession session )
+ {
+ pushEvent( new Event( EventType.CLOSED, null ) );
+ }
+
+ @Override
+ public void fireSessionCreated( IoSession session )
+ {
+ pushEvent( new Event( EventType.CREATED, null ) );
+ }
+
+ @Override
+ public void fireSessionIdle( IoSession session, IdleStatus status )
+ {
+ pushEvent( new Event( EventType.IDLE, status ) );
+ }
+
+ @Override
+ public void fireSessionOpened( IoSession session )
+ {
+ pushEvent( new Event( EventType.OPENED, null ) );
+ }
+
+ @Override
+ public void fireMessageReceived( IoSession session, Object message )
+ {
+ pushEvent( new Event( EventType.RECEIVED, message ) );
}
@Override
@@ -68,15 +181,15 @@
{
if( s.isConnected() )
{
+
if( !s.getTrafficMask().isWritable() )
{
- s.pendingDataQueue.add( writeRequest );
+ s.pendingDataQueue.offer( writeRequest );
}
else
{
-
Object message = writeRequest.getMessage();
-
+
int byteCount = 1;
Object messageCopy = message;
if( message instanceof ByteBuffer )
@@ -90,16 +203,16 @@
rb.reset();
messageCopy = wb;
}
-
+
s.increaseWrittenBytes( byteCount );
s.increaseWrittenMessages();
-
- s.getFilterChain().fireMessageSent( s, writeRequest );
+
s.getRemoteSession().getFilterChain()
- .fireMessageReceived( s.getRemoteSession(), messageCopy );
+ .fireMessageReceived( s.getRemoteSession(), messageCopy );
+ s.getFilterChain().fireMessageSent( s, writeRequest );
}
}
- else
+ else
{
writeRequest.getFuture().setWritten( false );
}
@@ -119,5 +232,53 @@
}
}
}
+
+ // FIXME Copied and pasted from {@link ExecutorFilter}.
+ private static class EventType
+ {
+ public static final EventType CREATED = new EventType( "CREATED" );
+ public static final EventType OPENED = new EventType( "OPENED" );
+ public static final EventType CLOSED = new EventType( "CLOSED" );
+ public static final EventType RECEIVED = new EventType( "RECEIVED" );
+ public static final EventType SENT = new EventType( "SENT" );
+ public static final EventType IDLE = new EventType( "IDLE" );
+ public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
+
+ public static final EventType WRITE = new EventType( "WRITE" );
+ public static final EventType CLOSE = new EventType( "CLOSE" );
+
+ private final String value;
+ private EventType( String value )
+ {
+ this.value = value;
+ }
+
+ public String toString()
+ {
+ return value;
+ }
+ }
+
+ private static class Event
+ {
+ private final EventType type;
+ private final Object data;
+
+ public Event( EventType type, Object data )
+ {
+ this.type = type;
+ this.data = data;
+ }
+
+ public Object getData()
+ {
+ return data;
+ }
+
+ public EventType getType()
+ {
+ return type;
+ }
+ }
}
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Mon Jan 29 23:05:29 2007
@@ -189,11 +189,17 @@
{
if( aData instanceof WriteRequest )
{
+ // TODO Optimize inefficient data transfer.
+ // Data will be returned to pendingDataQueue
+ // if getTrafficMask().isWritable() is false.
WriteRequest wr = ( WriteRequest ) aData;
filterChain.doWrite( this, wr );
}
else
{
+ // TODO Optimize inefficient data transfer.
+ // Data will be returned to pendingDataQueue
+ // if getTrafficMask().isReadable() is false.
filterChain.fireMessageReceived( this, aData );
}
}
Added: mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java?view=auto&rev=501329
==============================================================================
--- mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java (added)
+++ mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java Mon Jan 29 23:05:29 2007
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.vmpipe;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.ThreadModel;
+
+/**
+ * Makes sure that the order of events is correct.
+ *
+ * @author The Apache MINA Project Team (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class VmPipeEventOrderTest extends TestCase
+{
+ public void testServerToClient() throws Exception
+ {
+ IoAcceptor acceptor = new VmPipeAcceptor();
+ acceptor.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+ //acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+ IoConnector connector = new VmPipeConnector();
+ connector.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+ //connector.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+ acceptor.bind(
+ new VmPipeAddress( 1 ),
+ new IoHandlerAdapter() {
+ public void sessionOpened( IoSession session ) throws Exception
+ {
+ session.write("B");
+ }
+
+ public void messageSent( IoSession session, Object message ) throws Exception
+ {
+ session.close();
+ }
+ });
+
+ final StringBuffer actual = new StringBuffer();
+
+ ConnectFuture future = connector.connect(
+ new VmPipeAddress( 1 ),
+ new IoHandlerAdapter() {
+
+ public void messageReceived( IoSession session, Object message ) throws Exception
+ {
+ actual.append( message );
+ }
+
+ public void sessionClosed( IoSession session ) throws Exception
+ {
+ actual.append( "C" );
+ }
+
+ public void sessionOpened( IoSession session ) throws Exception {
+ actual.append( "A" );
+ }
+
+ });
+
+ future.join();
+ future.getSession().getCloseFuture().join();
+ acceptor.unbindAll();
+
+ // sessionClosed() might not be invoked yet
+ // even if the connection is closed.
+ while( actual.indexOf("C") < 0 )
+ {
+ Thread.yield();
+ }
+
+ Assert.assertEquals( "ABC", actual.toString() );
+ }
+
+ public void testClientToServer() throws Exception
+ {
+ IoAcceptor acceptor = new VmPipeAcceptor();
+ acceptor.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+ //acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+ IoConnector connector = new VmPipeConnector();
+ connector.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+ //connector.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+ final StringBuffer actual = new StringBuffer();
+
+ acceptor.bind(
+ new VmPipeAddress( 1 ),
+ new IoHandlerAdapter() {
+
+ public void messageReceived( IoSession session, Object message ) throws Exception
+ {
+ actual.append( message );
+ }
+
+ public void sessionClosed( IoSession session ) throws Exception
+ {
+ actual.append( "C" );
+ }
+
+ public void sessionOpened( IoSession session ) throws Exception {
+ actual.append( "A" );
+ }
+
+ });
+
+ ConnectFuture future = connector.connect(
+ new VmPipeAddress( 1 ),
+ new IoHandlerAdapter() {
+ public void sessionOpened( IoSession session ) throws Exception
+ {
+ session.write("B");
+ }
+
+ public void messageSent( IoSession session, Object message ) throws Exception
+ {
+ session.close();
+ }
+ });
+
+ future.join();
+ future.getSession().getCloseFuture().join();
+ acceptor.unbindAll();
+
+ // sessionClosed() might not be invoked yet
+ // even if the connection is closed.
+ while( actual.indexOf("C") < 0 )
+ {
+ Thread.yield();
+ }
+
+ Assert.assertEquals( "ABC", actual.toString() );
+ }
+}
Propchange: mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Mon Jan 29 23:05:29 2007
@@ -90,6 +90,7 @@
catch( Throwable t )
{
future.setException( t );
+ return future;
}
// initialize acceptor session
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Mon Jan 29 23:05:29 2007
@@ -19,49 +19,161 @@
*/
package org.apache.mina.transport.vmpipe.support;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.AbstractIoFilterChain;
/**
- * @author The Apache MINA Project (dev@mina.apache.org)
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev$, $Date$
*/
public class VmPipeFilterChain extends AbstractIoFilterChain {
+ private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
+
public VmPipeFilterChain( IoSession session )
{
super( session );
}
-
- public void fireMessageReceived( IoSession session, Object message )
+
+ private void pushEvent( Event e )
{
- VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
- synchronized( s.lock )
+ eventQueue.offer( e );
+ flushEvents();
+ }
+
+ private void flushEvents()
+ {
+ Event e;
+ while( ( e = eventQueue.poll() ) != null )
+ {
+ fireEvent( e );
+ }
+ }
+
+ private void fireEvent( Event e )
+ {
+ IoSession session = getSession();
+ EventType type = e.getType();
+ Object data = e.getData();
+
+ if( type == EventType.RECEIVED )
{
- if( !s.getTrafficMask().isReadable() )
+ VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
+ synchronized( s.lock )
{
- synchronized( s.pendingDataQueue )
+ if( !s.getTrafficMask().isReadable() )
{
- s.pendingDataQueue.offer( message );
+ s.pendingDataQueue.offer( data );
}
- }
- else
- {
- int byteCount = 1;
- if( message instanceof ByteBuffer )
+ else
{
- byteCount = ( ( ByteBuffer ) message ).remaining();
+ int byteCount = 1;
+ if( data instanceof ByteBuffer )
+ {
+ byteCount = ( ( ByteBuffer ) data ).remaining();
+ }
+
+ s.increaseReadBytes( byteCount );
+
+ super.fireMessageReceived( s, data );
}
-
- s.increaseReadBytes( byteCount );
-
- super.fireMessageReceived( s, message );
}
}
+ else if( type == EventType.WRITE )
+ {
+ super.fireFilterWrite( session, ( WriteRequest ) data );
+ }
+ else if( type == EventType.SENT )
+ {
+ super.fireMessageSent( session, ( WriteRequest ) data );
+ }
+ else if( type == EventType.EXCEPTION )
+ {
+ super.fireExceptionCaught( session, ( Throwable ) data );
+ }
+ else if( type == EventType.IDLE )
+ {
+ super.fireSessionIdle( session, ( IdleStatus ) data );
+ }
+ else if( type == EventType.OPENED )
+ {
+ super.fireSessionOpened( session );
+ }
+ else if( type == EventType.CREATED )
+ {
+ super.fireSessionCreated( session );
+ }
+ else if( type == EventType.CLOSED )
+ {
+ super.fireSessionClosed( session );
+ }
+ else if( type == EventType.CLOSE )
+ {
+ super.fireFilterClose( session );
+ }
+ }
+
+ @Override
+ public void fireFilterClose( IoSession session )
+ {
+ pushEvent( new Event( EventType.CLOSE, null ) );
+ }
+
+ @Override
+ public void fireFilterWrite( IoSession session, WriteRequest writeRequest )
+ {
+ pushEvent( new Event( EventType.WRITE, writeRequest ) );
}
+ @Override
+ public void fireExceptionCaught( IoSession session, Throwable cause )
+ {
+ pushEvent( new Event( EventType.EXCEPTION, cause ) );
+ }
+
+ @Override
+ public void fireMessageSent( IoSession session, WriteRequest request )
+ {
+ pushEvent( new Event( EventType.SENT, request ) );
+ }
+
+ @Override
+ public void fireSessionClosed( IoSession session )
+ {
+ pushEvent( new Event( EventType.CLOSED, null ) );
+ }
+
+ @Override
+ public void fireSessionCreated( IoSession session )
+ {
+ pushEvent( new Event( EventType.CREATED, null ) );
+ }
+
+ @Override
+ public void fireSessionIdle( IoSession session, IdleStatus status )
+ {
+ pushEvent( new Event( EventType.IDLE, status ) );
+ }
+
+ @Override
+ public void fireSessionOpened( IoSession session )
+ {
+ pushEvent( new Event( EventType.OPENED, null ) );
+ }
+
+ @Override
+ public void fireMessageReceived( IoSession session, Object message )
+ {
+ pushEvent( new Event( EventType.RECEIVED, message ) );
+ }
+
+ @Override
protected void doWrite( IoSession session, WriteRequest writeRequest )
{
VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
@@ -72,14 +184,10 @@
if( !s.getTrafficMask().isWritable() )
{
- synchronized( s.pendingDataQueue )
- {
- s.pendingDataQueue.offer( writeRequest );
- }
+ s.pendingDataQueue.offer( writeRequest );
}
else
{
-
Object message = writeRequest.getMessage();
int byteCount = 1;
@@ -99,9 +207,9 @@
s.increaseWrittenBytes( byteCount );
s.increaseWrittenMessages();
- s.getFilterChain().fireMessageSent( s, writeRequest );
s.getRemoteSession().getFilterChain()
- .fireMessageReceived( s.getRemoteSession(), messageCopy );
+ .fireMessageReceived( s.getRemoteSession(), messageCopy );
+ s.getFilterChain().fireMessageSent( s, writeRequest );
}
}
else
@@ -111,6 +219,7 @@
}
}
+ @Override
protected void doClose( IoSession session )
{
VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
@@ -124,4 +233,52 @@
}
}
+ // FIXME Copied and pasted from {@link ExecutorFilter}.
+ private static class EventType
+ {
+ public static final EventType CREATED = new EventType( "CREATED" );
+ public static final EventType OPENED = new EventType( "OPENED" );
+ public static final EventType CLOSED = new EventType( "CLOSED" );
+ public static final EventType RECEIVED = new EventType( "RECEIVED" );
+ public static final EventType SENT = new EventType( "SENT" );
+ public static final EventType IDLE = new EventType( "IDLE" );
+ public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
+
+ public static final EventType WRITE = new EventType( "WRITE" );
+ public static final EventType CLOSE = new EventType( "CLOSE" );
+
+ private final String value;
+
+ private EventType( String value )
+ {
+ this.value = value;
+ }
+
+ public String toString()
+ {
+ return value;
+ }
+ }
+
+ private static class Event
+ {
+ private final EventType type;
+ private final Object data;
+
+ public Event( EventType type, Object data )
+ {
+ this.type = type;
+ this.data = data;
+ }
+
+ public Object getData()
+ {
+ return data;
+ }
+
+ public EventType getType()
+ {
+ return type;
+ }
+ }
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?view=diff&rev=501329&r1=501328&r2=501329
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Mon Jan 29 23:05:29 2007
@@ -6,22 +6,24 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.vmpipe.support;
import java.net.SocketAddress;
-import java.util.LinkedList;
-import java.util.Queue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
@@ -36,14 +38,16 @@
/**
* A {@link IoSession} for in-VM transport (VM_PIPE).
- *
- * @author The Apache MINA Project (dev@mina.apache.org)
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev$, $Date$
*/
public class VmPipeSessionImpl extends BaseIoSession
{
- private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {};
-
+ private static final IoSessionConfig CONFIG = new BaseIoSessionConfig()
+ {
+ };
+
private final IoService service;
private final IoServiceListenerSupport serviceListeners;
private final SocketAddress localAddress;
@@ -53,7 +57,7 @@
private final VmPipeFilterChain filterChain;
private final VmPipeSessionImpl remoteSession;
final Object lock;
- final Queue<Object> pendingDataQueue;
+ final BlockingQueue<Object> pendingDataQueue;
/**
* Constructor for client-side session.
@@ -70,7 +74,7 @@
this.remoteAddress = this.serviceAddress = remoteEntry.getAddress();
this.handler = handler;
this.filterChain = new VmPipeFilterChain( this );
- this.pendingDataQueue = new LinkedList<Object>();
+ this.pendingDataQueue = new LinkedBlockingQueue<Object>();
remoteSession = new VmPipeSessionImpl( this, remoteEntry );
}
@@ -88,19 +92,19 @@
this.handler = entry.getHandler();
this.filterChain = new VmPipeFilterChain( this );
this.remoteSession = remoteSession;
- this.pendingDataQueue = new LinkedList<Object>();
+ this.pendingDataQueue = new LinkedBlockingQueue<Object>();
}
-
+
public IoService getService()
{
return service;
}
-
+
IoServiceListenerSupport getServiceListeners()
{
return serviceListeners;
}
-
+
public IoSessionConfig getConfig()
{
return CONFIG;
@@ -110,7 +114,7 @@
{
return filterChain;
}
-
+
public VmPipeSessionImpl getRemoteSession()
{
return remoteSession;
@@ -121,11 +125,13 @@
return handler;
}
+ @Override
protected void close0()
{
filterChain.fireFilterClose( this );
}
-
+
+ @Override
protected void write0( WriteRequest writeRequest )
{
this.filterChain.fireFilterWrite( this, writeRequest );
@@ -140,7 +146,7 @@
{
return 0;
}
-
+
public TransportType getTransportType()
{
return TransportType.VM_PIPE;
@@ -155,33 +161,37 @@
{
return localAddress;
}
-
+
public SocketAddress getServiceAddress()
{
return serviceAddress;
}
+ @Override
protected void updateTrafficMask()
{
- if( getTrafficMask().isReadable() || getTrafficMask().isWritable())
+ if( getTrafficMask().isReadable() || getTrafficMask().isWritable() )
{
- Object[] data;
- synchronized( pendingDataQueue )
- {
- data = pendingDataQueue.toArray();
- pendingDataQueue.clear();
- }
-
- for( int i = 0; i < data.length; i++ )
+ List<Object> data = new ArrayList<Object>();
+
+ pendingDataQueue.drainTo( data );
+
+ for( Object aData : data )
{
- if( data[ i ] instanceof WriteRequest )
+ if( aData instanceof WriteRequest )
{
- WriteRequest wr = ( WriteRequest ) data[ i ];
+ // TODO Optimize inefficient data transfer.
+ // Data will be returned to pendingDataQueue
+ // if getTrafficMask().isWritable() is false.
+ WriteRequest wr = ( WriteRequest ) aData;
filterChain.doWrite( this, wr );
}
else
{
- filterChain.fireMessageReceived( this, data[ i ] );
+ // TODO Optimize inefficient data transfer.
+ // Data will be returned to pendingDataQueue
+ // if getTrafficMask().isReadable() is false.
+ filterChain.fireMessageReceived( this, aData );
}
}
}
Added: mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java?view=auto&rev=501329
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java (added)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java Mon Jan 29 23:05:29 2007
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.vmpipe;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.LoggingFilter;
+
+/**
+ * Makes sure that the order of events is correct.
+ *
+ * @author The Apache MINA Project Team (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class VmPipeEventOrderTest extends TestCase
+{
+ public void testServerToClient() throws Exception
+ {
+ IoAcceptor acceptor = new VmPipeAcceptor();
+ //acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+ IoConnector connector = new VmPipeConnector();
+ //connector.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+ acceptor.setLocalAddress( new VmPipeAddress( 1 ) );
+ acceptor.setHandler(
+ new IoHandlerAdapter() {
+ public void sessionOpened( IoSession session ) throws Exception
+ {
+ session.write("B");
+ }
+
+ public void messageSent( IoSession session, Object message ) throws Exception
+ {
+ session.close();
+ }
+ });
+
+ acceptor.bind();
+
+ final StringBuffer actual = new StringBuffer();
+
+ connector.setHandler(
+ new IoHandlerAdapter() {
+
+ public void messageReceived( IoSession session, Object message ) throws Exception
+ {
+ actual.append( message );
+ }
+
+ public void sessionClosed( IoSession session ) throws Exception
+ {
+ actual.append( "C" );
+ }
+
+ public void sessionOpened( IoSession session ) throws Exception {
+ actual.append( "A" );
+ }
+
+ });
+
+ ConnectFuture future = connector.connect( new VmPipeAddress( 1 ) );
+
+ future.join();
+ future.getSession().getCloseFuture().join();
+ acceptor.unbind();
+
+ // sessionClosed() might not be invoked yet
+ // even if the connection is closed.
+ while( actual.indexOf("C") < 0 )
+ {
+ Thread.yield();
+ }
+
+ Assert.assertEquals( "ABC", actual.toString() );
+ }
+
+ public void testClientToServer() throws Exception
+ {
+ IoAcceptor acceptor = new VmPipeAcceptor();
+ acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+ IoConnector connector = new VmPipeConnector();
+ connector.getFilterChain().addLast( "logger", new LoggingFilter() );
+
+ final StringBuffer actual = new StringBuffer();
+
+ acceptor.setLocalAddress( new VmPipeAddress( 1 ) );
+ acceptor.setHandler(
+ new IoHandlerAdapter() {
+
+ public void messageReceived( IoSession session, Object message ) throws Exception
+ {
+ actual.append( message );
+ }
+
+ public void sessionClosed( IoSession session ) throws Exception
+ {
+ actual.append( "C" );
+ }
+
+ public void sessionOpened( IoSession session ) throws Exception {
+ actual.append( "A" );
+ }
+
+ });
+
+ acceptor.bind();
+
+ connector.setHandler(
+ new IoHandlerAdapter() {
+ public void sessionOpened( IoSession session ) throws Exception
+ {
+ session.write("B");
+ }
+
+ public void messageSent( IoSession session, Object message ) throws Exception
+ {
+ session.close();
+ }
+ });
+
+ ConnectFuture future = connector.connect( new VmPipeAddress( 1 ) );
+
+ future.join();
+ future.getSession().getCloseFuture().join();
+ acceptor.unbind();
+
+ // sessionClosed() might not be invoked yet
+ // even if the connection is closed.
+ while( actual.indexOf("C") < 0 )
+ {
+ Thread.yield();
+ }
+
+ Assert.assertEquals( "ABC", actual.toString() );
+ }
+}
Propchange: mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date