You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/08/13 18:52:33 UTC

svn commit: r1617757 - /qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java

Author: robbie
Date: Wed Aug 13 16:52:33 2014
New Revision: 1617757

URL: http://svn.apache.org/r1617757
Log:
QPIDJMS-25: cleanup/fixups/formatting

Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java?rev=1617757&r1=1617756&r2=1617757&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java Wed Aug 13 16:52:33 2014
@@ -38,37 +38,26 @@ import io.netty.channel.socket.SocketCha
 import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.demo.AbstractEventHandler;
 import org.apache.qpid.proton.demo.EventHandler;
 import org.apache.qpid.proton.demo.Events;
-import org.apache.qpid.proton.driver.Connector;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.Transport;
 
 public class AmqpConnectionDriverNetty extends AbstractEventHandler//TODO: HACK
 {
-    private static Logger _logger = Logger.getLogger(AmqpConnectionDriverNetty.class.getName());
-
-    private final ConcurrentHashMap<AmqpConnection,Boolean> _locallyUpdatedConnections =
-            new ConcurrentHashMap<AmqpConnection,Boolean>();
-//
-//    private DriverRunnable _driverRunnable;
-//    private Thread _driverThread;
+    //TODO: use or delete:
+    //private static Logger _logger = Logger.getLogger(AmqpConnectionDriverNetty.class.getName());
 
     private final Bootstrap _bootstrap;
     private AmqpConnection _amqpConnection;
@@ -85,17 +74,20 @@ public class AmqpConnectionDriverNetty e
 
     public AmqpConnectionDriverNetty() throws IOException
     {
+        //TODO: make config options configurable
+        int connectTimeoutMillis = 30000;
+        boolean autoRead = false;
+        boolean tcpNoDelay = true;
+        boolean tcpKeepAlive = true;
+        boolean tcpReuseAddr = true;
+
         Bootstrap bootstrap = new Bootstrap();
         bootstrap.channel(NioSocketChannel.class);
-
         EventLoopGroup group = new NioEventLoopGroup();
         bootstrap.group(group);
 
-        int connectTimeoutMillis = 30000;
-        boolean tcpKeepAlive = true;
-        boolean tcpNoDelay = true;
-        boolean tcpReuseAddr = true;
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis);
+        bootstrap.option(ChannelOption.AUTO_READ, autoRead);
         bootstrap.option(ChannelOption.TCP_NODELAY, tcpNoDelay);
         bootstrap.option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive);
         bootstrap.option(ChannelOption.SO_REUSEADDR, tcpReuseAddr);
@@ -103,15 +95,12 @@ public class AmqpConnectionDriverNetty e
         bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false));
 
         _nettyHandler = new NettyHandler();
-        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
-
+        bootstrap.handler(new ChannelInitializer<SocketChannel>()
+        {
             @Override
             protected void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                 p.addLast(_nettyHandler);
-//                p.addLast(new BinaryMemcacheClientCodec());
-//                p.addLast(new BinaryMemcacheObjectAggregator(Integer.MAX_VALUE));
-//                p.addLast(new MemcacheClientHandler());
             }
         });
 
@@ -130,19 +119,12 @@ public class AmqpConnectionDriverNetty e
         ChannelFuture future = _bootstrap.connect(remoteHost, port);
         future.awaitUninterruptibly();
 
-        String threadName = null;
         if (future.isSuccess())
         {
             //TODO connected, do anything extra required (e.g wait for successful SSL handshake).
-            SocketAddress localAddress = future.channel().localAddress();
-            SocketAddress remoteAddress = future.channel().remoteAddress();
-
-            //TODO: delete?
-            threadName = "DriverRunnable-" + String.valueOf(localAddress) + "/" + String.valueOf(remoteAddress);
         }
         else
         {
-            //TODO: log it?
             Throwable t = future.cause();
 
             throw new RuntimeException("Failed to connect", t);
@@ -158,27 +140,35 @@ public class AmqpConnectionDriverNetty e
 
         private class NettyWriter extends AbstractEventHandler
         {
-//TODO:delete
-//            @Override
-//            public void onInit(Connection conn)
-//            {
-//                ChannelHandlerContext ctx = (ChannelHandlerContext) _transport.getContext();
-//                write(ctx);
-//                scheduleReadIfCapacity(_transport, ctx);
-//            }
-
             @Override
-            public void onTransport(Transport transport) {
+            public void onTransport(Transport transport)
+            {
                 ChannelHandlerContext ctx = (ChannelHandlerContext) transport.getContext();
                 write(ctx);
                 scheduleReadIfCapacity(transport, ctx);
             }
+
+            @Override
+            public void onFlow(Link link)
+            {
+                //TODO: delete, somehow.
+                //This is only here because the client currently sends its messages without checking for credit,
+                //which can result in the flow arriving after the send attempt, which results in no transport event
+                //being emitted to signal the message can now be written, and so it may never get sent.
+
+                logMessage("Forcing transport write attempt after flow");
+                ChannelHandlerContext ctx = (ChannelHandlerContext) _transport.getContext();
+                write(ctx);
+                scheduleReadIfCapacity(_transport, ctx);
+            }
         }
 
         @Override
-        public void channelActive(ChannelHandlerContext ctx) {
-            synchronized (_amqpConnection) {
-                System.out.println("ACTIVE");
+        public void channelActive(ChannelHandlerContext ctx)
+        {
+            synchronized (_amqpConnection)
+            {
+                logMessage("ACTIVE");
                 _transport = Transport.Factory.create();
                 _transport.setContext(ctx);
 
@@ -202,23 +192,23 @@ public class AmqpConnectionDriverNetty e
         }
 
         @Override
-        public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+        public void channelRead(final ChannelHandlerContext ctx, Object msg)
+        {
             synchronized (_amqpConnection)
             {
                 try
                 {
                     ByteBuf buf = (ByteBuf) msg;
 
-                    //TODO: delete
-                    ByteBuffer nio = buf.nioBuffer();
-                    byte[] bytes = new byte[nio.limit()];
-                    nio.get(bytes);
-                    System.out.println("Got Bytes: " + new Binary(bytes));
+                    echoBytes("Got Bytes: ", buf);
 
-                    try {
-                        while (buf.readableBytes() > 0) {
+                    try
+                    {
+                        while (buf.readableBytes() > 0)
+                        {
                             int capacity = _transport.capacity();
-                            if (capacity <= 0) {
+                            if (capacity <= 0)
+                            {
                                 throw new IllegalStateException("discarding bytes: " + buf.readableBytes());
                             }
                             ByteBuffer tail = _transport.tail();
@@ -229,7 +219,9 @@ public class AmqpConnectionDriverNetty e
                             processAmqpConnection();
                             dispatch();
                         }
-                    } finally {
+                    }
+                    finally
+                    {
                         buf.release();
                     }
 
@@ -243,47 +235,51 @@ public class AmqpConnectionDriverNetty e
         }
 
         @Override
-        public void channelInactive(ChannelHandlerContext ctx) {
-            synchronized (_amqpConnection) {
-                //System.out.println(String.format("CHANNEL CLOSED: settled %s, sent %s", settled, sent));
-                System.out.println("CHANNEL CLOSED");
+        public void channelInactive(ChannelHandlerContext ctx)
+        {
+            synchronized (_amqpConnection)
+            {
+                logMessage("CHANNEL CLOSED");
                 _transport.close_tail();
                 dispatch();
             }
         }
 
         @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+        {
+            logMessage("Exception caught: " + cause.getMessage());
             cause.printStackTrace();
             closeOnFlush(ctx.channel());
         }
 
-        /**
-         * Closes the specified channel after all queued write requests are flushed.
-         */
-        void closeOnFlush(Channel ch) {
-            if (ch.isActive()) {
+        void closeOnFlush(Channel ch)
+        {
+            if (ch.isActive())
+            {
                 ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
             }
         }
 
-
-
-
-        private void dispatch() {
-            System.out.println("Dispatch called");
-            synchronized (_amqpConnection) {
+        private void dispatch()
+        {
+            logMessage("Dispatch called");
+            synchronized (_amqpConnection)
+            {
                 try
                 {
-                    if (dispatching) {
-                        System.out.println("Dispatch skipped");
+                    if (dispatching)
+                    {
+                        logMessage("Dispatch skipped");
                         return;
                     }
-                    System.out.println("Dispatching");
+                    logMessage("Dispatching");
                     dispatching = true;
                     Event ev;
-                    while ((ev = _collector.peek()) != null) {
-                        for (EventHandler h : handlers) {
+                    while ((ev = _collector.peek()) != null)
+                    {
+                        for (EventHandler h : handlers)
+                        {
                             Events.dispatch(ev, h);
                             //TODO: delete
                             //processAmqpConnection();
@@ -305,19 +301,19 @@ public class AmqpConnectionDriverNetty e
 
         private void write(final ChannelHandlerContext ctx)
         {
-            System.out.println("Write called");
+            logMessage("Write called");
             synchronized (_amqpConnection)
             {
                 try
                 {
-                    System.out.println("Checking pending");
+                    logMessage("Checking pending");
                     int pending = _transport.pending();
 
-                    System.out.println("Pending:" + pending);
+                    logMessage("Pending:" + pending);
                     if (pending > 0)
                     {
                         final int size = pending - offset;
-                        System.out.println("Size:" + pending);
+                        logMessage("Size:" + pending);
                         if (size > 0)
                         {
                             ByteBuf buffer = Unpooled.buffer(size);
@@ -326,10 +322,7 @@ public class AmqpConnectionDriverNetty e
                             buffer.writeBytes(head);
 
                             //TODO: delete
-                            ByteBuffer nio = buffer.nioBuffer();
-                            byte[] bytes = new byte[nio.limit()];
-                            nio.get(bytes);
-                            System.out.println("Sending Bytes: " + new Binary(bytes));
+                            echoBytes("Sending Bytes: ", buffer);
 
                             ChannelFuture chf = ctx.writeAndFlush(buffer);
                             offset += size;
@@ -338,7 +331,7 @@ public class AmqpConnectionDriverNetty e
                                 @Override
                                 public void operationComplete(ChannelFuture chf)
                                 {
-                                    System.out.println("In completion callback");
+                                    logMessage("Running write completion callback");
                                     if (chf.isSuccess())
                                     {
                                         synchronized (_amqpConnection)
@@ -356,7 +349,7 @@ public class AmqpConnectionDriverNetty e
                                                 _amqpConnection.notifyAll();
                                             }
                                         }
-                                        write(ctx);
+                                        write(ctx);//TODO: fix. Calling this can cause us to fire channelInactive before reading any responses, see below
                                         dispatch();
                                     }
                                     else
@@ -365,6 +358,7 @@ public class AmqpConnectionDriverNetty e
                                     }
                                 }
                             });
+                            logMessage("Write completion callback added");
                         }
                         else
                         {
@@ -375,6 +369,7 @@ public class AmqpConnectionDriverNetty e
                     {
                         if (pending < 0)
                         {
+                            //TODO: fix. Calling this can cause us to fire channelInactive before reading any responses
                             //closeOnFlush(ctx.channel());
                         }
                         return;
@@ -389,19 +384,19 @@ public class AmqpConnectionDriverNetty e
 
         private void scheduleReadIfCapacity(Transport transport, ChannelHandlerContext ctx)
         {
-            System.out.println("Checking if read can be scheduled");
+            logMessage("Checking if read can be scheduled");
             int capacity = transport.capacity();
             if (capacity > 0)
             {
-                System.out.println("Scheduling read");
+                logMessage("Scheduling read");
                 ctx.read();
-                System.out.println("Scheduled read");
+                logMessage("Scheduled read");
             }
         }
 
         private void processAmqpConnection()
         {
-            System.out.println("Processing AmqpConnection");
+            logMessage("Processing AmqpConnection");
             _amqpConnection.process();
         }
     }
@@ -413,10 +408,19 @@ public class AmqpConnectionDriverNetty e
             @Override
             public void run()
             {
-                //TODO: this is a hack
-                System.out.println("Writing From Executor");
+                //TODO: this is a hack to somewhat replicate how the previous
+                //driver worked, until the higher layer code is rewritten not to need it
+                logMessage("Lazy Writing From Executor");
+//                try
+//                {
+//                    Thread.sleep(100);
+//                }
+//                catch (InterruptedException e)
+//                {
+//                    //ignore
+//                }
                 _nettyHandler.write((ChannelHandlerContext) _nettyHandler._transport.getContext());
-               // _nettyHandler.dispatch();
+               //TODO:delete: _nettyHandler.dispatch();
             }
         });
     }
@@ -425,4 +429,20 @@ public class AmqpConnectionDriverNetty e
     {
         // TODO Auto-generated method stub
     }
+
+    private void logMessage(String message)
+    {
+        String name = Thread.currentThread().getName();
+        System.out.println("[" + name + "] " + message);
+    }
+
+    private void echoBytes(String msgPrefix, ByteBuf buf)
+    {
+        //TODO: delete this method
+        ByteBuffer nio = buf.nioBuffer();
+        byte[] bytes = new byte[nio.limit()];
+        nio.get(bytes);
+
+        logMessage(msgPrefix + new Binary(bytes));
+    }
 }
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org