You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/10/12 14:56:37 UTC

svn commit: r1764479 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/transport/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ common/src/main/java/org/apache/qpid/bytebuffer/ common/src/main/java/org/apache/qpid/codec/

Author: rgodfrey
Date: Wed Oct 12 14:56:37 2016
New Revision: 1764479

URL: http://svn.apache.org/viewvc?rev=1764479&view=rev
Log:
QPID-7447 : Small optimisations

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1764479&r1=1764478&r2=1764479&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Wed Oct 12 14:56:37 2016
@@ -38,6 +38,7 @@ public class NetworkConnectionScheduler
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(NetworkConnectionScheduler.class);
     private final ThreadFactory _factory;
+    private final String _selectorThreadName;
     private volatile ThreadPoolExecutor _executor;
     private final AtomicInteger _running = new AtomicInteger();
     private final int _poolSize;
@@ -89,6 +90,7 @@ public class NetworkConnectionScheduler
         _threadKeepAliveTimeout = threadKeepAliveTimeout;
         _factory = factory;
         _numberOfSelectors = numberOfSelectors;
+        _selectorThreadName = "Selector-"+name;
     }
 
 
@@ -189,6 +191,11 @@ public class NetworkConnectionScheduler
         return _name;
     }
 
+    public String getSelectorThreadName()
+    {
+        return _selectorThreadName;
+    }
+
     public void addAcceptingSocket(final ServerSocketChannel serverSocket,
                                    final NonBlockingNetworkTransport nonBlockingNetworkTransport)
     {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1764479&r1=1764478&r2=1764479&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Wed Oct 12 14:56:37 2016
@@ -280,7 +280,7 @@ class SelectorThread extends Thread
                         {
                             if (!_closed.get())
                             {
-                                Thread.currentThread().setName("Selector-" + _scheduler.getName());
+                                Thread.currentThread().setName(_scheduler.getSelectorThreadName());
                                 _inSelect.set(true);
                                 try
                                 {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java?rev=1764479&r1=1764478&r2=1764479&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java Wed Oct 12 14:56:37 2016
@@ -60,34 +60,70 @@ public class BrokerDecoder extends Serve
                 startTime = System.currentTimeMillis();
             }
             AMQChannel channel = _connection.getChannel(channelId);
-            if(channel == null)
+            if(channel != null)
             {
-                doProcessFrame(channelId, type, bodySize, in);
+                _connection.channelRequiresSync(channel);
             }
-            else
+            doProcessFrame(channelId, type, bodySize, in);
+
+        }
+        finally
+        {
+            if(_logger.isDebugEnabled())
             {
-                _connection.channelRequiresSync(channel);
+                _logger.debug("Frame handled in {} ms.", (System.currentTimeMillis() - startTime));
+            }
+        }
+    }
 
+    @Override
+    protected int processAMQPFrames(final QpidByteBuffer buf) throws AMQFrameDecodingException
+    {
+        int required = decodable(buf);
+        if (required == 0)
+        {
+            final int channelId = buf.getUnsignedShort(buf.position() + 1);
+            final AMQChannel channel = _connection.getChannel(channelId);
+
+            if (channel == null)
+            {
+                processInput(buf);
+                return 0;
+            }
+
+            else
+            {
                 try
                 {
-                    AccessController.doPrivileged(new PrivilegedExceptionAction<Object>()
+                    return AccessController.doPrivileged(new PrivilegedExceptionAction<Integer>()
                     {
                         @Override
-                        public Void run() throws IOException, AMQFrameDecodingException
+                        public Integer run() throws IOException, AMQFrameDecodingException
                         {
-                            doProcessFrame(channelId, type, bodySize, in);
-                            return null;
+                            int required;
+                            while (true)
+                            {
+                                processInput(buf);
+
+                                required = decodable(buf);
+                                if (required != 0 || buf.getUnsignedShort(buf.position() + 1) != channelId)
+                                {
+                                    break;
+                                }
+                            }
+
+                            return required;
                         }
                     }, channel.getAccessControllerContext());
                 }
                 catch (PrivilegedActionException e)
                 {
                     Throwable cause = e.getCause();
-                    if(cause instanceof AMQFrameDecodingException)
+                    if (cause instanceof AMQFrameDecodingException)
                     {
                         throw (AMQFrameDecodingException) cause;
                     }
-                    else if(cause instanceof RuntimeException)
+                    else if (cause instanceof RuntimeException)
                     {
                         throw (RuntimeException) cause;
                     }
@@ -98,16 +134,9 @@ public class BrokerDecoder extends Serve
                 }
             }
         }
-        finally
-        {
-            if(_logger.isDebugEnabled())
-            {
-                _logger.debug("Frame handled in {} ms.", (System.currentTimeMillis() - startTime));
-            }
-        }
+        return required;
     }
 
-
     private void doProcessFrame(final int channelId, final byte type, final long bodySize, final QpidByteBuffer in)
             throws AMQFrameDecodingException
     {

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1764479&r1=1764478&r2=1764479&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Wed Oct 12 14:56:37 2016
@@ -93,6 +93,12 @@ public class QpidByteBuffer
         return ((int) getShort()) & 0xffff;
     }
 
+    public final int getUnsignedShort(int pos)
+    {
+        return ((int) getShort(pos)) & 0xffff;
+    }
+
+
     public final long getUnsignedInt()
     {
         return ((long) getInt()) & 0xffffffffL;
@@ -380,7 +386,6 @@ public class QpidByteBuffer
     public final short getShort()
     {
         return _buffer.getShort();
-
     }
 
     public final float getFloat()

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1764479&r1=1764478&r2=1764479&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Wed Oct 12 14:56:37 2016
@@ -108,11 +108,7 @@ public abstract class AMQDecoder<T exten
         {
             if(!_expectProtocolInitiation)
             {
-                required = decodable(buf);
-                if (required == 0)
-                {
-                    processInput(buf);
-                }
+                required = processAMQPFrames(buf);
             }
             else
             {
@@ -127,8 +123,17 @@ public abstract class AMQDecoder<T exten
         return buf.hasRemaining() ? required : 0;
     }
 
+    protected int processAMQPFrames(final QpidByteBuffer buf) throws AMQFrameDecodingException
+    {
+        final int required = decodable(buf);
+        if (required == 0)
+        {
+            processInput(buf);
+        }
+        return required;
+    }
 
-    private int decodable(final QpidByteBuffer in) throws AMQFrameDecodingException
+    protected int decodable(final QpidByteBuffer in) throws AMQFrameDecodingException
     {
         final int remainingAfterAttributes = in.remaining() - FRAME_HEADER_SIZE;
         // type, channel, body length and end byte
@@ -154,7 +159,7 @@ public abstract class AMQDecoder<T exten
 
     }
 
-    private void processInput(final QpidByteBuffer in)
+    protected void processInput(final QpidByteBuffer in)
             throws AMQFrameDecodingException, AMQProtocolVersionException
     {
         final byte type = in.get();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org