You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2015/02/12 16:14:49 UTC
[3/5] activemq-6 git commit: ACTIVEMQ6-78 Improving performance over
Netty NIO blocked calls
ACTIVEMQ6-78 Improving performance over Netty NIO blocked calls
https://issues.apache.org/jira/browse/ACTIVEMQ6-78 performance work
There are two aspects of this work. First avoid asynchronous packets and avoid
context switch over the executors. Packet had a method to make certain packets such
as commit to use a different executor. Since it's NIO everything is done at the Netty thread now.
The second aspect was to make sure we use the proper buffering
Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/f7c4d56c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/f7c4d56c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/f7c4d56c
Branch: refs/heads/master
Commit: f7c4d56cc771e2d4ff54fd2796ad2cdfae9f5e13
Parents: 41b28f4
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Feb 10 11:30:18 2015 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Feb 11 12:47:14 2015 -0500
----------------------------------------------------------------------
.../core/buffers/impl/ChannelBufferWrapper.java | 12 +++++++
.../impl/ResetLimitWrappedActiveMQBuffer.java | 4 ++-
.../activemq/core/protocol/core/Packet.java | 2 --
.../impl/ActiveMQClientProtocolManager.java | 2 +-
.../core/protocol/core/impl/PacketImpl.java | 7 +---
.../core/impl/RemotingConnectionImpl.java | 37 ++------------------
.../core/impl/wireformat/RollbackMessage.java | 6 ----
.../impl/wireformat/SessionCloseMessage.java | 6 ----
.../impl/wireformat/SessionCommitMessage.java | 5 ---
.../impl/wireformat/SessionXACommitMessage.java | 6 ----
.../wireformat/SessionXAPrepareMessage.java | 6 ----
.../wireformat/SessionXARollbackMessage.java | 6 ----
.../remoting/impl/netty/NettyConnection.java | 6 ++--
.../remoting/impl/netty/NettyConnector.java | 3 +-
.../protocol/AbstractRemotingConnection.java | 4 +--
.../spi/core/protocol/RemotingConnection.java | 3 +-
.../activemq/spi/core/remoting/Connection.java | 2 +-
.../protocol/openwire/OpenWireConnection.java | 2 +-
.../core/protocol/stomp/StompConnection.java | 2 +-
.../core/remoting/impl/invm/InVMConnection.java | 2 +-
.../impl/netty/NettyServerConnection.java | 2 +-
.../jms/tests/message/MessageHeaderTest.java | 4 +--
.../impl/netty/NettyConnectionTest.java | 2 +-
23 files changed, 35 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java
----------------------------------------------------------------------
diff --git a/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java b/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java
index a3fa5b5..53d7306 100644
--- a/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java
+++ b/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java
@@ -35,6 +35,18 @@ public class ChannelBufferWrapper implements ActiveMQBuffer
protected ByteBuf buffer; // NO_UCD (use final)
private final boolean releasable;
+ public static ByteBuf unwrap(ByteBuf buffer)
+ {
+ ByteBuf parent;
+ while ((parent = buffer.unwrap()) != null &&
+ parent != buffer) // this last part is just in case the semantic
+ { // ever changes where unwrap is returning itself
+ buffer = parent;
+ }
+
+ return buffer;
+ }
+
public ChannelBufferWrapper(final ByteBuf buffer)
{
this(buffer, false);
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
index 96b3e2b..1cd342d 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
@@ -46,7 +46,9 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message)
{
- super(buffer.byteBuf());
+ // a wrapped inside a wrapper will increase the stack size.
+ // we fixed this here due to some profiling testing
+ super(unwrap(buffer.byteBuf()));
this.limit = limit;
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java
index 4027c67..6b23bff 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java
@@ -85,6 +85,4 @@ public interface Packet
* @return true if confirmation is required
*/
boolean isRequiresConfirmations();
-
- boolean isAsyncExec();
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index b7366df..890e8d0 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -482,7 +482,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
{
// no need to send handshake on inVM as inVM is not using the NettyProtocolHandling
String handshake = "HORNETQ";
- ActiveMQBuffer amqbuffer = connection.createBuffer(handshake.length());
+ ActiveMQBuffer amqbuffer = connection.createTransportBuffer(handshake.length());
amqbuffer.writeBytes(handshake.getBytes());
transportConnection.write(amqbuffer);
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java
index e2bdc28..61e0eca 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java
@@ -276,7 +276,7 @@ public class PacketImpl implements Packet
public ActiveMQBuffer encode(final RemotingConnection connection)
{
- ActiveMQBuffer buffer = connection.createBuffer(PacketImpl.INITIAL_PACKET_SIZE);
+ ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE);
// The standard header fields
@@ -333,11 +333,6 @@ public class PacketImpl implements Packet
return true;
}
- public boolean isAsyncExec()
- {
- return false;
- }
-
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java
index 5ef4cfd..8820850 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -81,8 +81,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
private final Object failLock = new Object();
- private volatile boolean executing;
-
private final SimpleString nodeID;
private String clientID;
@@ -381,39 +379,8 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
ActiveMQClientLogger.LOGGER.trace("handling packet " + packet);
}
- if (packet.isAsyncExec() && executor != null)
- {
- executing = true;
-
- executor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- doBufferReceived(packet);
- }
- catch (Throwable t)
- {
- ActiveMQClientLogger.LOGGER.errorHandlingPacket(t, packet);
- }
-
- executing = false;
- }
- });
- }
- else
- {
- //To prevent out of order execution if interleaving sync and async operations on same connection
- while (executing)
- {
- Thread.yield();
- }
-
- // Pings must always be handled out of band so we can send pings back to the client quickly
- // otherwise they would get in the queue with everything else which might give an intolerable delay
- doBufferReceived(packet);
- }
+ dataReceived = true;
+ doBufferReceived(packet);
super.bufferReceived(connectionID, buffer);
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java
index 41c5735..340c73a 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java
@@ -70,12 +70,6 @@ public class RollbackMessage extends PacketImpl
}
@Override
- public boolean isAsyncExec()
- {
- return true;
- }
-
- @Override
public int hashCode()
{
final int prime = 31;
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java
index 1c8a276..dc61860 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java
@@ -49,10 +49,4 @@ public class SessionCloseMessage extends PacketImpl
// TODO
return 0;
}
-
- @Override
- public boolean isAsyncExec()
- {
- return true;
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java
index c7242fb..1b1e081 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java
@@ -30,9 +30,4 @@ public class SessionCommitMessage extends PacketImpl
super(SESS_COMMIT);
}
- @Override
- public boolean isAsyncExec()
- {
- return true;
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java
index 65fdf33..14668cb 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java
@@ -56,12 +56,6 @@ public class SessionXACommitMessage extends PacketImpl
}
@Override
- public boolean isAsyncExec()
- {
- return true;
- }
-
- @Override
public void encodeRest(final ActiveMQBuffer buffer)
{
XidCodecSupport.encodeXid(xid, buffer);
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
index b9a531a..8ff5b15 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
@@ -62,12 +62,6 @@ public class SessionXAPrepareMessage extends PacketImpl
}
@Override
- public boolean isAsyncExec()
- {
- return true;
- }
-
- @Override
public int hashCode()
{
final int prime = 31;
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
index 8efab01..272386d 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
@@ -63,12 +63,6 @@ public class SessionXARollbackMessage extends PacketImpl
}
@Override
- public boolean isAsyncExec()
- {
- return true;
- }
-
- @Override
public int hashCode()
{
final int prime = 31;
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java
index a73aa1b..c7eafe7 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java
@@ -172,9 +172,9 @@ public class NettyConnection implements Connection
listener.connectionDestroyed(getID());
}
- public ActiveMQBuffer createBuffer(final int size)
+ public ActiveMQBuffer createTransportBuffer(final int size)
{
- return new ChannelBufferWrapper(channel.alloc().buffer(size));
+ return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true);
}
public Object getID()
@@ -199,7 +199,7 @@ public class NettyConnection implements Connection
{
channel.writeAndFlush(batchBuffer.byteBuf());
- batchBuffer = createBuffer(BATCHING_BUFFER_SIZE);
+ batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
}
}
finally
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java
index 3ed9e87..8425edd 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java
@@ -47,7 +47,6 @@ import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
@@ -477,7 +476,7 @@ public class NettyConnector extends AbstractConnector
}
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
- bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false));
+ bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE);
final SSLContext context;
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java
index a48845f..22b26ee 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java
@@ -182,9 +182,9 @@ public abstract class AbstractRemotingConnection implements RemotingConnection
closeListeners.addAll(listeners);
}
- public ActiveMQBuffer createBuffer(final int size)
+ public ActiveMQBuffer createTransportBuffer(final int size)
{
- return transportConnection.createBuffer(size);
+ return transportConnection.createTransportBuffer(size);
}
public Connection getTransportConnection()
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java
index 186e098..9eb287e 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java
@@ -115,11 +115,12 @@ public interface RemotingConnection extends BufferHandler
/**
* creates a new ActiveMQBuffer of the specified size.
+ * For the purpose of i/o outgoing packets
*
* @param size the size of buffer required
* @return the buffer
*/
- ActiveMQBuffer createBuffer(int size);
+ ActiveMQBuffer createTransportBuffer(int size);
/**
* called when the underlying connection fails.
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java
index a4896e2..b6060d3 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java
@@ -36,7 +36,7 @@ public interface Connection
* @param size the size of buffer to create
* @return the new buffer.
*/
- ActiveMQBuffer createBuffer(int size);
+ ActiveMQBuffer createTransportBuffer(int size);
RemotingConnection getProtocolConnection();
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
index 20762e2..bf906bc 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
@@ -440,7 +440,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
}
@Override
- public ActiveMQBuffer createBuffer(int size)
+ public ActiveMQBuffer createTransportBuffer(int size)
{
return ActiveMQBuffers.dynamicBuffer(size);
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
index 16cf55e..9a4e7b7 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
@@ -280,7 +280,7 @@ public final class StompConnection implements RemotingConnection
}
@Override
- public ActiveMQBuffer createBuffer(int size)
+ public ActiveMQBuffer createTransportBuffer(int size)
{
return ActiveMQBuffers.dynamicBuffer(size);
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java
index eed1ff4..37e2acb 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java
@@ -142,7 +142,7 @@ public class InVMConnection implements Connection
}
}
- public ActiveMQBuffer createBuffer(final int size)
+ public ActiveMQBuffer createTransportBuffer(final int size)
{
return ActiveMQBuffers.dynamicBuffer(size);
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java
index d899e85..339b407 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java
@@ -34,7 +34,7 @@ public class NettyServerConnection extends NettyConnection
}
@Override
- public ActiveMQBuffer createBuffer(int size)
+ public ActiveMQBuffer createTransportBuffer(int size)
{
return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java
index 784a0c0..cfbf995 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java
@@ -1391,7 +1391,7 @@ public class MessageHeaderTest extends MessageHeaderTestBase
}
/* (non-Javadoc)
- * @see org.apache.activemq.api.core.client.ClientSession#createBuffer(byte[])
+ * @see org.apache.activemq.api.core.client.ClientSession#createTransportBuffer(byte[])
*/
public ActiveMQBuffer createBuffer(final byte[] bytes)
{
@@ -1400,7 +1400,7 @@ public class MessageHeaderTest extends MessageHeaderTestBase
}
/* (non-Javadoc)
- * @see org.apache.activemq.api.core.client.ClientSession#createBuffer(int)
+ * @see org.apache.activemq.api.core.client.ClientSession#createTransportBuffer(int)
*/
public ActiveMQBuffer createBuffer(final int size)
{
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
index cc9d57b..b0a4c14 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
@@ -74,7 +74,7 @@ public class NettyConnectionTest extends UnitTestCase
final int size = 1234;
- ActiveMQBuffer buff = conn.createBuffer(size);
+ ActiveMQBuffer buff = conn.createTransportBuffer(size);
buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization.
Assert.assertEquals(size, buff.capacity());