You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/09/01 22:46:14 UTC
svn commit: r1621883 -
/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java
Author: robbie
Date: Mon Sep 1 20:46:13 2014
New Revision: 1621883
URL: http://svn.apache.org/r1621883
Log:
QPIDJMS-25: move any 'write after read' work out of the netty IO thread, to avoid it jumping ahead of application-initiated writes
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java?rev=1621883&r1=1621882&r2=1621883&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java Mon Sep 1 20:46:13 2014
@@ -40,6 +40,8 @@ import io.netty.channel.socket.nio.NioSo
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -60,6 +62,7 @@ public class AmqpConnectionDriverNetty
private Bootstrap _bootstrap;
private NettyHandler _nettyHandler;
+ private ExecutorService _executor;
public AmqpConnectionDriverNetty()
{
@@ -85,6 +88,7 @@ public class AmqpConnectionDriverNetty
bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false));
+ _executor = Executors.newSingleThreadExecutor();
_nettyHandler = new NettyHandler(amqpConnection);
bootstrap.handler(new ChannelInitializer<SocketChannel>()
{
@@ -113,6 +117,7 @@ public class AmqpConnectionDriverNetty
throw new RuntimeException("Failed to connect", t);
}
+
}
private class NettyHandler extends ChannelInboundHandlerAdapter
@@ -133,11 +138,21 @@ public class AmqpConnectionDriverNetty
private class NettyWriter extends AbstractEventHandler
{
@Override
- public void onTransport(Transport transport)
+ public void onTransport(final Transport transport)
{
- ChannelHandlerContext ctx = (ChannelHandlerContext) transport.getContext();
- write(ctx);
- scheduleReadIfCapacity(transport, ctx);
+ if(!_executor.isShutdown())
+ {
+ _executor.execute(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ ChannelHandlerContext ctx = (ChannelHandlerContext) transport.getContext();
+ write(ctx);
+ scheduleReadIfCapacity(transport, ctx);
+ }
+ });
+ }
}
/* This was disabled after adding the 'write after read' workaround for SASL negotiation in the channelRead(..) handler.
@@ -221,8 +236,18 @@ public class AmqpConnectionDriverNetty
//TODO: make SASL layer event-friendly to remove need for this?
//We may have output pending but with no transport events to dispatch"
- logMessage("Doing a write after processing AmqpConnection");
- write(ctx);
+ if(!_executor.isShutdown())
+ {
+ _executor.execute(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ logMessage("Doing a write after processing AmqpConnection");
+ write(ctx);
+ }
+ });
+ }
}
}
finally
@@ -333,7 +358,7 @@ public class AmqpConnectionDriverNetty
@Override
public void operationComplete(ChannelFuture chf)
{
- logMessage("Running write completion callback");
+ logMessage("Running write completion callback for bytes: " + size);
if (chf.isSuccess())
{
synchronized (_amqpConnection)
@@ -341,8 +366,19 @@ public class AmqpConnectionDriverNetty
_transport.pop(size);
offset -= size;
}
- write(ctx);//TODO: fix. Calling this can cause us to fire channelInactive before reading any responses, see below
- dispatch();
+
+ if(!_executor.isShutdown())
+ {
+ _executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ logMessage("Kicking off write completion tasks");
+ write(ctx);//TODO: fix. Calling this can cause us to fire channelInactive before reading any responses, see below
+ dispatch();
+ }
+ });
+ }
}
else
{
@@ -407,6 +443,10 @@ public class AmqpConnectionDriverNetty
public void stop() throws InterruptedException
{
// TODO perhaps close the context?
+ if(_executor != null)
+ {
+ _executor.shutdown();
+ }
}
private void logMessage(String message)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org