You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/10/12 14:56:37 UTC
svn commit: r1764479 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/
common/src/main/java/org/apache/qpid/bytebuffer/ common/src/mai...
Author: rgodfrey
Date: Wed Oct 12 14:56:37 2016
New Revision: 1764479
URL: http://svn.apache.org/viewvc?rev=1764479&view=rev
Log:
QPID-7447 : Small optimisations
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1764479&r1=1764478&r2=1764479&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Wed Oct 12 14:56:37 2016
@@ -38,6 +38,7 @@ public class NetworkConnectionScheduler
{
private static final Logger LOGGER = LoggerFactory.getLogger(NetworkConnectionScheduler.class);
private final ThreadFactory _factory;
+ private final String _selectorThreadName;
private volatile ThreadPoolExecutor _executor;
private final AtomicInteger _running = new AtomicInteger();
private final int _poolSize;
@@ -89,6 +90,7 @@ public class NetworkConnectionScheduler
_threadKeepAliveTimeout = threadKeepAliveTimeout;
_factory = factory;
_numberOfSelectors = numberOfSelectors;
+ _selectorThreadName = "Selector-"+name;
}
@@ -189,6 +191,11 @@ public class NetworkConnectionScheduler
return _name;
}
+ public String getSelectorThreadName()
+ {
+ return _selectorThreadName;
+ }
+
public void addAcceptingSocket(final ServerSocketChannel serverSocket,
final NonBlockingNetworkTransport nonBlockingNetworkTransport)
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1764479&r1=1764478&r2=1764479&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Wed Oct 12 14:56:37 2016
@@ -280,7 +280,7 @@ class SelectorThread extends Thread
{
if (!_closed.get())
{
- Thread.currentThread().setName("Selector-" + _scheduler.getName());
+ Thread.currentThread().setName(_scheduler.getSelectorThreadName());
_inSelect.set(true);
try
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java?rev=1764479&r1=1764478&r2=1764479&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java Wed Oct 12 14:56:37 2016
@@ -60,34 +60,70 @@ public class BrokerDecoder extends Serve
startTime = System.currentTimeMillis();
}
AMQChannel channel = _connection.getChannel(channelId);
- if(channel == null)
+ if(channel != null)
{
- doProcessFrame(channelId, type, bodySize, in);
+ _connection.channelRequiresSync(channel);
}
- else
+ doProcessFrame(channelId, type, bodySize, in);
+
+ }
+ finally
+ {
+ if(_logger.isDebugEnabled())
{
- _connection.channelRequiresSync(channel);
+ _logger.debug("Frame handled in {} ms.", (System.currentTimeMillis() - startTime));
+ }
+ }
+ }
+ @Override
+ protected int processAMQPFrames(final QpidByteBuffer buf) throws AMQFrameDecodingException
+ {
+ int required = decodable(buf);
+ if (required == 0)
+ {
+ final int channelId = buf.getUnsignedShort(buf.position() + 1);
+ final AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ processInput(buf);
+ return 0;
+ }
+
+ else
+ {
try
{
- AccessController.doPrivileged(new PrivilegedExceptionAction<Object>()
+ return AccessController.doPrivileged(new PrivilegedExceptionAction<Integer>()
{
@Override
- public Void run() throws IOException, AMQFrameDecodingException
+ public Integer run() throws IOException, AMQFrameDecodingException
{
- doProcessFrame(channelId, type, bodySize, in);
- return null;
+ int required;
+ while (true)
+ {
+ processInput(buf);
+
+ required = decodable(buf);
+ if (required != 0 || buf.getUnsignedShort(buf.position() + 1) != channelId)
+ {
+ break;
+ }
+ }
+
+ return required;
}
}, channel.getAccessControllerContext());
}
catch (PrivilegedActionException e)
{
Throwable cause = e.getCause();
- if(cause instanceof AMQFrameDecodingException)
+ if (cause instanceof AMQFrameDecodingException)
{
throw (AMQFrameDecodingException) cause;
}
- else if(cause instanceof RuntimeException)
+ else if (cause instanceof RuntimeException)
{
throw (RuntimeException) cause;
}
@@ -98,16 +134,9 @@ public class BrokerDecoder extends Serve
}
}
}
- finally
- {
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Frame handled in {} ms.", (System.currentTimeMillis() - startTime));
- }
- }
+ return required;
}
-
private void doProcessFrame(final int channelId, final byte type, final long bodySize, final QpidByteBuffer in)
throws AMQFrameDecodingException
{
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1764479&r1=1764478&r2=1764479&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Wed Oct 12 14:56:37 2016
@@ -93,6 +93,12 @@ public class QpidByteBuffer
return ((int) getShort()) & 0xffff;
}
+ public final int getUnsignedShort(int pos)
+ {
+ return ((int) getShort(pos)) & 0xffff;
+ }
+
+
public final long getUnsignedInt()
{
return ((long) getInt()) & 0xffffffffL;
@@ -380,7 +386,6 @@ public class QpidByteBuffer
public final short getShort()
{
return _buffer.getShort();
-
}
public final float getFloat()
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1764479&r1=1764478&r2=1764479&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Wed Oct 12 14:56:37 2016
@@ -108,11 +108,7 @@ public abstract class AMQDecoder<T exten
{
if(!_expectProtocolInitiation)
{
- required = decodable(buf);
- if (required == 0)
- {
- processInput(buf);
- }
+ required = processAMQPFrames(buf);
}
else
{
@@ -127,8 +123,17 @@ public abstract class AMQDecoder<T exten
return buf.hasRemaining() ? required : 0;
}
+ protected int processAMQPFrames(final QpidByteBuffer buf) throws AMQFrameDecodingException
+ {
+ final int required = decodable(buf);
+ if (required == 0)
+ {
+ processInput(buf);
+ }
+ return required;
+ }
- private int decodable(final QpidByteBuffer in) throws AMQFrameDecodingException
+ protected int decodable(final QpidByteBuffer in) throws AMQFrameDecodingException
{
final int remainingAfterAttributes = in.remaining() - FRAME_HEADER_SIZE;
// type, channel, body length and end byte
@@ -154,7 +159,7 @@ public abstract class AMQDecoder<T exten
}
- private void processInput(final QpidByteBuffer in)
+ protected void processInput(final QpidByteBuffer in)
throws AMQFrameDecodingException, AMQProtocolVersionException
{
final byte type = in.get();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org