You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/09/01 22:46:14 UTC

svn commit: r1621883 - /qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java

Author: robbie
Date: Mon Sep  1 20:46:13 2014
New Revision: 1621883

URL: http://svn.apache.org/r1621883
Log:
QPIDJMS-25: move any 'write after read' work out of the netty IO thread, avoid them jumping application-initiated writes

Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java?rev=1621883&r1=1621882&r2=1621883&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java Mon Sep  1 20:46:13 2014
@@ -40,6 +40,8 @@ import io.netty.channel.socket.nio.NioSo
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -60,6 +62,7 @@ public class AmqpConnectionDriverNetty
 
     private Bootstrap _bootstrap;
     private NettyHandler _nettyHandler;
+    private ExecutorService _executor;
 
     public AmqpConnectionDriverNetty()
     {
@@ -85,6 +88,7 @@ public class AmqpConnectionDriverNetty
 
         bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false));
 
+        _executor = Executors.newSingleThreadExecutor();
         _nettyHandler = new NettyHandler(amqpConnection);
         bootstrap.handler(new ChannelInitializer<SocketChannel>()
         {
@@ -113,6 +117,7 @@ public class AmqpConnectionDriverNetty
 
             throw new RuntimeException("Failed to connect", t);
         }
+
     }
 
     private class NettyHandler extends ChannelInboundHandlerAdapter
@@ -133,11 +138,21 @@ public class AmqpConnectionDriverNetty
         private class NettyWriter extends AbstractEventHandler
         {
             @Override
-            public void onTransport(Transport transport)
+            public void onTransport(final Transport transport)
             {
-                ChannelHandlerContext ctx = (ChannelHandlerContext) transport.getContext();
-                write(ctx);
-                scheduleReadIfCapacity(transport, ctx);
+                if(!_executor.isShutdown())
+                {
+                    _executor.execute(new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            ChannelHandlerContext ctx = (ChannelHandlerContext) transport.getContext();
+                            write(ctx);
+                            scheduleReadIfCapacity(transport, ctx);
+                        }
+                    });
+                }
             }
 
 /*          This was disabled after adding the 'write after read' workaround for SASL negotiation in the channelRead(..) handler.
@@ -221,8 +236,18 @@ public class AmqpConnectionDriverNetty
 
                             //TODO: make SASL layer event-friendly to remove need for this?
                             //We may have output pending but with no transport events to dispatch"
-                            logMessage("Doing a write after processing AmqpConnection");
-                            write(ctx);
+                            if(!_executor.isShutdown())
+                            {
+                                _executor.execute(new Runnable()
+                                {
+                                    @Override
+                                    public void run()
+                                    {
+                                        logMessage("Doing a write after processing AmqpConnection");
+                                        write(ctx);
+                                    }
+                                });
+                            }
                         }
                     }
                     finally
@@ -333,7 +358,7 @@ public class AmqpConnectionDriverNetty
                                 @Override
                                 public void operationComplete(ChannelFuture chf)
                                 {
-                                    logMessage("Running write completion callback");
+                                    logMessage("Running write completion callback for bytes: " + size);
                                     if (chf.isSuccess())
                                     {
                                         synchronized (_amqpConnection)
@@ -341,8 +366,19 @@ public class AmqpConnectionDriverNetty
                                             _transport.pop(size);
                                             offset -= size;
                                         }
-                                        write(ctx);//TODO: fix. Calling this can cause us to fire channelInactive before reading any responses, see below
-                                        dispatch();
+
+                                        if(!_executor.isShutdown())
+                                        {
+                                            _executor.execute(new Runnable()
+                                            {
+                                                public void run()
+                                                {
+                                                    logMessage("Kicking off write completion tasks");
+                                                    write(ctx);//TODO: fix. Calling this can cause us to fire channelInactive before reading any responses, see below
+                                                    dispatch();
+                                                }
+                                            });
+                                        }
                                     }
                                     else
                                     {
@@ -407,6 +443,10 @@ public class AmqpConnectionDriverNetty
     public void stop() throws InterruptedException
     {
         // TODO perhaps close the context?
+        if(_executor != null)
+        {
+            _executor.shutdown();
+        }
     }
 
     private void logMessage(String message)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org