You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/12/19 03:09:24 UTC
[3/4] activemq-artemis git commit: ARTEMIS-1546 Fixing body
compatibility issue by recast body towards 1.4
ARTEMIS-1546 Fixing body compatibility issue by recast body towards 1.4
https://issues.apache.org/jira/browse/ARTEMIS-1546
Recasting the body as 1.x format when there's a 1.x version in use at the other side of the wire
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dbe575a0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dbe575a0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dbe575a0
Branch: refs/heads/master
Commit: dbe575a0c1a73849284f163e32544cbfe901c1f9
Parents: 9ef90f8
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Dec 15 16:36:46 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Dec 18 22:04:38 2017 -0500
----------------------------------------------------------------------
.../activemq/artemis/api/core/ICoreMessage.java | 27 +++++++--
.../artemis/core/message/impl/CoreMessage.java | 58 ++++++++++++--------
.../core/protocol/ClientPacketDecoder.java | 16 ++++--
.../protocol/core/CoreRemotingConnection.java | 10 +++-
.../artemis/core/protocol/core/Packet.java | 3 +-
.../impl/ActiveMQClientProtocolManager.java | 1 +
.../core/impl/ActiveMQSessionContext.java | 43 +++++++++++----
.../core/protocol/core/impl/ChannelImpl.java | 2 +-
.../core/protocol/core/impl/PacketDecoder.java | 5 +-
.../core/protocol/core/impl/PacketImpl.java | 23 +++++---
.../core/impl/RemotingConnectionImpl.java | 18 +++---
.../impl/wireformat/SessionReceiveMessage.java | 11 +++-
.../wireformat/SessionReceiveMessage_1X.java | 49 +++++++++++++++++
.../impl/wireformat/SessionSendMessage.java | 8 ++-
.../impl/wireformat/SessionSendMessage_1X.java | 52 ++++++++++++++++++
.../jms/client/ActiveMQConnectionFactory.java | 2 +
.../artemis/jms/client/ActiveMQDestination.java | 4 +-
.../core/protocol/ServerPacketDecoder.java | 35 +++++++-----
.../core/ServerSessionPacketHandler.java | 10 ++--
.../core/impl/ActiveMQPacketHandler.java | 10 ++--
.../protocol/core/impl/CoreProtocolManager.java | 4 +-
.../protocol/core/impl/CoreSessionCallback.java | 8 ++-
.../core/server/impl/ServerConsumerImpl.java | 2 +-
.../main/resources/servers/artemisServer.groovy | 7 +--
24 files changed, 303 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
index 45622e4..179f8c5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
@@ -20,6 +20,7 @@ package org.apache.activemq.artemis.api.core;
import java.io.InputStream;
import java.util.Map;
+import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
@@ -35,25 +36,33 @@ public interface ICoreMessage extends Message {
@Override
InputStream getBodyInputStream();
- /** Returns a new Buffer slicing the current Body. */
+ /**
+ * Returns a new Buffer slicing the current Body.
+ */
ActiveMQBuffer getReadOnlyBodyBuffer();
- /** Return the type of the message */
+ /**
+ * Return the type of the message
+ */
@Override
byte getType();
- /** the type of the message */
+ /**
+ * the type of the message
+ */
@Override
CoreMessage setType(byte type);
/**
* We are really interested if this is a LargeServerMessage.
+ *
* @return
*/
boolean isServerMessage();
/**
* The body used for this message.
+ *
* @return
*/
@Override
@@ -61,10 +70,18 @@ public interface ICoreMessage extends Message {
int getEndOfBodyPosition();
-
- /** Used on large messages treatment */
+ /**
+ * Used on large messages treatment
+ */
void copyHeadersAndProperties(Message msg);
+ void sendBuffer_1X(ByteBuf sendBuffer);
+
+ /**
+ * it will fix the body of incoming messages from 1.x and before versions
+ */
+ void receiveBuffer_1X(ByteBuf buffer);
+
/**
* @return Returns the message in Map form, useful when encoding to JSON
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 2500142..b0656b6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -188,8 +188,16 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public void receiveBuffer(ByteBuf buffer) {
this.buffer = buffer;
this.buffer.retain();
- decode();
- this.validBuffer = true;
+ decode(false);
+ }
+
+ /** This will fix the incoming body of 1.x messages */
+ @Override
+ public void receiveBuffer_1X(ByteBuf buffer) {
+ this.buffer = buffer;
+ this.buffer.retain();
+ decode(true);
+ validBuffer = false;
}
@Override
@@ -205,7 +213,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
/**
- *
* @param sendBuffer
* @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core
*/
@@ -215,6 +222,21 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
sendBuffer.writeBytes(buffer, 0, buffer.writerIndex());
}
+ /**
+ * Recast the message as an 1.4 message
+ */
+ @Override
+ public void sendBuffer_1X(ByteBuf sendBuffer) {
+ checkEncode();
+ ByteBuf tmpBuffer = buffer.duplicate();
+ sendBuffer.writeInt(endOfBodyPosition + DataConstants.SIZE_INT);
+ tmpBuffer.readerIndex(DataConstants.SIZE_INT);
+ tmpBuffer.readBytes(sendBuffer, endOfBodyPosition - BUFFER_HEADER_SPACE);
+ sendBuffer.writeInt(tmpBuffer.writerIndex() + DataConstants.SIZE_INT + BUFFER_HEADER_SPACE);
+ tmpBuffer.readBytes(sendBuffer, tmpBuffer.readableBytes());
+ sendBuffer.readerIndex(0);
+ }
+
private synchronized void checkEncode() {
if (!validBuffer) {
encode();
@@ -280,12 +302,10 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return endOfBodyPosition;
}
-
public TypedProperties getTypedProperties() {
return checkProperties();
}
-
@Override
public void messageChanged() {
validBuffer = false;
@@ -323,7 +343,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public void copyHeadersAndProperties(final Message msg) {
messageID = msg.getMessageID();
address = msg.getAddressSimpleString();
- userID = (UUID)msg.getUserID();
+ userID = (UUID) msg.getUserID();
type = msg.toCore().getType();
durable = msg.isDurable();
expiration = msg.getExpiration();
@@ -331,11 +351,10 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
priority = msg.getPriority();
if (msg instanceof CoreMessage) {
- properties = ((CoreMessage)msg).getTypedProperties();
+ properties = ((CoreMessage) msg).getTypedProperties();
}
}
-
@Override
public Message copy() {
checkEncode();
@@ -380,7 +399,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public CoreMessage setUserID(Object uuid) {
- this.userID = (UUID)uuid;
+ this.userID = (UUID) uuid;
return this;
}
@@ -418,7 +437,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return address;
}
-
@Override
public CoreMessage setExpiration(long expiration) {
this.expiration = expiration;
@@ -487,18 +505,22 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return this;
}
- private void decode() {
+ private void decode(boolean beforeAddress) {
endOfBodyPosition = buffer.readInt();
buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE);
decodeHeadersAndProperties(buffer, true);
buffer.readerIndex(0);
+ validBuffer = true;
+
+ if (beforeAddress) {
+ endOfBodyPosition = endOfBodyPosition - DataConstants.SIZE_INT;
+ }
internalWritableBuffer();
}
-
public void decodeHeadersAndProperties(final ByteBuf buffer) {
decodeHeadersAndProperties(buffer, false);
}
@@ -529,7 +551,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
}
-
public synchronized CoreMessage encode() {
checkProperties();
@@ -654,7 +675,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return this;
}
-
@Override
public CoreMessage putBooleanProperty(final String key, final boolean value) {
messageChanged();
@@ -683,7 +703,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return properties.getBooleanProperty(new SimpleString(key));
}
-
@Override
public CoreMessage putByteProperty(final SimpleString key, final byte value) {
messageChanged();
@@ -692,7 +711,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return this;
}
-
@Override
public CoreMessage putByteProperty(final String key, final byte value) {
messageChanged();
@@ -702,7 +720,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return this;
}
-
@Override
public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
checkProperties();
@@ -731,7 +748,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return this;
}
-
@Override
public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
checkProperties();
@@ -775,7 +791,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return this;
}
-
@Override
public CoreMessage putIntProperty(final SimpleString key, final int value) {
messageChanged();
@@ -803,7 +818,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return getIntProperty(SimpleString.toSimpleString(key));
}
-
@Override
public CoreMessage putLongProperty(final SimpleString key, final long value) {
messageChanged();
@@ -832,7 +846,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return getLongProperty(SimpleString.toSimpleString(key));
}
-
@Override
public CoreMessage putFloatProperty(final SimpleString key, final float value) {
messageChanged();
@@ -865,7 +878,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return this;
}
-
@Override
public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
messageChanged();
@@ -1071,7 +1083,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
int size = record.readInt();
initBuffer(size);
buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size);
- decode();
+ decode(false);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
index 206796d..1022030 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
@@ -19,10 +19,12 @@ package org.apache.activemq.artemis.core.protocol;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl;
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveClientLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage_1X;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
@@ -33,10 +35,10 @@ public class ClientPacketDecoder extends PacketDecoder {
public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder();
@Override
- public Packet decode(final ActiveMQBuffer in) {
+ public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) {
final byte packetType = in.readByte();
- Packet packet = decode(packetType);
+ Packet packet = decode(packetType, connection);
packet.decode(in);
@@ -44,12 +46,16 @@ public class ClientPacketDecoder extends PacketDecoder {
}
@Override
- public Packet decode(byte packetType) {
+ public Packet decode(byte packetType, CoreRemotingConnection connection) {
Packet packet;
switch (packetType) {
case SESS_RECEIVE_MSG: {
- packet = new SessionReceiveMessage(new ClientMessageImpl());
+ if (connection.isVersionBeforeAddressChange()) {
+ packet = new SessionReceiveMessage_1X(new ClientMessageImpl());
+ } else {
+ packet = new SessionReceiveMessage(new ClientMessageImpl());
+ }
break;
}
case SESS_RECEIVE_LARGE_MSG: {
@@ -57,7 +63,7 @@ public class ClientPacketDecoder extends PacketDecoder {
break;
}
default: {
- packet = super.decode(packetType);
+ packet = super.decode(packetType, connection);
}
}
return packet;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
index 1756153..b6a5d93 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.core;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -28,13 +29,18 @@ public interface CoreRemotingConnection extends RemotingConnection {
* The client protocol used on the communication.
* This will determine if the client has support for certain packet types
*/
- int getClientVersion();
+ int getChannelVersion();
+
+ default boolean isVersionBeforeAddressChange() {
+ int version = getChannelVersion();
+ return (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION);
+ }
/**
* Sets the client protocol used on the communication. This will determine if the client has
* support for certain packet types
*/
- void setClientVersion(int clientVersion);
+ void setChannelVersion(int clientVersion);
/**
* Returns the channel with the channel id specified.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
index efb9aa6..1f40314 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
@@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.protocol.core;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
/**
* A Packet represents a packet of data transmitted over a connection.
@@ -71,7 +70,7 @@ public interface Packet {
* @param connection the connection
* @return the buffer to encode to
*/
- ActiveMQBuffer encode(RemotingConnection connection);
+ ActiveMQBuffer encode(CoreRemotingConnection connection);
/**
* decodes the buffer into this packet
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index 1ebc1d0..93432b8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -326,6 +326,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
}
}
while (retry);
+ sessionChannel.getConnection().setChannelVersion(response.getServerVersion());
return newSessionContext(name, confirmationWindowSize, sessionChannel, response);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index d0d75ac..a9c34f7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -83,7 +83,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionInd
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
@@ -91,6 +91,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@@ -266,8 +267,14 @@ public class ActiveMQSessionContext extends SessionContext {
@Override
public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException {
- SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
- SessionQueueQueryResponseMessage_V3 response = (SessionQueueQueryResponseMessage_V3) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
+ SessionQueueQueryResponseMessage response;
+ if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
+ SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
+ response = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
+ } else {
+ SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
+ response = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
+ }
return response.toQueueQuery();
}
@@ -292,7 +299,13 @@ public class ActiveMQSessionContext extends SessionContext {
SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
- SessionQueueQueryResponseMessage_V3 queueInfo = (SessionQueueQueryResponseMessage_V3) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
+ SessionQueueQueryResponseMessage queueInfo;
+
+ if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
+ queueInfo = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
+ } else {
+ queueInfo = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
+ }
// The actual windows size that gets used is determined by the user since
// could be overridden on the queue settings
@@ -441,7 +454,12 @@ public class ActiveMQSessionContext extends SessionContext {
boolean sendBlocking,
SendAcknowledgementHandler handler,
SimpleString defaultAddress) throws ActiveMQException {
- SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking, handler);
+ SessionSendMessage packet;
+ if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
+ packet = new SessionSendMessage_1X(msgI, sendBlocking, handler);
+ } else {
+ packet = new SessionSendMessage(msgI, sendBlocking, handler);
+ }
if (sendBlocking) {
sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
@@ -596,7 +614,9 @@ public class ActiveMQSessionContext extends SessionContext {
Set<RoutingType> routingTypes,
final boolean autoCreated) throws ActiveMQException {
CreateAddressMessage request = new CreateAddressMessage(address, routingTypes, autoCreated, true);
- sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
+ if (!sessionChannel.getConnection().isVersionBeforeAddressChange()) {
+ sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
+ }
}
@Deprecated
@@ -621,7 +641,9 @@ public class ActiveMQSessionContext extends SessionContext {
boolean purgeOnNoConsumers,
boolean autoCreated) throws ActiveMQException {
CreateQueueMessage request = new CreateQueueMessage_V2(address, queueName, routingType, filterString, durable, temp, maxConsumers, purgeOnNoConsumers, autoCreated, true);
- sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
+ if (!sessionChannel.getConnection().isVersionBeforeAddressChange()) {
+ sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
+ }
}
@Override
@@ -695,11 +717,13 @@ public class ActiveMQSessionContext extends SessionContext {
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge) {
- return new CreateSessionMessage(name, sessionChannel.getID(), serverVersion, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, null);
+ return new CreateSessionMessage(name, sessionChannel.getID(), getServerVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, null);
}
@Override
- public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal, long consumerId, boolean isSessionStarted) throws ActiveMQException {
+ public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal,
+ long consumerId,
+ boolean isSessionStarted) throws ActiveMQException {
ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo();
// We try and recreate any non durable queues, since they probably won't be there unless
@@ -851,7 +875,6 @@ public class ActiveMQSessionContext extends SessionContext {
}
}
-
class ClientSessionPacketHandler implements ChannelHandler {
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index b8049d2..4d73cf8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -152,7 +152,7 @@ public final class ChannelImpl implements Channel {
@Override
public boolean supports(final byte packetType) {
- return supports(packetType, connection.getClientVersion());
+ return supports(packetType, connection.getChannelVersion());
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
index 2755b9a..5e46848 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
@@ -20,6 +20,7 @@ import java.io.Serializable;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CheckFailoverMessage;
@@ -160,9 +161,9 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SUB
public abstract class PacketDecoder implements Serializable {
- public abstract Packet decode(ActiveMQBuffer in);
+ public abstract Packet decode(ActiveMQBuffer in, CoreRemotingConnection connection);
- public Packet decode(byte packetType) {
+ public Packet decode(byte packetType, CoreRemotingConnection connection) {
Packet packet;
switch (packetType) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 186a703..925d089 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -21,8 +21,8 @@ import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.DataConstants;
public class PacketImpl implements Packet {
@@ -305,28 +305,35 @@ public class PacketImpl implements Packet {
}
@Override
- public ActiveMQBuffer encode(final RemotingConnection connection) {
+ public ActiveMQBuffer encode(final CoreRemotingConnection connection) {
ActiveMQBuffer buffer = createPacket(connection);
- // The standard header fields
+ encodeHeader(buffer);
+
+ encodeRest(buffer);
+
+ encodeSize(buffer);
+
+ return buffer;
+ }
+ protected void encodeHeader(ActiveMQBuffer buffer) {
+ // The standard header fields
buffer.writeInt(0); // The length gets filled in at the end
buffer.writeByte(type);
buffer.writeLong(channelID);
+ }
- encodeRest(buffer);
-
+ protected void encodeSize(ActiveMQBuffer buffer) {
size = buffer.writerIndex();
// The length doesn't include the actual length byte
int len = size - DataConstants.SIZE_INT;
buffer.setInt(0, len);
-
- return buffer;
}
- protected ActiveMQBuffer createPacket(RemotingConnection connection) {
+ protected ActiveMQBuffer createPacket(CoreRemotingConnection connection) {
int size = expectedEncodeSize();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index 17f96fb..ac73b57 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -64,7 +64,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
private final boolean client;
- private int clientVersion;
+ private int channelVersion;
private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(CHANNEL_ID.USER.id);
@@ -146,19 +146,19 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
}
/**
- * @return the clientVersion
+ * @return the channelVersion
*/
@Override
- public int getClientVersion() {
- return clientVersion;
+ public int getChannelVersion() {
+ return channelVersion;
}
/**
- * @param clientVersion the clientVersion to set
+ * @param clientVersion the channelVersion to set
*/
@Override
- public void setClientVersion(int clientVersion) {
- this.clientVersion = clientVersion;
+ public void setChannelVersion(int clientVersion) {
+ this.channelVersion = clientVersion;
}
@Override
@@ -362,7 +362,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
@Override
public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
try {
- final Packet packet = packetDecoder.decode(buffer);
+ final Packet packet = packetDecoder.decode(buffer, this);
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + getID() + " handling packet " + packet);
@@ -417,7 +417,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
@Override
public void killMessage(SimpleString nodeID) {
- if (clientVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) {
+ if (channelVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) {
return;
}
Channel clientChannel = getChannel(1, -1);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
index 4fbd48f..0a8d870 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
@@ -26,9 +27,9 @@ public class SessionReceiveMessage extends MessagePacket {
// Attributes ----------------------------------------------------
- private long consumerID;
+ protected long consumerID;
- private int deliveryCount;
+ protected int deliveryCount;
public SessionReceiveMessage(final long consumerID, final ICoreMessage message, final int deliveryCount) {
super(SESS_RECEIVE_MSG, message);
@@ -69,13 +70,17 @@ public class SessionReceiveMessage extends MessagePacket {
public void decodeRest(final ActiveMQBuffer buffer) {
// Buffer comes in after having read standard headers and positioned at Beginning of body part
- message.receiveBuffer(copyMessageBuffer(buffer.byteBuf(), DataConstants.SIZE_LONG + DataConstants.SIZE_INT));
+ receiveMessage(copyMessageBuffer(buffer.byteBuf(), DataConstants.SIZE_LONG + DataConstants.SIZE_INT));
buffer.readerIndex(buffer.capacity() - DataConstants.SIZE_LONG - DataConstants.SIZE_INT);
this.consumerID = buffer.readLong();
this.deliveryCount = buffer.readInt();
+ }
+ protected void receiveMessage(ByteBuf buffer) {
+ message.receiveBuffer(buffer);
}
+
@Override
public int hashCode() {
final int prime = 31;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage_1X.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage_1X.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage_1X.java
new file mode 100644
index 0000000..2644af9
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage_1X.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class SessionReceiveMessage_1X extends SessionReceiveMessage {
+
+ public SessionReceiveMessage_1X(long consumerID, ICoreMessage message, int deliveryCount) {
+ super(consumerID, message, deliveryCount);
+ }
+
+ public SessionReceiveMessage_1X(CoreMessage message) {
+ super(message);
+ }
+
+ @Override
+ public void encodeRest(ActiveMQBuffer buffer) {
+ message.sendBuffer_1X(buffer.byteBuf());
+ buffer.writeLong(consumerID);
+ buffer.writeInt(deliveryCount);
+ }
+
+ @Override
+ protected void receiveMessage(ByteBuf buffer) {
+ message.receiveBuffer_1X(buffer);
+ }
+
+ @Override
+ public int expectedEncodeSize() {
+ return super.expectedEncodeSize() + DataConstants.SIZE_INT;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
index 9f76c2d..b56ae30 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
@@ -24,7 +24,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
public class SessionSendMessage extends MessagePacket {
- private boolean requiresResponse;
+ protected boolean requiresResponse;
/**
* In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession}
@@ -76,7 +76,7 @@ public class SessionSendMessage extends MessagePacket {
// Buffer comes in after having read standard headers and positioned at Beginning of body part
ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1);
- message.receiveBuffer(messageBuffer);
+ receiveMessage(messageBuffer);
buffer.readerIndex(buffer.capacity() - 1);
@@ -84,6 +84,10 @@ public class SessionSendMessage extends MessagePacket {
}
+ protected void receiveMessage(ByteBuf messageBuffer) {
+ message.receiveBuffer(messageBuffer);
+ }
+
@Override
public int hashCode() {
final int prime = 31;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_1X.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_1X.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_1X.java
new file mode 100644
index 0000000..fc91b41
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_1X.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+/**
+ * SessionSend Message for the 1.x branch
+ */
+public class SessionSendMessage_1X extends SessionSendMessage {
+
+ public SessionSendMessage_1X(ICoreMessage message, boolean requiresResponse, SendAcknowledgementHandler handler) {
+ super(message, requiresResponse, handler);
+ }
+
+ public SessionSendMessage_1X(CoreMessage message) {
+ super(message);
+ }
+
+ @Override
+ public void encodeRest(ActiveMQBuffer buffer) {
+ message.sendBuffer_1X(buffer.byteBuf());
+ buffer.writeBoolean(requiresResponse);
+ }
+
+ @Override
+ protected void receiveMessage(ByteBuf messageBuffer) {
+ message.receiveBuffer_1X(messageBuffer);
+ }
+
+ @Override
+ public int expectedEncodeSize() {
+ return super.expectedEncodeSize() + DataConstants.SIZE_INT;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java
index bd5fccf..ba5359a 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java
@@ -64,6 +64,8 @@ import org.apache.activemq.artemis.utils.ClassloadingUtil;
*/
public class ActiveMQConnectionFactory implements ConnectionFactoryOptions, Externalizable, Referenceable, ConnectionFactory, XAConnectionFactory, AutoCloseable {
+ private static final long serialVersionUID = -7554006056207377105L;
+
private ServerLocator serverLocator;
private String clientID;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
index 0bf4dd6..c0ab4b9 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
@@ -384,7 +384,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
public static boolean isQueue(TYPE type) {
boolean result = false;
- if (type.equals(QUEUE) || type.equals(TEMP_QUEUE)) {
+ if (type != null && (type.equals(QUEUE) || type.equals(TEMP_QUEUE))) {
result = true;
}
@@ -394,7 +394,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
public static boolean isTemporary(TYPE type) {
boolean result = false;
- if (type.equals(TEMP_TOPIC) || type.equals(TEMP_QUEUE)) {
+ if (type != null && (type.equals(TEMP_TOPIC) || type.equals(TEMP_QUEUE))) {
result = true;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index bcbe633..0584476 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
@@ -51,6 +52,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCon
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@@ -84,51 +86,58 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
private static final long serialVersionUID = 3348673114388400766L;
public static final ServerPacketDecoder INSTANCE = new ServerPacketDecoder();
- private static SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in) {
- final SessionSendMessage sendMessage = new SessionSendMessage(new CoreMessage());
+ private static SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
+ final SessionSendMessage sendMessage;
+
+ if (connection.isVersionBeforeAddressChange()) {
+ sendMessage = new SessionSendMessage_1X(new CoreMessage());
+ } else {
+ sendMessage = new SessionSendMessage(new CoreMessage());
+ }
+
sendMessage.decode(in);
return sendMessage;
}
- private static SessionAcknowledgeMessage decodeSessionAcknowledgeMessage(final ActiveMQBuffer in) {
+ private static SessionAcknowledgeMessage decodeSessionAcknowledgeMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
final SessionAcknowledgeMessage acknowledgeMessage = new SessionAcknowledgeMessage();
acknowledgeMessage.decode(in);
return acknowledgeMessage;
}
- private static SessionRequestProducerCreditsMessage decodeRequestProducerCreditsMessage(final ActiveMQBuffer in) {
+ private static SessionRequestProducerCreditsMessage decodeRequestProducerCreditsMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
final SessionRequestProducerCreditsMessage requestProducerCreditsMessage = new SessionRequestProducerCreditsMessage();
requestProducerCreditsMessage.decode(in);
return requestProducerCreditsMessage;
}
- private static SessionConsumerFlowCreditMessage decodeSessionConsumerFlowCreditMessage(final ActiveMQBuffer in) {
+ private static SessionConsumerFlowCreditMessage decodeSessionConsumerFlowCreditMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
final SessionConsumerFlowCreditMessage sessionConsumerFlowCreditMessage = new SessionConsumerFlowCreditMessage();
sessionConsumerFlowCreditMessage.decode(in);
return sessionConsumerFlowCreditMessage;
}
@Override
- public Packet decode(final ActiveMQBuffer in) {
+ public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) {
final byte packetType = in.readByte();
//optimized for the most common cases: hottest and commons methods will be inlined and this::decode too due to the byte code size
switch (packetType) {
case SESS_SEND:
- return decodeSessionSendMessage(in);
+ return decodeSessionSendMessage(in, connection);
case SESS_ACKNOWLEDGE:
- return decodeSessionAcknowledgeMessage(in);
+ return decodeSessionAcknowledgeMessage(in, connection);
case SESS_PRODUCER_REQUEST_CREDITS:
- return decodeRequestProducerCreditsMessage(in);
+ return decodeRequestProducerCreditsMessage(in, connection);
case SESS_FLOWTOKEN:
- return decodeSessionConsumerFlowCreditMessage(in);
+ return decodeSessionConsumerFlowCreditMessage(in, connection);
default:
- return slowPathDecode(in, packetType);
+ return slowPathDecode(in, packetType, connection);
}
}
// separating for performance reasons
- private Packet slowPathDecode(ActiveMQBuffer in, byte packetType) {
+ private Packet slowPathDecode(ActiveMQBuffer in, byte packetType, CoreRemotingConnection connection) {
Packet packet;
switch (packetType) {
@@ -242,7 +251,7 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
break;
}
default: {
- packet = super.decode(packetType);
+ packet = super.decode(packetType, connection);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index e1e1b68..e93dd94 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -314,11 +314,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_CREATECONSUMER: {
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
requiresResponse = request.isRequiresResponse();
- session.createConsumer(request.getID(), request.getQueueName(remotingConnection.getClientVersion()), request.getFilterString(), request.isBrowseOnly());
+ session.createConsumer(request.getID(), request.getQueueName(remotingConnection.getChannelVersion()), request.getFilterString(), request.isBrowseOnly());
if (requiresResponse) {
// We send back queue information on the queue as a response- this allows the queue to
// be automatically recreated on failover
- QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion()));
+ QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName(remotingConnection.getChannelVersion()));
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
response = new SessionQueueQueryResponseMessage_V3(queueQueryResult);
@@ -387,9 +387,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_QUEUEQUERY: {
requiresResponse = true;
SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
- QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion()));
+ QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getChannelVersion()));
- if (remotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
+ if (remotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
result.setAddress(SessionQueueQueryMessage.getOldPrefixedAddress(result.getAddress(), result.getRoutingType()));
}
@@ -405,7 +405,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_BINDINGQUERY: {
requiresResponse = true;
SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
- final int clientVersion = remotingConnection.getClientVersion();
+ final int clientVersion = remotingConnection.getChannelVersion();
BindingQueryResult result = session.executeBindingQuery(request.getAddress(clientVersion));
/* if the session is JMS and it's from an older client then we need to add the old prefix to the queue
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index 1b5a2a6..d38483a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -145,10 +145,10 @@ public class ActiveMQPacketHandler implements ChannelHandler {
"Server will not accept create session requests");
}*/
- if (connection.getClientVersion() == 0) {
- connection.setClientVersion(request.getVersion());
- } else if (connection.getClientVersion() != request.getVersion()) {
- ActiveMQServerLogger.LOGGER.incompatibleVersionAfterConnect(request.getVersion(), connection.getClientVersion());
+ if (connection.getChannelVersion() == 0) {
+ connection.setChannelVersion(request.getVersion());
+ } else if (connection.getChannelVersion() != request.getVersion()) {
+ ActiveMQServerLogger.LOGGER.incompatibleVersionAfterConnect(request.getVersion(), connection.getChannelVersion());
}
Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
@@ -163,7 +163,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
Map<SimpleString, RoutingType> routingTypeMap = protocolManager.getPrefixes();
- if (connection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
+ if (connection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
routingTypeMap = new HashMap<>();
routingTypeMap.put(PacketImpl.OLD_QUEUE_PREFIX, RoutingType.ANYCAST);
routingTypeMap.put(PacketImpl.OLD_TOPIC_PREFIX, RoutingType.MULTICAST);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index 2cfd451..c9262fa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -253,14 +253,14 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
SubscribeClusterTopologyUpdatesMessage msg = (SubscribeClusterTopologyUpdatesMessage) packet;
if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2) {
- channel0.getConnection().setClientVersion(((SubscribeClusterTopologyUpdatesMessageV2) msg).getClientVersion());
+ channel0.getConnection().setChannelVersion(((SubscribeClusterTopologyUpdatesMessageV2) msg).getClientVersion());
}
final ClusterTopologyListener listener = new ClusterTopologyListener() {
@Override
public void nodeUP(final TopologyMember topologyMember, final boolean last) {
try {
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair = BackwardsCompatibilityUtils.getTCPair(channel0.getConnection().getClientVersion(), topologyMember);
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair = BackwardsCompatibilityUtils.getTCPair(channel0.getConnection().getChannelVersion(), topologyMember);
final String nodeID = topologyMember.getNodeId();
// Using an executor as most of the notifications on the Topology
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 92b3768..8b281eb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionPro
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage_1X;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -112,7 +113,12 @@ public final class CoreSessionCallback implements SessionCallback {
@Override
public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
- Packet packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount);
+ Packet packet;
+ if (channel.getConnection().isVersionBeforeAddressChange()) {
+ packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(), deliveryCount);
+ } else {
+ packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount);
+ }
int size = 0;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 36aa4e2..5dc1b93 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -237,7 +237,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
if (session.getRemotingConnection() instanceof CoreRemotingConnection) {
CoreRemotingConnection coreRemotingConnection = (CoreRemotingConnection) session.getRemotingConnection();
- if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
+ if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
requiresLegacyPrefix = true;
if (getQueue().getRoutingType().equals(RoutingType.ANYCAST)) {
anycast = true;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbe575a0/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
index 3ec6d31..1357986 100644
--- a/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
+++ b/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
@@ -53,9 +53,8 @@ server.setConfiguration(configuration);
server.setJmsConfiguration(jmsConfiguration);
server.start();
-// uncomment this line to validate https://issues.apache.org/jira/browse/ARTEMIS-1561
-// this api exists on both 1.4 and 2.x... so, this one was preferred for this
-if (producer.toString().startsWith("HORNETQ")) {
- // hornetq servers won't auto-create
+// uncomment these next statements to validate https://issues.apache.org/jira/browse/ARTEMIS-1561
+if (producer.toString().equals("ARTEMIS-140") && type.equals("ARTEMIS-SNAPSHOT") ||
+ producer.toString().startsWith("HORNETQ")) {
server.getJMSServerManager().createQueue(true, "queue", null, true);
}