You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by pr...@apache.org on 2006/11/05 20:36:43 UTC
svn commit: r471502 [2/2] - in /directory/branches/mina/1.2:
core/src/main/java/org/apache/mina/common/
core/src/main/java/org/apache/mina/common/support/
core/src/main/java/org/apache/mina/filter/
core/src/main/java/org/apache/mina/filter/codec/ core/...
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Sun Nov 5 11:36:41 2006
@@ -26,17 +26,21 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
+import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionRecycler;
-import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.AbstractIoFilterChain;
import org.apache.mina.common.support.BaseIoConnector;
import org.apache.mina.common.support.DefaultConnectFuture;
@@ -44,9 +48,6 @@
import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
import org.apache.mina.util.NamePreservingRunnable;
-import org.apache.mina.util.Queue;
-
-import java.util.concurrent.Executor;
/**
* {@link IoConnector} for datagram transport (UDP/IP).
@@ -56,17 +57,18 @@
*/
public class DatagramConnectorDelegate extends BaseIoConnector implements DatagramService
{
- private static volatile int nextId = 0;
+ private static final AtomicInteger nextId = new AtomicInteger( );
+ private final Object lock = new Object();
private final IoConnector wrapper;
private final Executor executor;
- private final int id = nextId ++ ;
+ private final int id = nextId.getAndIncrement();
private Selector selector;
private DatagramConnectorConfig defaultConfig = new DatagramConnectorConfig();
- private final Queue registerQueue = new Queue();
- private final Queue cancelQueue = new Queue();
- private final Queue flushingSessions = new Queue();
- private final Queue trafficControllingSessions = new Queue();
+ private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+ private final Queue<DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+ private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+ private final Queue<DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
private Worker worker;
/**
@@ -93,12 +95,12 @@
if( !( address instanceof InetSocketAddress ) )
throw new IllegalArgumentException( "Unexpected address type: "
- + address.getClass() );
+ + address.getClass() );
if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
{
throw new IllegalArgumentException( "Unexpected local address type: "
- + localAddress.getClass() );
+ + localAddress.getClass() );
}
if( config == null )
@@ -160,33 +162,27 @@
}
RegistrationRequest request = new RegistrationRequest( ch, handler, config );
- synchronized( this )
+ try
+ {
+ startupWorker();
+ }
+ catch( IOException e )
{
try
{
- startupWorker();
+ ch.disconnect();
+ ch.close();
}
- catch( IOException e )
+ catch( IOException e2 )
{
- try
- {
- ch.disconnect();
- ch.close();
- }
- catch( IOException e2 )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e2 );
- }
-
- return DefaultConnectFuture.newFailedFuture( e );
+ ExceptionMonitor.getInstance().exceptionCaught( e2 );
}
- synchronized( registerQueue )
- {
- registerQueue.push( request );
- }
+ return DefaultConnectFuture.newFailedFuture( e );
}
+ registerQueue.add( request );
+
selector.wakeup();
return request;
}
@@ -211,40 +207,37 @@
this.defaultConfig = defaultConfig;
}
- private synchronized void startupWorker() throws IOException
+ private void startupWorker() throws IOException
{
- if( worker == null )
+ synchronized( lock )
{
- selector = Selector.open();
- worker = new Worker();
- executor.execute( new NamePreservingRunnable( worker ) );
+ if( worker == null )
+ {
+ selector = Selector.open();
+ worker = new Worker();
+ executor.execute( new NamePreservingRunnable( worker ) );
+ }
}
}
public void closeSession( DatagramSessionImpl session )
{
- synchronized( this )
+ try
{
- try
- {
- startupWorker();
- }
- catch( IOException e )
- {
- // IOException is thrown only when Worker thread is not
- // running and failed to open a selector. We simply return
- // silently here because it we can simply conclude that
- // this session is not managed by this connector or
- // already closed.
- return;
- }
-
- synchronized( cancelQueue )
- {
- cancelQueue.push( session );
- }
+ startupWorker();
+ }
+ catch( IOException e )
+ {
+ // IOException is thrown only when Worker thread is not
+ // running and failed to open a selector. We simply return
+ // silently here because we can simply conclude that
+ // this session is not managed by this connector or
+ // already closed.
+ return;
}
+ cancelQueue.add( session );
+
selector.wakeup();
}
@@ -260,10 +253,7 @@
private void scheduleFlush( DatagramSessionImpl session )
{
- synchronized( flushingSessions )
- {
- flushingSessions.push( session );
- }
+ flushingSessions.add( session );
}
public void updateTrafficMask( DatagramSessionImpl session )
@@ -274,15 +264,11 @@
{
selector.wakeup();
}
- selector.wakeup();
}
private void scheduleTrafficControl( DatagramSessionImpl session )
{
- synchronized( trafficControllingSessions )
- {
- trafficControllingSessions.push( session );
- }
+ trafficControllingSessions.add( session );
}
private void doUpdateTrafficMask()
@@ -290,14 +276,9 @@
if( trafficControllingSessions.isEmpty() )
return;
- for( ;; )
+ for( ; ; )
{
- DatagramSessionImpl session;
-
- synchronized( trafficControllingSessions )
- {
- session = ( DatagramSessionImpl ) trafficControllingSessions.pop();
- }
+ DatagramSessionImpl session = trafficControllingSessions.poll();
if( session == null )
break;
@@ -320,13 +301,9 @@
// The normal is OP_READ and, if there are write requests in the
// session's write queue, set OP_WRITE to trigger flushing.
int ops = SelectionKey.OP_READ;
- Queue writeRequestQueue = session.getWriteRequestQueue();
- synchronized( writeRequestQueue )
+ if( !session.getWriteRequestQueue().isEmpty() )
{
- if( !writeRequestQueue.isEmpty() )
- {
- ops |= SelectionKey.OP_WRITE;
- }
+ ops |= SelectionKey.OP_WRITE;
}
// Now mask the preferred ops with the mask of the current session
@@ -341,7 +318,7 @@
{
Thread.currentThread().setName( "DatagramConnector-" + id );
- for( ;; )
+ for( ; ; )
{
try
{
@@ -360,7 +337,7 @@
if( selector.keys().isEmpty() )
{
- synchronized( DatagramConnectorDelegate.this )
+ synchronized( lock )
{
if( selector.keys().isEmpty() &&
registerQueue.isEmpty() &&
@@ -386,7 +363,7 @@
}
catch( IOException e )
{
- ExceptionMonitor.getInstance().exceptionCaught( e );
+ ExceptionMonitor.getInstance().exceptionCaught( e );
try
{
@@ -394,25 +371,26 @@
}
catch( InterruptedException e1 )
{
+ ExceptionMonitor.getInstance().exceptionCaught( e1 );
}
}
}
}
}
- private void processReadySessions( Set keys )
+ private void processReadySessions( Set<SelectionKey> keys )
{
- Iterator it = keys.iterator();
+ Iterator<SelectionKey> it = keys.iterator();
while( it.hasNext() )
{
- SelectionKey key = ( SelectionKey ) it.next();
+ SelectionKey key = it.next();
it.remove();
DatagramSessionImpl session = ( DatagramSessionImpl ) key.attachment();
- DatagramSessionImpl replaceSession = getRecycledSession(session);
+ DatagramSessionImpl replaceSession = getRecycledSession( session );
- if(replaceSession != null)
+ if( replaceSession != null )
{
session = replaceSession;
}
@@ -432,16 +410,16 @@
private DatagramSessionImpl getRecycledSession( IoSession session )
{
IoSessionRecycler sessionRecycler = getSessionRecycler( session );
- DatagramSessionImpl replaceSession = null;
- if ( sessionRecycler != null )
+ if( sessionRecycler != null )
{
- synchronized ( sessionRecycler )
+ synchronized( sessionRecycler )
{
- replaceSession = ( DatagramSessionImpl ) sessionRecycler.recycle( session.getLocalAddress(), session
- .getRemoteAddress() );
+ DatagramSessionImpl replaceSession =
+ ( DatagramSessionImpl ) sessionRecycler.recycle( session.getLocalAddress(),
+ session.getRemoteAddress() );
- if ( replaceSession != null )
+ if( replaceSession != null )
{
return replaceSession;
}
@@ -501,14 +479,9 @@
if( flushingSessions.size() == 0 )
return;
- for( ;; )
+ for( ; ; )
{
- DatagramSessionImpl session;
-
- synchronized( flushingSessions )
- {
- session = ( DatagramSessionImpl ) flushingSessions.pop();
- }
+ DatagramSessionImpl session = flushingSessions.poll();
if( session == null )
break;
@@ -528,15 +501,11 @@
{
DatagramChannel ch = session.getChannel();
- Queue writeRequestQueue = session.getWriteRequestQueue();
+ Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
- WriteRequest req;
- for( ;; )
+ for( ; ; )
{
- synchronized( writeRequestQueue )
- {
- req = ( WriteRequest ) writeRequestQueue.first();
- }
+ WriteRequest req = writeRequestQueue.peek();
if( req == null )
break;
@@ -545,10 +514,7 @@
if( buf.remaining() == 0 )
{
// pop and fire event
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.pop();
- }
+ writeRequestQueue.poll();
session.increaseWrittenMessages();
buf.reset();
@@ -577,13 +543,10 @@
else if( writtenBytes > 0 )
{
key.interestOps( key.interestOps()
- & ( ~SelectionKey.OP_WRITE ) );
+ & ( ~SelectionKey.OP_WRITE ) );
// pop and fire event
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.pop();
- }
+ writeRequestQueue.poll();
session.increaseWrittenBytes( writtenBytes );
session.increaseWrittenMessages();
@@ -598,22 +561,18 @@
if( registerQueue.isEmpty() )
return;
- for( ;; )
+ for( ; ; )
{
- RegistrationRequest req;
- synchronized( registerQueue )
- {
- req = ( RegistrationRequest ) registerQueue.pop();
- }
+ RegistrationRequest req = registerQueue.poll();
if( req == null )
break;
DatagramSessionImpl session = new DatagramSessionImpl(
- wrapper, this,
- req.config,
- req.channel, req.handler,
- req.channel.socket().getRemoteSocketAddress() );
+ wrapper, this,
+ req.config,
+ req.channel, req.handler,
+ req.channel.socket().getRemoteSocketAddress() );
// AbstractIoFilterChain will notify the connect future.
session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, req );
@@ -623,14 +582,14 @@
{
DatagramSessionImpl replaceSession = getRecycledSession( session );
- if ( replaceSession != null )
+ if( replaceSession != null )
{
session = replaceSession;
}
else
{
SelectionKey key = req.channel.register( selector,
- SelectionKey.OP_READ, session );
+ SelectionKey.OP_READ, session );
session.setSelectionKey( key );
buildFilterChain( req, session );
@@ -653,7 +612,7 @@
req.channel.disconnect();
req.channel.close();
}
- catch (IOException e)
+ catch( IOException e )
{
ExceptionMonitor.getInstance().exceptionCaught( e );
}
@@ -674,13 +633,9 @@
if( cancelQueue.isEmpty() )
return;
- for( ;; )
+ for( ; ; )
{
- DatagramSessionImpl session;
- synchronized( cancelQueue )
- {
- session = ( DatagramSessionImpl ) cancelQueue.pop();
- }
+ DatagramSessionImpl session = cancelQueue.poll();
if( session == null )
break;
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java Sun Nov 5 11:36:41 2006
@@ -6,49 +6,52 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.socket.nio.support;
+import java.util.Queue;
+
import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.AbstractIoFilterChain;
-import org.apache.mina.util.Queue;
/**
* An {@link IoFilterChain} for datagram transport (UDP/IP).
- *
+ *
* @author The Apache Directory Project (mina-dev@directory.apache.org)
*/
-class DatagramFilterChain extends AbstractIoFilterChain {
+class DatagramFilterChain extends AbstractIoFilterChain
+{
DatagramFilterChain( IoSession parent )
{
super( parent );
}
-
+
+ @Override
protected void doWrite( IoSession session, WriteRequest writeRequest )
{
DatagramSessionImpl s = ( DatagramSessionImpl ) session;
- Queue writeRequestQueue = s.getWriteRequestQueue();
-
+ Queue<WriteRequest> writeRequestQueue = s.getWriteRequestQueue();
+
// SocketIoProcessor.doFlush() will reset it after write is finished
- // because the buffer will be passed with messageSent event.
+ // because the buffer will be passed with messageSent event.
( ( ByteBuffer ) writeRequest.getMessage() ).mark();
synchronized( writeRequestQueue )
{
- writeRequestQueue.push( writeRequest );
+ writeRequestQueue.add( writeRequest );
if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
{
// Notify DatagramService only when writeRequestQueue was empty.
@@ -57,13 +60,14 @@
}
}
+ @Override
protected void doClose( IoSession session )
{
DatagramSessionImpl s = ( DatagramSessionImpl ) session;
DatagramService manager = s.getManagerDelegate();
if( manager instanceof DatagramConnectorDelegate )
{
- ( ( DatagramConnectorDelegate ) manager ).closeSession( s );
+ manager.closeSession( s );
}
else
{
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java Sun Nov 5 11:36:41 2006
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.socket.nio.support;
@@ -23,8 +23,12 @@
import java.net.SocketException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.mina.common.BroadcastIoSession;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
@@ -34,15 +38,13 @@
import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.common.TransportType;
import org.apache.mina.common.WriteFuture;
-import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.BaseIoSession;
import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
-import org.apache.mina.util.Queue;
/**
* An {@link IoSession} for datagram transport (UDP/IP).
- *
+ *
* @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev$, $Date$
*/
@@ -54,7 +56,7 @@
private final DatagramService managerDelegate;
private final DatagramFilterChain filterChain;
private final DatagramChannel ch;
- private final Queue writeRequestQueue;
+ private final Queue<WriteRequest> writeRequestQueue;
private final IoHandler handler;
private final SocketAddress localAddress;
private final SocketAddress serviceAddress;
@@ -75,7 +77,7 @@
this.managerDelegate = managerDelegate;
this.filterChain = new DatagramFilterChain( this );
this.ch = ch;
- this.writeRequestQueue = new Queue();
+ this.writeRequestQueue = new ConcurrentLinkedQueue<WriteRequest>( );
this.handler = defaultHandler;
this.remoteAddress = ch.socket().getRemoteSocketAddress();
@@ -147,7 +149,8 @@
{
return handler;
}
-
+
+ @Override
protected void close0()
{
IoServiceConfig config = getServiceConfig();
@@ -158,21 +161,23 @@
filterChain.fireFilterClose( this );
}
- Queue getWriteRequestQueue()
+ Queue<WriteRequest> getWriteRequestQueue()
{
return writeRequestQueue;
}
-
+
+ @Override
public WriteFuture write( Object message, SocketAddress destination )
{
if( !this.config.isBroadcast() )
{
throw new IllegalStateException( "Non-broadcast session" );
}
-
+
return super.write( message, destination );
}
+ @Override
protected void write0( WriteRequest writeRequest )
{
filterChain.fireFilterWrite( this, writeRequest );
@@ -180,18 +185,19 @@
public int getScheduledWriteRequests()
{
- synchronized( writeRequestQueue )
- {
- return writeRequestQueue.size();
- }
+ return writeRequestQueue.size();
}
public int getScheduledWriteBytes()
{
- synchronized( writeRequestQueue )
+ int byteSize = 0;
+
+ for( WriteRequest request : writeRequestQueue )
{
- return writeRequestQueue.byteSize();
+ byteSize += ( ( ByteBuffer ) request.getMessage() ).remaining();
}
+
+ return byteSize;
}
public TransportType getTransportType()
@@ -219,6 +225,7 @@
return serviceAddress;
}
+ @Override
protected void updateTrafficMask()
{
managerDelegate.updateTrafficMask( this );
@@ -229,8 +236,10 @@
return readBufferSize;
}
+ @SuppressWarnings({"CloneableClassWithoutClone"})
private class SessionConfigImpl extends DatagramSessionConfigImpl implements DatagramSessionConfig
{
+ @Override
public int getReceiveBufferSize()
{
try
@@ -243,6 +252,7 @@
}
}
+ @Override
public void setReceiveBufferSize( int receiveBufferSize )
{
if( DatagramSessionConfigImpl.isSetReceiveBufferSizeAvailable() )
@@ -259,6 +269,7 @@
}
}
+ @Override
public boolean isBroadcast()
{
try
@@ -271,6 +282,7 @@
}
}
+ @Override
public void setBroadcast( boolean broadcast )
{
try
@@ -283,6 +295,7 @@
}
}
+ @Override
public int getSendBufferSize()
{
try
@@ -295,6 +308,7 @@
}
}
+ @Override
public void setSendBufferSize( int sendBufferSize )
{
if( DatagramSessionConfigImpl.isSetSendBufferSizeAvailable() )
@@ -310,6 +324,7 @@
}
}
+ @Override
public boolean isReuseAddress()
{
try
@@ -322,6 +337,7 @@
}
}
+ @Override
public void setReuseAddress( boolean reuseAddress )
{
try
@@ -334,6 +350,7 @@
}
}
+ @Override
public int getTrafficClass()
{
if( DatagramSessionConfigImpl.isGetTrafficClassAvailable() )
@@ -353,6 +370,7 @@
}
}
+ @Override
public void setTrafficClass( int trafficClass )
{
if( DatagramSessionConfigImpl.isSetTrafficClassAvailable() )
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Sun Nov 5 11:36:41 2006
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.vmpipe.support;
@@ -35,6 +35,7 @@
super( session );
}
+ @Override
public void fireMessageReceived( IoSession session, Object message )
{
VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
@@ -42,10 +43,7 @@
{
if( !s.getTrafficMask().isReadable() )
{
- synchronized( s.pendingDataQueue )
- {
- s.pendingDataQueue.push( message );
- }
+ s.pendingDataQueue.add( message );
}
else
{
@@ -54,14 +52,15 @@
{
byteCount = ( ( ByteBuffer ) message ).remaining();
}
-
+
s.increaseReadBytes( byteCount );
-
+
super.fireMessageReceived( s, message );
}
}
}
+ @Override
protected void doWrite( IoSession session, WriteRequest writeRequest )
{
VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
@@ -69,19 +68,15 @@
{
if( s.isConnected() )
{
-
if( !s.getTrafficMask().isWritable() )
{
- synchronized( s.pendingDataQueue )
- {
- s.pendingDataQueue.push( writeRequest );
- }
+ s.pendingDataQueue.add( writeRequest );
}
else
{
-
+
Object message = writeRequest.getMessage();
-
+
int byteCount = 1;
Object messageCopy = message;
if( message instanceof ByteBuffer )
@@ -95,22 +90,23 @@
rb.reset();
messageCopy = wb;
}
-
+
s.increaseWrittenBytes( byteCount );
s.increaseWrittenMessages();
-
+
s.getFilterChain().fireMessageSent( s, writeRequest );
s.getRemoteSession().getFilterChain()
.fireMessageReceived( s.getRemoteSession(), messageCopy );
}
}
- else
+ else
{
writeRequest.getFuture().setWritten( false );
}
}
}
+ @Override
protected void doClose( IoSession session )
{
VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
@@ -123,5 +119,5 @@
}
}
}
-
+
}
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Sun Nov 5 11:36:41 2006
@@ -6,21 +6,26 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.vmpipe.support;
import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
@@ -28,22 +33,22 @@
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionConfig;
import org.apache.mina.common.TransportType;
-import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.BaseIoSession;
import org.apache.mina.common.support.BaseIoSessionConfig;
import org.apache.mina.common.support.IoServiceListenerSupport;
-import org.apache.mina.util.Queue;
/**
* A {@link IoSession} for in-VM transport (VM_PIPE).
- *
+ *
* @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev$, $Date$
*/
public class VmPipeSessionImpl extends BaseIoSession
{
- private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {};
-
+ private static final IoSessionConfig CONFIG = new BaseIoSessionConfig()
+ {
+ };
+
private final IoService service;
private final IoServiceConfig serviceConfig;
private final IoServiceListenerSupport serviceListeners;
@@ -54,15 +59,15 @@
private final VmPipeFilterChain filterChain;
private final VmPipeSessionImpl remoteSession;
final Object lock;
- final Queue pendingDataQueue;
+ final BlockingQueue<Object> pendingDataQueue;
/**
* Constructor for client-side session.
*/
public VmPipeSessionImpl(
- IoService service, IoServiceConfig serviceConfig,
- IoServiceListenerSupport serviceListeners, Object lock, SocketAddress localAddress,
- IoHandler handler, VmPipe remoteEntry )
+ IoService service, IoServiceConfig serviceConfig,
+ IoServiceListenerSupport serviceListeners, Object lock, SocketAddress localAddress,
+ IoHandler handler, VmPipe remoteEntry )
{
this.service = service;
this.serviceConfig = serviceConfig;
@@ -72,7 +77,7 @@
this.remoteAddress = this.serviceAddress = remoteEntry.getAddress();
this.handler = handler;
this.filterChain = new VmPipeFilterChain( this );
- this.pendingDataQueue = new Queue();
+ this.pendingDataQueue = new LinkedBlockingQueue<Object>();
remoteSession = new VmPipeSessionImpl( this, remoteEntry );
}
@@ -91,24 +96,24 @@
this.handler = entry.getHandler();
this.filterChain = new VmPipeFilterChain( this );
this.remoteSession = remoteSession;
- this.pendingDataQueue = new Queue();
+ this.pendingDataQueue = new LinkedBlockingQueue<Object>();
}
-
+
public IoService getService()
{
return service;
}
-
+
IoServiceListenerSupport getServiceListeners()
{
return serviceListeners;
}
-
+
public IoServiceConfig getServiceConfig()
{
return serviceConfig;
}
-
+
public IoSessionConfig getConfig()
{
return CONFIG;
@@ -118,7 +123,7 @@
{
return filterChain;
}
-
+
public VmPipeSessionImpl getRemoteSession()
{
return remoteSession;
@@ -129,11 +134,13 @@
return handler;
}
+ @Override
protected void close0()
{
filterChain.fireFilterClose( this );
}
-
+
+ @Override
protected void write0( WriteRequest writeRequest )
{
this.filterChain.fireFilterWrite( this, writeRequest );
@@ -148,7 +155,7 @@
{
return 0;
}
-
+
public TransportType getTransportType()
{
return TransportType.VM_PIPE;
@@ -163,33 +170,31 @@
{
return localAddress;
}
-
+
public SocketAddress getServiceAddress()
{
return serviceAddress;
}
+ @Override
protected void updateTrafficMask()
{
- if( getTrafficMask().isReadable() || getTrafficMask().isWritable())
+ if( getTrafficMask().isReadable() || getTrafficMask().isWritable() )
{
- Object[] data;
- synchronized( pendingDataQueue )
- {
- data = pendingDataQueue.toArray();
- pendingDataQueue.clear();
- }
-
- for( int i = 0; i < data.length; i++ )
+ List<Object> data = new ArrayList<Object>();
+
+ pendingDataQueue.drainTo( data );
+
+ for( Object aData : data )
{
- if( data[ i ] instanceof WriteRequest )
+ if( aData instanceof WriteRequest )
{
- WriteRequest wr = ( WriteRequest ) data[ i ];
+ WriteRequest wr = ( WriteRequest ) aData;
filterChain.doWrite( this, wr );
}
else
{
- filterChain.fireMessageReceived( this, data[ i ] );
+ filterChain.fireMessageReceived( this, aData );
}
}
}
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/ExpiringMap.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/ExpiringMap.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/ExpiringMap.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/ExpiringMap.java Sun Nov 5 11:36:41 2006
@@ -20,10 +20,8 @@
package org.apache.mina.util;
import java.util.Collection;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReadWriteLock;
@@ -125,7 +123,8 @@
delegate.clear();
}
- public int hashCode()
+ @Override
+ public int hashCode()
{
return delegate.hashCode();
}
@@ -135,7 +134,8 @@
return delegate.keySet();
}
- public boolean equals( Object obj )
+ @Override
+ public boolean equals( Object obj )
{
return delegate.equals( obj );
}
@@ -144,18 +144,13 @@
{
synchronized( inMap )
{
- Iterator inMapKeysIt = inMap.keySet().iterator();
-
- while( inMapKeysIt.hasNext() )
- {
- Object key = inMapKeysIt.next();
- Object value = inMap.get( key );
+ for ( Object key : inMap.keySet() ) {
+ Object value = inMap.get( key );
- if( value instanceof ExpiringObject )
- {
- delegate.put( key, value );
- }
- }
+ if ( value instanceof ExpiringObject ) {
+ delegate.put( key, value );
+ }
+ }
}
}
@@ -214,7 +209,7 @@
private ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();
- public ExpiringObject( Object key, Object value, long lastAccessTime )
+ ExpiringObject( Object key, Object value, long lastAccessTime )
{
if( value == null )
{
@@ -264,12 +259,14 @@
return value;
}
- public boolean equals( Object obj )
+ @Override
+ public boolean equals( Object obj )
{
return value.equals( obj );
}
- public int hashCode()
+ @Override
+ public int hashCode()
{
return value.hashCode();
}
@@ -305,7 +302,9 @@
}
catch( InterruptedException e )
{
- }
+ //Abort on interruption
+ return;
+ }
}
}
@@ -313,31 +312,24 @@
{
long timeNow = System.currentTimeMillis();
- Iterator expiringObjectsIterator = delegate.values().iterator();
+ for ( Object o : delegate.values() ) {
+ ExpiringObject expObject = (ExpiringObject) o;
- while( expiringObjectsIterator.hasNext() )
- {
- ExpiringObject expObject = ( ExpiringObject ) expiringObjectsIterator.next();
-
- if( timeToLiveMillis <= 0 )
- continue;
-
- long timeIdle = timeNow - expObject.getLastAccessTime();
+ if ( timeToLiveMillis <= 0 )
+ continue;
- if( timeIdle >= timeToLiveMillis )
- {
- delegate.remove( expObject.getKey() );
+ long timeIdle = timeNow - expObject.getLastAccessTime();
- Iterator listenerIterator = expirationListeners.iterator();
+ if ( timeIdle >= timeToLiveMillis ) {
+ delegate.remove( expObject.getKey() );
- while( listenerIterator.hasNext() )
- {
- ExpirationListener listener = ( ExpirationListener ) listenerIterator.next();
+ for ( Object expirationListener : expirationListeners ) {
+ ExpirationListener listener = (ExpirationListener) expirationListener;
- listener.expired( expObject.getValue() );
- }
- }
- }
+ listener.expired( expObject.getValue() );
+ }
+ }
+ }
}
public void startExpiring()
Modified: directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java (original)
+++ directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java Sun Nov 5 11:36:41 2006
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.filter;
@@ -25,32 +25,32 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.MessageDigest;
+import java.util.LinkedList;
+import java.util.Queue;
import java.util.Random;
import junit.framework.TestCase;
-
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilter.NextFilter;
+import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.IoFutureListener;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteFuture;
-import org.apache.mina.common.IoFilter.NextFilter;
-import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.DefaultWriteFuture;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.util.AvailablePortFinder;
-import org.apache.mina.util.Queue;
import org.easymock.AbstractMatcher;
import org.easymock.MockControl;
/**
* Tests {@link StreamWriteFilter}.
- *
+ *
* @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev$, $Date$
*/
@@ -60,6 +60,7 @@
IoSession session;
NextFilter nextFilter;
+ @Override
protected void setUp() throws Exception
{
super.setUp();
@@ -71,7 +72,7 @@
mockNextFilter = MockControl.createControl( NextFilter.class );
session = ( IoSession ) mockSession.getMock();
nextFilter = ( NextFilter ) mockNextFilter.getMock();
-
+
session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
mockSession.setReturnValue( null );
}
@@ -79,29 +80,29 @@
public void testWriteEmptyStream() throws Exception
{
StreamWriteFilter filter = new StreamWriteFilter();
-
+
InputStream stream = new ByteArrayInputStream( new byte[ 0 ] );
WriteRequest writeRequest = new WriteRequest( stream, new DummyWriteFuture() );
-
+
/*
* Record expectations
*/
nextFilter.messageSent( session, stream );
-
+
/*
* Replay.
*/
mockNextFilter.replay();
mockSession.replay();
-
+
filter.filterWrite( nextFilter, session, writeRequest );
-
+
/*
* Verify.
*/
mockNextFilter.verify();
mockSession.verify();
-
+
assertTrue( writeRequest.getFuture().isWritten() );
}
@@ -112,10 +113,10 @@
public void testWriteNonStreamMessage() throws Exception
{
StreamWriteFilter filter = new StreamWriteFilter();
-
+
Object message = new Object();
WriteRequest writeRequest = new WriteRequest( message, new DummyWriteFuture() );
-
+
/*
* Record expectations
*/
@@ -123,35 +124,35 @@
session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
mockSession.setReturnValue( null );
nextFilter.messageSent( session, message );
-
+
/*
* Replay.
*/
mockNextFilter.replay();
mockSession.replay();
-
+
filter.filterWrite( nextFilter, session, writeRequest );
filter.messageSent( nextFilter, session, message );
-
+
/*
* Verify.
*/
mockNextFilter.verify();
mockSession.verify();
}
-
+
/**
* Tests when the contents of the stream fits into one write buffer.
*/
public void testWriteSingleBufferStream() throws Exception
{
StreamWriteFilter filter = new StreamWriteFilter();
-
+
byte[] data = new byte[] { 1, 2, 3, 4 };
-
+
InputStream stream = new ByteArrayInputStream( data );
WriteRequest writeRequest = new WriteRequest( stream, new DummyWriteFuture() );
-
+
/*
* Record expectations
*/
@@ -161,7 +162,7 @@
mockSession.setReturnValue(null);
nextFilter.filterWrite( session, new WriteRequest( ByteBuffer.wrap( data ) ) );
mockNextFilter.setMatcher( new WriteRequestMatcher() );
-
+
session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
mockSession.setReturnValue( stream );
session.removeAttribute( StreamWriteFilter.CURRENT_STREAM );
@@ -171,25 +172,25 @@
session.removeAttribute( StreamWriteFilter.WRITE_REQUEST_QUEUE );
mockSession.setReturnValue( null );
nextFilter.messageSent( session, stream );
-
+
/*
* Replay.
*/
mockNextFilter.replay();
mockSession.replay();
-
+
filter.filterWrite( nextFilter, session, writeRequest );
filter.messageSent( nextFilter, session, data );
-
+
/*
* Verify.
*/
mockNextFilter.verify();
mockSession.verify();
-
+
assertTrue( writeRequest.getFuture().isWritten() );
}
-
+
/**
* Tests when the contents of the stream doesn't fit into one write buffer.
*/
@@ -197,15 +198,15 @@
{
StreamWriteFilter filter = new StreamWriteFilter();
filter.setWriteBufferSize( 4 );
-
+
byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
byte[] chunk3 = new byte[] { 9, 10 };
-
+
InputStream stream = new ByteArrayInputStream( data );
WriteRequest writeRequest = new WriteRequest( stream, new DummyWriteFuture() );
-
+
/*
* Record expectations
*/
@@ -215,15 +216,15 @@
mockSession.setReturnValue(null);
nextFilter.filterWrite( session, new WriteRequest( ByteBuffer.wrap( chunk1 ) ) );
mockNextFilter.setMatcher( new WriteRequestMatcher() );
-
+
session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
mockSession.setReturnValue( stream );
nextFilter.filterWrite( session, new WriteRequest( ByteBuffer.wrap( chunk2 ) ) );
-
+
session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
mockSession.setReturnValue( stream );
nextFilter.filterWrite( session, new WriteRequest( ByteBuffer.wrap( chunk3 ) ) );
-
+
session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
mockSession.setReturnValue( stream );
session.removeAttribute( StreamWriteFilter.CURRENT_STREAM );
@@ -233,34 +234,34 @@
session.removeAttribute( StreamWriteFilter.WRITE_REQUEST_QUEUE );
mockSession.setReturnValue( null );
nextFilter.messageSent( session, stream );
-
+
/*
* Replay.
*/
mockNextFilter.replay();
mockSession.replay();
-
+
filter.filterWrite( nextFilter, session, writeRequest );
filter.messageSent( nextFilter, session, chunk1 );
filter.messageSent( nextFilter, session, chunk2 );
filter.messageSent( nextFilter, session, chunk3 );
-
+
/*
* Verify.
*/
mockNextFilter.verify();
mockSession.verify();
-
+
assertTrue( writeRequest.getFuture().isWritten() );
}
-
+
public void testWriteWhileWriteInProgress() throws Exception
{
StreamWriteFilter filter = new StreamWriteFilter();
-
- Queue queue = new Queue();
+
+ Queue<? extends Object> queue = new LinkedList<Object>( );
InputStream stream = new ByteArrayInputStream( new byte[ 5 ] );
-
+
/*
* Record expectations
*/
@@ -269,7 +270,7 @@
mockSession.setReturnValue( stream );
session.getAttribute( StreamWriteFilter.WRITE_REQUEST_QUEUE );
mockSession.setReturnValue( queue );
-
+
/*
* Replay.
*/
@@ -279,35 +280,35 @@
WriteRequest wr = new WriteRequest( new Object(), new DummyWriteFuture() );
filter.filterWrite( nextFilter, session, wr );
assertEquals( 1, queue.size() );
- assertSame( wr, queue.pop() );
-
+ assertSame( wr, queue.poll() );
+
/*
* Verify.
*/
mockNextFilter.verify();
mockSession.verify();
}
-
+
public void testWritesWriteRequestQueueWhenFinished() throws Exception
{
StreamWriteFilter filter = new StreamWriteFilter();
- WriteRequest wrs[] = new WriteRequest[] {
+ WriteRequest wrs[] = new WriteRequest[] {
new WriteRequest( new Object(), new DummyWriteFuture() ),
new WriteRequest( new Object(), new DummyWriteFuture() ),
new WriteRequest( new Object(), new DummyWriteFuture() )
};
- Queue queue = new Queue();
- queue.push( wrs[ 0 ] );
- queue.push( wrs[ 1 ] );
- queue.push( wrs[ 2 ] );
+ Queue<WriteRequest> queue = new LinkedList<WriteRequest>( );
+ queue.add( wrs[ 0 ] );
+ queue.add( wrs[ 1 ] );
+ queue.add( wrs[ 2 ] );
InputStream stream = new ByteArrayInputStream( new byte[ 0 ] );
-
+
/*
* Record expectations
*/
mockSession.reset();
-
+
session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
mockSession.setReturnValue( stream );
session.removeAttribute( StreamWriteFilter.CURRENT_STREAM );
@@ -316,7 +317,7 @@
mockSession.setReturnValue( new DefaultWriteFuture( session ) );
session.removeAttribute( StreamWriteFilter.WRITE_REQUEST_QUEUE );
mockSession.setReturnValue( queue );
-
+
nextFilter.filterWrite( session, wrs[ 0 ] );
session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
mockSession.setReturnValue( null );
@@ -326,9 +327,9 @@
nextFilter.filterWrite( session, wrs[ 2 ] );
session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
mockSession.setReturnValue( null );
-
+
nextFilter.messageSent( session, stream );
-
+
/*
* Replay.
*/
@@ -337,14 +338,14 @@
filter.messageSent( nextFilter, session, new Object() );
assertEquals( 0, queue.size() );
-
+
/*
* Verify.
*/
mockNextFilter.verify();
mockSession.verify();
- }
-
+ }
+
/**
* Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the
* specified size.
@@ -352,7 +353,7 @@
public void testSetWriteBufferSize() throws Exception
{
StreamWriteFilter filter = new StreamWriteFilter();
-
+
try
{
filter.setWriteBufferSize( 0 );
@@ -361,7 +362,7 @@
catch ( IllegalArgumentException iae )
{
}
-
+
try
{
filter.setWriteBufferSize( -100 );
@@ -376,7 +377,7 @@
filter.setWriteBufferSize( 1024 );
assertEquals( 1024, filter.getWriteBufferSize() );
}
-
+
public void testWriteUsingSocketTransport() throws Exception
{
IoAcceptor acceptor = new SocketAcceptor();
@@ -384,27 +385,27 @@
SocketAddress address = new InetSocketAddress( "localhost", AvailablePortFinder.getNextAvailable() );
IoConnector connector = new SocketConnector();
-
+
FixedRandomInputStream stream = new FixedRandomInputStream( 4 * 1024 * 1024 );
-
+
SenderHandler sender = new SenderHandler( stream );
ReceiverHandler receiver = new ReceiverHandler( stream.size );
-
+
acceptor.bind( address, sender );
-
+
synchronized( sender.lock )
{
synchronized( receiver.lock )
{
connector.connect( address, receiver );
-
+
sender.lock.wait();
receiver.lock.wait();
}
}
-
+
acceptor.unbind( address );
-
+
assertEquals( stream.bytesRead, receiver.bytesRead );
assertEquals( stream.size, receiver.bytesRead );
byte[] expectedMd5 = stream.digest.digest();
@@ -423,12 +424,13 @@
Random random = new Random();
MessageDigest digest;
- public FixedRandomInputStream( long size ) throws Exception
+ FixedRandomInputStream( long size ) throws Exception
{
this.size = size;
digest = MessageDigest.getInstance( "MD5" );
}
+ @Override
public int read() throws IOException
{
if ( isAllWritten() )
@@ -438,7 +440,7 @@
digest.update( b );
return b;
}
-
+
public long getBytesRead()
{
return bytesRead;
@@ -457,24 +459,27 @@
private static class SenderHandler extends IoHandlerAdapter
{
- Object lock = new Object();
+ final Object lock = new Object();
InputStream inputStream;
StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
- public SenderHandler( InputStream inputStream )
+ SenderHandler( InputStream inputStream )
{
this.inputStream = inputStream;
}
+ @Override
public void sessionCreated( IoSession session ) throws Exception {
super.sessionCreated( session );
session.getFilterChain().addLast( "codec", streamWriteFilter );
}
+ @Override
public void sessionOpened( IoSession session ) throws Exception {
session.write( inputStream );
}
+ @Override
public void exceptionCaught( IoSession session, Throwable cause ) throws Exception
{
synchronized( lock )
@@ -483,6 +488,7 @@
}
}
+ @Override
public void sessionClosed( IoSession session ) throws Exception
{
synchronized( lock )
@@ -491,6 +497,7 @@
}
}
+ @Override
public void sessionIdle( IoSession session, IdleStatus status ) throws Exception
{
synchronized( lock )
@@ -499,6 +506,7 @@
}
}
+ @Override
public void messageSent( IoSession session, Object message ) throws Exception
{
if( message == inputStream )
@@ -513,29 +521,32 @@
private static class ReceiverHandler extends IoHandlerAdapter
{
- Object lock = new Object();
+ final Object lock = new Object();
long bytesRead = 0;
long size = 0;
MessageDigest digest;
- public ReceiverHandler( long size ) throws Exception
+ ReceiverHandler( long size ) throws Exception
{
this.size = size;
digest = MessageDigest.getInstance( "MD5" );
}
+ @Override
public void sessionCreated( IoSession session ) throws Exception
{
super.sessionCreated(session);
-
+
session.setIdleTime( IdleStatus.READER_IDLE, 5 );
}
+ @Override
public void sessionIdle( IoSession session, IdleStatus status ) throws Exception
{
session.close();
}
+ @Override
public void exceptionCaught( IoSession session, Throwable cause ) throws Exception
{
synchronized( lock )
@@ -543,7 +554,8 @@
lock.notifyAll();
}
}
-
+
+ @Override
public void sessionClosed( IoSession session ) throws Exception
{
synchronized( lock )
@@ -552,6 +564,7 @@
}
}
+ @Override
public void messageReceived( IoSession session, Object message ) throws Exception
{
ByteBuffer buf = ( ByteBuffer ) message;
@@ -566,27 +579,28 @@
}
}
}
-
+
public static class WriteRequestMatcher extends AbstractMatcher
{
+ @Override
protected boolean argumentMatches( Object expected, Object actual )
{
- if( expected instanceof WriteRequest && expected instanceof WriteRequest )
+ if( expected instanceof WriteRequest && actual instanceof WriteRequest )
{
WriteRequest w1 = ( WriteRequest ) expected;
WriteRequest w2 = ( WriteRequest ) actual;
-
- return w1.getMessage().equals( w2.getMessage() )
+
+ return w1.getMessage().equals( w2.getMessage() )
&& w1.getFuture().isWritten() == w2.getFuture().isWritten();
}
return super.argumentMatches( expected, actual );
}
}
-
+
private static class DummyWriteFuture implements WriteFuture
{
private boolean written;
-
+
public boolean isWritten()
{
return written;
@@ -596,7 +610,7 @@
{
this.written = written;
}
-
+
public IoSession getSession()
{
return null;
Modified: directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java (original)
+++ directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java Sun Nov 5 11:36:41 2006
@@ -6,26 +6,27 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.filter.codec.textline;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
+import java.util.LinkedList;
+import java.util.Queue;
import junit.framework.Assert;
import junit.framework.TestCase;
-
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
@@ -36,7 +37,6 @@
import org.apache.mina.common.TransportType;
import org.apache.mina.common.support.BaseIoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.mina.util.Queue;
/**
* Tests {@link TextLineDecoder}.
@@ -56,19 +56,19 @@
TextLineDecoder decoder =
new TextLineDecoder(
Charset.forName( "UTF-8" ), LineDelimiter.WINDOWS );
-
+
CharsetEncoder encoder = Charset.forName( "UTF-8" ).newEncoder();
IoSession session = new DummySession();
TestDecoderOutput out = new TestDecoderOutput();
ByteBuffer in = ByteBuffer.allocate( 16 );
-
+
// Test one decode and one output
in.putString( "ABC\r\n", encoder );
in.flip();
decoder.decode( session, in, out );
Assert.assertEquals( 1, out.getMessageQueue().size() );
- Assert.assertEquals( "ABC", out.getMessageQueue().pop() );
-
+ Assert.assertEquals( "ABC", out.getMessageQueue().poll() );
+
// Test two decode and one output
in.clear();
in.putString( "DEF", encoder );
@@ -80,17 +80,17 @@
in.flip();
decoder.decode( session, in, out );
Assert.assertEquals( 1, out.getMessageQueue().size() );
- Assert.assertEquals( "DEFGHI", out.getMessageQueue().pop() );
-
+ Assert.assertEquals( "DEFGHI", out.getMessageQueue().poll() );
+
// Test one decode and two output
in.clear();
in.putString( "JKL\r\nMNO\r\n", encoder );
in.flip();
decoder.decode( session, in, out );
Assert.assertEquals( 2, out.getMessageQueue().size() );
- Assert.assertEquals( "JKL", out.getMessageQueue().pop() );
- Assert.assertEquals( "MNO", out.getMessageQueue().pop() );
-
+ Assert.assertEquals( "JKL", out.getMessageQueue().poll() );
+ Assert.assertEquals( "MNO", out.getMessageQueue().poll() );
+
// Test splitted long delimiter
decoder = new TextLineDecoder(
Charset.forName( "UTF-8" ),
@@ -110,27 +110,27 @@
in.flip();
decoder.decode( session, in, out );
Assert.assertEquals( 1, out.getMessageQueue().size() );
- Assert.assertEquals( "PQR", out.getMessageQueue().pop() );
+ Assert.assertEquals( "PQR", out.getMessageQueue().poll() );
}
-
+
public void testAutoDecode() throws Exception
{
TextLineDecoder decoder =
new TextLineDecoder(
Charset.forName( "UTF-8" ), LineDelimiter.AUTO );
-
+
CharsetEncoder encoder = Charset.forName( "UTF-8" ).newEncoder();
IoSession session = new DummySession();
TestDecoderOutput out = new TestDecoderOutput();
ByteBuffer in = ByteBuffer.allocate( 16 );
-
+
// Test one decode and one output
in.putString( "ABC\r\n", encoder );
in.flip();
decoder.decode( session, in, out );
Assert.assertEquals( 1, out.getMessageQueue().size() );
- Assert.assertEquals( "ABC", out.getMessageQueue().pop() );
-
+ Assert.assertEquals( "ABC", out.getMessageQueue().poll() );
+
// Test two decode and one output
in.clear();
in.putString( "DEF", encoder );
@@ -142,27 +142,27 @@
in.flip();
decoder.decode( session, in, out );
Assert.assertEquals( 1, out.getMessageQueue().size() );
- Assert.assertEquals( "DEFGHI", out.getMessageQueue().pop() );
-
+ Assert.assertEquals( "DEFGHI", out.getMessageQueue().poll() );
+
// Test one decode and two output
in.clear();
in.putString( "JKL\r\nMNO\r\n", encoder );
in.flip();
decoder.decode( session, in, out );
Assert.assertEquals( 2, out.getMessageQueue().size() );
- Assert.assertEquals( "JKL", out.getMessageQueue().pop() );
- Assert.assertEquals( "MNO", out.getMessageQueue().pop() );
-
+ Assert.assertEquals( "JKL", out.getMessageQueue().poll() );
+ Assert.assertEquals( "MNO", out.getMessageQueue().poll() );
+
// Test multiple '\n's
in.clear();
in.putString( "\n\n\n", encoder );
in.flip();
decoder.decode( session, in, out );
Assert.assertEquals( 3, out.getMessageQueue().size() );
- Assert.assertEquals( "", out.getMessageQueue().pop() );
- Assert.assertEquals( "", out.getMessageQueue().pop() );
- Assert.assertEquals( "", out.getMessageQueue().pop() );
-
+ Assert.assertEquals( "", out.getMessageQueue().poll() );
+ Assert.assertEquals( "", out.getMessageQueue().poll() );
+ Assert.assertEquals( "", out.getMessageQueue().poll() );
+
// Test splitted long delimiter (\r\r\n)
in.clear();
in.putString( "PQR\r", encoder );
@@ -179,11 +179,12 @@
in.flip();
decoder.decode( session, in, out );
Assert.assertEquals( 1, out.getMessageQueue().size() );
- Assert.assertEquals( "PQR", out.getMessageQueue().pop() );
+ Assert.assertEquals( "PQR", out.getMessageQueue().poll() );
}
-
+
private static class DummySession extends BaseIoSession
{
+ @Override
protected void updateTrafficMask()
{
}
@@ -192,7 +193,7 @@
{
return null;
}
-
+
public IoServiceConfig getServiceConfig()
{
return null;
@@ -243,17 +244,17 @@
return 0;
}
}
-
+
private static class TestDecoderOutput implements ProtocolDecoderOutput
{
- private Queue messageQueue = new Queue();
+ private Queue<Object> messageQueue = new LinkedList<Object>( );
public void write( Object message )
{
- messageQueue.push( message );
+ messageQueue.add( message );
}
-
- public Queue getMessageQueue()
+
+ public Queue<Object> getMessageQueue()
{
return messageQueue;
}
Modified: directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineEncoderTest.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineEncoderTest.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineEncoderTest.java (original)
+++ directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineEncoderTest.java Sun Nov 5 11:36:41 2006
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.filter.codec.textline;
@@ -58,15 +58,16 @@
SimpleProtocolEncoderOutput out =
new SimpleProtocolEncoderOutput()
{
+ @Override
protected WriteFuture doFlush( ByteBuffer buf )
{
return null;
}
};
-
+
encoder.encode( session, "ABC", out );
Assert.assertEquals( 1, out.getBufferQueue().size() );
- ByteBuffer buf = ( ByteBuffer ) out.getBufferQueue().pop();
+ ByteBuffer buf = out.getBufferQueue().remove(0);
Assert.assertEquals( 5, buf.remaining() );
Assert.assertEquals( 'A', buf.get() );
Assert.assertEquals( 'B', buf.get() );
@@ -77,6 +78,7 @@
private static class DummySession extends BaseIoSession
{
+ @Override
protected void updateTrafficMask()
{
}
Modified: directory/branches/mina/1.2/example/src/main/java/org/apache/mina/example/httpserver/codec/HttpRequestDecoder.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/example/src/main/java/org/apache/mina/example/httpserver/codec/HttpRequestDecoder.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/example/src/main/java/org/apache/mina/example/httpserver/codec/HttpRequestDecoder.java (original)
+++ directory/branches/mina/1.2/example/src/main/java/org/apache/mina/example/httpserver/codec/HttpRequestDecoder.java Sun Nov 5 11:36:41 2006
@@ -36,7 +36,7 @@
import org.apache.mina.filter.codec.demux.MessageDecoderResult;
/**
- * A {@link MessageDecoder} that decodes {@link HttpRequest}.
+ * A {@link org.apache.mina.filter.codec.demux.MessageDecoder} that decodes {@link HttpRequest}.
*
* @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev$, $Date$
Modified: directory/branches/mina/1.2/filter-ssl/src/main/java/org/apache/mina/filter/support/SSLHandler.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/filter-ssl/src/main/java/org/apache/mina/filter/support/SSLHandler.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/filter-ssl/src/main/java/org/apache/mina/filter/support/SSLHandler.java (original)
+++ directory/branches/mina/1.2/filter-ssl/src/main/java/org/apache/mina/filter/support/SSLHandler.java Sun Nov 5 11:36:41 2006
@@ -6,21 +6,22 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.filter.support;
import java.nio.ByteBuffer;
-
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
@@ -28,13 +29,12 @@
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLSession;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
import org.apache.mina.common.IoFilter.NextFilter;
import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
import org.apache.mina.common.support.DefaultWriteFuture;
import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.util.Queue;
import org.apache.mina.util.SessionLog;
/**
@@ -53,7 +53,7 @@
private final SSLFilter parent;
private final SSLContext ctx;
private final IoSession session;
- private final Queue scheduledWrites = new Queue();
+ private final Queue<ScheduledWrite> scheduledWrites = new ConcurrentLinkedQueue<ScheduledWrite>( );
private SSLEngine sslEngine;
@@ -88,12 +88,12 @@
private boolean initialHandshakeComplete;
private boolean writingEncryptedData;
-
+
/**
* Constuctor.
*
* @param sslc
- * @throws SSLException
+ * @throws SSLException
*/
public SSLHandler( SSLFilter parent, SSLContext sslc, IoSession session ) throws SSLException
{
@@ -122,21 +122,21 @@
{
sslEngine.setNeedClientAuth( true );
}
-
+
if( parent.getEnabledCipherSuites() != null )
{
sslEngine.setEnabledCipherSuites( parent.getEnabledCipherSuites() );
}
-
+
if( parent.getEnabledProtocols() != null )
{
sslEngine.setEnabledProtocols( parent.getEnabledProtocols() );
}
- sslEngine.beginHandshake();
+ sslEngine.beginHandshake();
initialHandshakeStatus = sslEngine.getHandshakeStatus();//SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
initialHandshakeComplete = false;
-
+
SSLByteBufferPool.initiate( sslEngine );
appBuffer = SSLByteBufferPool.getApplicationBuffer();
@@ -145,10 +145,10 @@
outNetBuffer = SSLByteBufferPool.getPacketBuffer();
outNetBuffer.position( 0 );
outNetBuffer.limit( 0 );
-
+
writingEncryptedData = false;
}
-
+
/**
* Release allocated ByteBuffers.
*/
@@ -171,7 +171,7 @@
"Unexpected exception from SSLEngine.closeInbound().",
e );
}
-
+
try
{
do
@@ -189,7 +189,7 @@
}
sslEngine.closeOutbound();
sslEngine = null;
-
+
SSLByteBufferPool.release( appBuffer );
SSLByteBufferPool.release( inNetBuffer );
SSLByteBufferPool.release( outNetBuffer );
@@ -200,7 +200,7 @@
{
return parent;
}
-
+
public IoSession getSession()
{
return session;
@@ -239,17 +239,17 @@
{
return ( initialHandshakeStatus == SSLEngineResult.HandshakeStatus.NEED_WRAP && !isInboundDone() );
}
-
+
public void scheduleWrite( NextFilter nextFilter, WriteRequest writeRequest )
{
- scheduledWrites.push( new ScheduledWrite( nextFilter, writeRequest ) );
+ scheduledWrites.add( new ScheduledWrite( nextFilter, writeRequest ) );
}
-
+
public void flushScheduledWrites() throws SSLException
{
ScheduledWrite scheduledWrite;
-
- while( ( scheduledWrite = ( ScheduledWrite ) scheduledWrites.pop() ) != null )
+
+ while( ( scheduledWrite = scheduledWrites.poll() ) != null )
{
if( SessionLog.isDebugEnabled( session ) )
{
@@ -279,9 +279,9 @@
appBuffer.limit( 0 );
if( SessionLog.isDebugEnabled( session ) )
{
- SessionLog.debug( session,
+ SessionLog.debug( session,
" expanded inNetBuffer:" + inNetBuffer );
- SessionLog.debug( session,
+ SessionLog.debug( session,
" expanded appBuffer:" + appBuffer );
}
}
@@ -342,8 +342,6 @@
// buffer.
outNetBuffer.clear();
- SSLEngineResult result;
-
// Loop until there is no more data in src
while ( src.hasRemaining() ) {
@@ -357,7 +355,7 @@
}
}
- result = sslEngine.wrap( src, outNetBuffer );
+ SSLEngineResult result = sslEngine.wrap( src, outNetBuffer );
if ( SessionLog.isDebugEnabled( session ) ) {
SessionLog.debug( session, " Wrap res:" + result );
}
@@ -378,7 +376,7 @@
/**
* Start SSL shutdown process.
- *
+ *
* @return <tt>true</tt> if shutdown process is started.
* <tt>false</tt> if shutdown process is already finished.
*
@@ -390,7 +388,7 @@
{
return false;
}
-
+
sslEngine.closeOutbound();
// By RFC 2616, we can "fire and forget" our close_notify
@@ -444,10 +442,10 @@
status +
" inNetBuffer: " + inNetBuffer + "appBuffer: " + appBuffer);
}
-
+
return status;
}
-
+
/**
* Perform any handshaking processing.
*/
@@ -457,7 +455,7 @@
{
SessionLog.debug( session, " doHandshake()" );
}
-
+
while( !initialHandshakeComplete )
{
if( initialHandshakeStatus == SSLEngineResult.HandshakeStatus.FINISHED )
@@ -492,7 +490,7 @@
SessionLog.debug( session, " initialHandshakeStatus=NEED_UNWRAP" );
}
SSLEngineResult.Status status = unwrapHandshake();
- if( ( initialHandshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED
+ if( ( initialHandshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED
&& status == SSLEngineResult.Status.BUFFER_UNDERFLOW )
|| isInboundDone() )
{
@@ -543,15 +541,14 @@
// no; bail out
return DefaultWriteFuture.newNotWrittenFuture( session );
}
-
- WriteFuture writeFuture = null;
-
- // write net data
-
+
// set flag that we are writing encrypted data
// (used in SSLFilter.filterWrite())
writingEncryptedData = true;
-
+
+ // write net data
+ WriteFuture writeFuture = null;
+
try
{
if( SessionLog.isDebugEnabled( session ) )
@@ -564,7 +561,7 @@
SessionLog.debug( session, " session write: " + writeBuffer );
}
//debug("outNetBuffer (after copy): {0}", sslHandler.getOutNetBuffer());
-
+
writeFuture = new DefaultWriteFuture( session );
parent.filterWrite( nextFilter, session, new WriteRequest( writeBuffer, writeFuture ) );
@@ -598,18 +595,10 @@
{
writingEncryptedData = false;
}
-
- if( writeFuture != null )
- {
- return writeFuture;
- }
- else
- {
- return DefaultWriteFuture.newNotWrittenFuture( session );
- }
+
+ return writeFuture;
}
-
-
+
private SSLEngineResult.Status unwrap() throws SSLException
{
if( SessionLog.isDebugEnabled( session ) )
@@ -685,7 +674,7 @@
res.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_UNWRAP );
initialHandshakeStatus = res.getHandshakeStatus();
-
+
// If handshake finished, no data was produced, and the status is still ok,
// try to unwrap more
if (initialHandshakeStatus == SSLEngineResult.HandshakeStatus.FINISHED
@@ -758,14 +747,14 @@
{
private final NextFilter nextFilter;
private final WriteRequest writeRequest;
-
- public ScheduledWrite( NextFilter nextFilter, WriteRequest writeRequest )
+
+ ScheduledWrite( NextFilter nextFilter, WriteRequest writeRequest )
{
this.nextFilter = nextFilter;
this.writeRequest = writeRequest;
}
}
-
+
/**
* Creates a new Mina byte buffer that is a deep copy of the remaining bytes
* in the given buffer (between index buf.position() and buf.limit())