You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/27 21:29:52 UTC
[1/4] activemq-artemis git commit: This closes #2187
Repository: activemq-artemis
Updated Branches:
refs/heads/master 515a2e064 -> 94be09686
This closes #2187
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/94be0968
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/94be0968
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/94be0968
Branch: refs/heads/master
Commit: 94be096861f1baa12180d8375a60f4bcb1d9792c
Parents: 515a2e0 a28b4fb
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Sep 27 17:29:18 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 27 17:29:18 2018 -0400
----------------------------------------------------------------------
.../core/client/SendAcknowledgementHandler.java | 9 +
.../client/ActiveMQClientMessageBundle.java | 3 +
.../artemis/core/protocol/core/Channel.java | 3 +
.../protocol/core/CoreRemotingConnection.java | 5 +
.../artemis/core/protocol/core/Packet.java | 8 +
.../core/protocol/core/ResponseHandler.java | 30 ++
.../core/impl/ActiveMQSessionContext.java | 71 ++-
.../core/protocol/core/impl/ChannelImpl.java | 67 ++-
.../core/protocol/core/impl/PacketDecoder.java | 34 +-
.../core/protocol/core/impl/PacketImpl.java | 26 +-
.../core/protocol/core/impl/ResponseCache.java | 74 +++
.../wireformat/ActiveMQExceptionMessage.java | 2 +-
.../wireformat/ActiveMQExceptionMessage_V2.java | 101 ++++
.../impl/wireformat/CreateAddressMessage.java | 1 +
.../impl/wireformat/CreateQueueMessage.java | 1 +
.../wireformat/CreateSharedQueueMessage.java | 1 +
.../impl/wireformat/NullResponseMessage_V2.java | 96 ++++
.../wireformat/SessionAcknowledgeMessage.java | 1 +
.../SessionCreateConsumerMessage.java | 1 +
.../SessionIndividualAcknowledgeMessage.java | 1 +
.../SessionSendContinuationMessage.java | 31 +-
.../SessionSendContinuationMessage_V2.java | 122 +++++
.../impl/wireformat/SessionSendMessage.java | 27 +-
.../impl/wireformat/SessionSendMessage_V2.java | 104 ++++
.../wireformat/SessionXAResponseMessage.java | 6 +-
.../wireformat/SessionXAResponseMessage_V2.java | 102 ++++
.../main/resources/activemq-version.properties | 2 +-
.../protocol/core/impl/ChannelImplTest.java | 512 +++++++++++++++++++
.../jms/client/ActiveMQMessageProducer.java | 76 ++-
.../core/protocol/ServerPacketDecoder.java | 5 +-
.../core/ServerSessionPacketHandler.java | 176 ++++---
pom.xml | 2 +-
.../cluster/util/BackupSyncDelay.java | 6 +
.../JmsProducerCompletionListenerTest.java | 19 +-
.../artemis/jms/tests/SecurityTest.java | 189 ++++++-
tests/jms-tests/src/test/resources/broker.xml | 10 +
36 files changed, 1798 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
[4/4] activemq-artemis git commit: ARTEMIS-1545 Support JMS 2.0
Completion Listener for Exceptions
Posted by cl...@apache.org.
ARTEMIS-1545 Support JMS 2.0 Completion Listener for Exceptions
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e4ba48a3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e4ba48a3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e4ba48a3
Branch: refs/heads/master
Commit: e4ba48a31193ac532404d93b37f29d2720f1a863
Parents: 515a2e0
Author: Michael André Pearce <mi...@me.com>
Authored: Thu Dec 14 07:47:30 2017 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 27 17:29:18 2018 -0400
----------------------------------------------------------------------
.../core/client/SendAcknowledgementHandler.java | 7 +
.../client/ActiveMQClientMessageBundle.java | 3 +
.../artemis/core/protocol/core/Channel.java | 3 +
.../protocol/core/CoreRemotingConnection.java | 5 +
.../artemis/core/protocol/core/Packet.java | 8 +
.../core/protocol/core/ResponseHandler.java | 30 ++++
.../core/impl/ActiveMQSessionContext.java | 78 +++++++--
.../core/protocol/core/impl/ChannelImpl.java | 58 ++++++-
.../core/protocol/core/impl/PacketDecoder.java | 34 +++-
.../core/protocol/core/impl/PacketImpl.java | 21 +++
.../core/protocol/core/impl/ResponseCache.java | 70 ++++++++
.../wireformat/ActiveMQExceptionMessage.java | 2 +-
.../wireformat/ActiveMQExceptionMessage_V2.java | 101 +++++++++++
.../impl/wireformat/CreateAddressMessage.java | 1 +
.../impl/wireformat/CreateQueueMessage.java | 1 +
.../wireformat/CreateSharedQueueMessage.java | 1 +
.../impl/wireformat/NullResponseMessage_V2.java | 96 ++++++++++
.../wireformat/SessionAcknowledgeMessage.java | 1 +
.../SessionCreateConsumerMessage.java | 1 +
.../SessionIndividualAcknowledgeMessage.java | 1 +
.../SessionSendContinuationMessage.java | 31 +++-
.../SessionSendContinuationMessage_V2.java | 122 +++++++++++++
.../impl/wireformat/SessionSendMessage.java | 27 ++-
.../impl/wireformat/SessionSendMessage_V2.java | 104 +++++++++++
.../wireformat/SessionXAResponseMessage.java | 6 +-
.../wireformat/SessionXAResponseMessage_V2.java | 102 +++++++++++
.../main/resources/activemq-version.properties | 2 +-
.../jms/client/ActiveMQMessageProducer.java | 30 ++++
.../core/protocol/ServerPacketDecoder.java | 5 +-
.../core/ServerSessionPacketHandler.java | 173 ++++++++++++-------
pom.xml | 2 +-
.../cluster/util/BackupSyncDelay.java | 6 +
.../JmsProducerCompletionListenerTest.java | 19 +-
.../artemis/jms/tests/SecurityTest.java | 82 ++++++++-
tests/jms-tests/src/test/resources/broker.xml | 10 ++
35 files changed, 1134 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
index c164f6c..0f47536 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
@@ -41,4 +41,11 @@ public interface SendAcknowledgementHandler {
* @param message message sent asynchronously
*/
void sendAcknowledged(Message message);
+
+ default void sendFailed(Message message, Exception e) {
+ // Keeps the old behaviour (acknowledge even on error) for custom
+ // implementations of this interface that have not been updated to override this method.
+ sendAcknowledged(message);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
index bb88e6d..e043ac9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
@@ -228,4 +228,7 @@ public interface ActiveMQClientMessageBundle {
@Message(id = 119062, value = "Multi-packet transmission (e.g. Large Messages) interrupted because of a reconnection.")
ActiveMQInterruptedException packetTransmissionInterrupted();
+
+ @Message(id = 119063, value = "Cannot send a packet while response cache is full.")
+ IllegalStateException cannotSendPacketWhilstResponseCacheFull();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
index 127a69a..56f8259 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
@@ -211,6 +211,9 @@ public interface Channel {
*/
void setCommandConfirmationHandler(CommandConfirmationHandler handler);
+ void setResponseHandler(ResponseHandler handler);
+
+
/**
* flushes any confirmations on to the connection.
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
index b6a5d93..74d9847 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
@@ -36,6 +36,11 @@ public interface CoreRemotingConnection extends RemotingConnection {
return (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION);
}
+ default boolean isVersionBeforeAsyncResponseChange() {
+ int version = getChannelVersion();
+ return (version > 0 && version < PacketImpl.ASYNC_RESPONSE_CHANGE_VERSION);
+ }
+
/**
* Sets the client protocol used on the communication. This will determine if the client has
* support for certain packet types
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
index 1f40314..b658090 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
@@ -41,6 +41,14 @@ public interface Packet {
return INITIAL_PACKET_SIZE;
}
+ boolean isRequiresResponse();
+
+ boolean isResponseAsync();
+
+ long getCorrelationID();
+
+ void setCorrelationID(long correlationID);
+
/**
* Returns the channel id of the channel that should handle this packet.
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
new file mode 100644
index 0000000..21e9879
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core;
+
+/**
+ * A ResponseHandler is used by the channel to deliver the response received for a packet.
+ */
+public interface ResponseHandler {
+
+ /**
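+ * Called by the channel after a response has been received.
+ *
+ * @param packet the packet confirmed; {@code response} is the response received for it, or null when invoked from a plain command confirmation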
+ */
+ void responseHandler(Packet packet, Packet response);
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 18227cb..3c0647f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -63,6 +63,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
@@ -99,9 +100,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@@ -168,7 +171,11 @@ public class ActiveMQSessionContext extends SessionContext {
sessionChannel.setHandler(handler);
if (confirmationWindow >= 0) {
- sessionChannel.setCommandConfirmationHandler(confirmationHandler);
+ if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
+ sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
+ } else {
+ sessionChannel.setResponseHandler(responseHandler);
+ }
}
}
@@ -185,28 +192,50 @@ public class ActiveMQSessionContext extends SessionContext {
this.killed = true;
}
- private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() {
+ private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() {
+ @Override
+ public void commandConfirmed(Packet packet) {
+ responseHandler.responseHandler(packet, null);
+ }
+ };
+
+ private final ResponseHandler responseHandler = new ResponseHandler() {
@Override
- public void commandConfirmed(final Packet packet) {
+ public void responseHandler(Packet packet, Packet response) {
+ final ActiveMQException activeMQException;
+ if (response != null && response.getType() == PacketImpl.EXCEPTION) {
+ ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response;
+ activeMQException = exceptionResponseMessage.getException();
+ } else {
+ activeMQException = null;
+ }
+
if (packet.getType() == PacketImpl.SESS_SEND) {
SessionSendMessage ssm = (SessionSendMessage) packet;
- callSendAck(ssm.getHandler(), ssm.getMessage());
+ callSendAck(ssm.getHandler(), ssm.getMessage(), activeMQException);
} else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) {
SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
if (!scm.isContinues()) {
- callSendAck(scm.getHandler(), scm.getMessage());
+ callSendAck(scm.getHandler(), scm.getMessage(), activeMQException);
}
}
}
- private void callSendAck(SendAcknowledgementHandler handler, final Message message) {
+ private void callSendAck(SendAcknowledgementHandler handler, final Message message, final Exception exception) {
if (handler != null) {
- handler.sendAcknowledged(message);
+ if (exception == null) {
+ handler.sendAcknowledged(message);
+ } else {
+ handler.sendFailed(message, exception);
+ }
} else if (sendAckHandler != null) {
- sendAckHandler.sendAcknowledged(message);
+ if (exception == null) {
+ sendAckHandler.sendAcknowledged(message);
+ } else {
+ sendAckHandler.sendFailed(message, exception); // 'handler' is null on this branch; notify the session-level handler
+ }
}
}
-
};
// Failover utility methods
@@ -243,7 +272,11 @@ public class ActiveMQSessionContext extends SessionContext {
@Override
public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) {
- sessionChannel.setCommandConfirmationHandler(confirmationHandler);
+ if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
+ sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
+ } else {
+ sessionChannel.setResponseHandler(responseHandler);
+ }
this.sendAckHandler = handler;
}
@@ -472,13 +505,15 @@ public class ActiveMQSessionContext extends SessionContext {
boolean sendBlocking,
SendAcknowledgementHandler handler,
SimpleString defaultAddress) throws ActiveMQException {
- SessionSendMessage packet;
+ final SessionSendMessage packet;
if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
packet = new SessionSendMessage_1X(msgI, sendBlocking, handler);
- } else {
+ } else if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
packet = new SessionSendMessage(msgI, sendBlocking, handler);
+ } else {
+ boolean responseRequired = confirmationWindow != -1 || sendBlocking;
+ packet = new SessionSendMessage_V2(msgI, responseRequired, handler);
}
-
if (sendBlocking) {
sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
} else {
@@ -904,15 +939,20 @@ public class ActiveMQSessionContext extends SessionContext {
}
}
- private static int sendSessionSendContinuationMessage(Channel channel,
+ private int sendSessionSendContinuationMessage(Channel channel,
Message msgI,
long messageBodySize,
boolean sendBlocking,
boolean lastChunk,
byte[] chunk,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
- final boolean requiresResponse = lastChunk && sendBlocking;
- final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
+ final boolean requiresResponse = lastChunk || confirmationWindow != -1;
+ final SessionSendContinuationMessage chunkPacket;
+ if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
+ chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
+ } else {
+ chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
+ }
final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
//perform a weak form of flow control to avoid OOM on tight loops
final CoreRemotingConnection connection = channel.getConnection();
@@ -929,7 +969,11 @@ public class ActiveMQSessionContext extends SessionContext {
}
if (requiresResponse) {
// When sending it blocking, only the last chunk will be blocking.
- channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
+ if (sendBlocking) {
+ channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
+ } else {
+ channel.send(chunkPacket);
+ }
} else {
channel.send(chunkPacket);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 4d73cf8..9cb2a83 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -96,6 +97,8 @@ public final class ChannelImpl implements Channel {
private final java.util.Queue<Packet> resendCache;
+ private final ResponseCache responseAsyncCache;
+
private int firstStoredCommandID;
private final AtomicInteger lastConfirmedCommandID = new AtomicInteger(-1);
@@ -138,8 +141,10 @@ public final class ChannelImpl implements Channel {
if (confWindowSize != -1) {
resendCache = new ConcurrentLinkedQueue<>();
+ responseAsyncCache = new ResponseCache();
} else {
resendCache = null;
+ responseAsyncCache = null;
}
this.interceptors = interceptors;
@@ -211,7 +216,11 @@ public final class ChannelImpl implements Channel {
lock.lock();
try {
- response = new ActiveMQExceptionMessage(ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause));
+ ActiveMQException activeMQException = ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause);
+ if (responseAsyncCache != null) {
+ responseAsyncCache.errorAll(activeMQException);
+ }
+ response = new ActiveMQExceptionMessage(activeMQException);
sendCondition.signal();
} finally {
@@ -270,6 +279,10 @@ public final class ChannelImpl implements Channel {
synchronized (sendLock) {
packet.setChannelID(id);
+ if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
+ packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
+ }
+
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
}
@@ -291,6 +304,7 @@ public final class ChannelImpl implements Channel {
if (resendCache != null && packet.isRequiresConfirmations()) {
addResendPacket(packet);
}
+
} finally {
lock.unlock();
}
@@ -301,9 +315,30 @@ public final class ChannelImpl implements Channel {
checkReconnectID(reconnectID);
+ //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
+ //as the send could block while the response cache is full, preventing responses from being handled.
+ if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
+ while (!responseAsyncCache.add(packet)) {
+ try {
+ Thread.sleep(1);
+ } catch (Exception e) {
+ // Ignore
+ }
+ }
+ }
+
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover
- connection.getTransportConnection().write(buffer, flush, batch);
+ try {
+ connection.getTransportConnection().write(buffer, flush, batch);
+ } catch (Throwable t) {
+ //If the write fails, we must remove the packet from the cache so that failed sends do not fill it up.
+ //The client still learns about the failure, as the exception bubbles up the call stack instead.
+ if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
+ responseAsyncCache.remove(packet.getCorrelationID());
+ }
+ throw t;
+ }
return true;
}
}
@@ -478,6 +513,18 @@ public final class ChannelImpl implements Channel {
}
@Override
+ public void setResponseHandler(final ResponseHandler responseHandler) {
+ if (confWindowSize < 0) {
+ final String msg = "You can't set responseHandler on a connection with confirmation-window-size < 0." + " Look at the documentation for more information.";
+ if (logger.isTraceEnabled()) {
+ logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " " + msg);
+ }
+ throw new IllegalStateException(msg);
+ }
+ responseAsyncCache.setResponseHandler(responseHandler);
+ }
+
+ @Override
public void setHandler(final ChannelHandler handler) {
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Setting handler on " + this + " as " + handler);
@@ -595,6 +642,12 @@ public final class ChannelImpl implements Channel {
}
}
+ public void handleResponse(Packet packet) {
+ if (responseAsyncCache != null && packet.isResponseAsync()) {
+ responseAsyncCache.handleResponse(packet);
+ }
+ }
+
@Override
public void confirm(final Packet packet) {
if (resendCache != null && packet.isRequiresConfirmations()) {
@@ -647,6 +700,7 @@ public final class ChannelImpl implements Channel {
if (packet.isResponse()) {
confirm(packet);
+ handleResponse(packet);
lock.lock();
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
index 5e46848..9a8166e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Disconnect
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Ping;
@@ -71,6 +72,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQue
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@@ -81,6 +83,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
@@ -88,6 +91,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY;
@@ -184,13 +188,25 @@ public abstract class PacketDecoder implements Serializable {
break;
}
case EXCEPTION: {
- packet = new ActiveMQExceptionMessage();
+ if (connection.isVersionBeforeAsyncResponseChange()) {
+ packet = new ActiveMQExceptionMessage();
+ } else {
+ packet = new ActiveMQExceptionMessage_V2();
+ }
break;
}
case PACKETS_CONFIRMED: {
packet = new PacketsConfirmedMessage();
break;
}
+ case NULL_RESPONSE: {
+ if (connection.isVersionBeforeAsyncResponseChange()) {
+ packet = new NullResponseMessage();
+ } else {
+ packet = new NullResponseMessage_V2();
+ }
+ break;
+ }
case CREATESESSION: {
packet = new CreateSessionMessage();
break;
@@ -316,7 +332,11 @@ public abstract class PacketDecoder implements Serializable {
break;
}
case SESS_XA_RESP: {
- packet = new SessionXAResponseMessage();
+ if (connection.isVersionBeforeAsyncResponseChange()) {
+ packet = new SessionXAResponseMessage();
+ } else {
+ packet = new SessionXAResponseMessage_V2();
+ }
break;
}
case SESS_XA_ROLLBACK: {
@@ -383,16 +403,16 @@ public abstract class PacketDecoder implements Serializable {
packet = new SessionIndividualAcknowledgeMessage();
break;
}
- case NULL_RESPONSE: {
- packet = new NullResponseMessage();
- break;
- }
case SESS_RECEIVE_CONTINUATION: {
packet = new SessionReceiveContinuationMessage();
break;
}
case SESS_SEND_CONTINUATION: {
- packet = new SessionSendContinuationMessage();
+ if (connection.isVersionBeforeAsyncResponseChange()) {
+ packet = new SessionSendContinuationMessage();
+ } else {
+ packet = new SessionSendContinuationMessage_V2();
+ }
break;
}
case SESS_PRODUCER_REQUEST_CREDITS: {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 87ba0c3..470e3ae 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -32,6 +32,7 @@ public class PacketImpl implements Packet {
// 2.0.0
public static final int ADDRESSING_CHANGE_VERSION = 129;
public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130;
+ public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130;
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
@@ -272,6 +273,7 @@ public class PacketImpl implements Packet {
public static final byte SESS_BINDINGQUERY_RESP_V4 = -15;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type) {
@@ -439,5 +441,24 @@ public class PacketImpl implements Packet {
return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0);
}
+ /**
+ * Base implementation: reports that this packet does not require a response.
+ *
+ * @return {@code false} in this base implementation
+ */
+ @Override
+ public boolean isRequiresResponse() {
+ return false;
+ }
+
+ /**
+ * Base implementation: reports that this packet is not an asynchronous response.
+ *
+ * @return {@code false} in this base implementation
+ */
+ @Override
+ public boolean isResponseAsync() {
+ return false;
+ }
+
+ /**
+ * Base implementation: this packet type stores no correlation ID.
+ *
+ * @return {@code -1} in this base implementation
+ */
+ @Override
+ public long getCorrelationID() {
+ return -1;
+ }
+
+ /**
+ * Base implementation: does nothing, the supplied value is discarded.
+ *
+ * @param correlationID ignored by this base implementation
+ */
+ @Override
+ public void setCorrelationID(long correlationID) {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
new file mode 100644
index 0000000..f9e8538
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
+import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
+import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
+
+/**
+ * Keeps request packets that are waiting for a response, keyed by their correlation ID,
+ * and passes each request together with its response to the configured
+ * {@link ResponseHandler}.
+ */
+public class ResponseCache {
+
+   private final AtomicLong sequence = new AtomicLong(0);
+
+   private final ConcurrentLongHashMap<Packet> store;
+   private ResponseHandler responseHandler;
+
+   public ResponseCache() {
+      this.store = new ConcurrentLongHashMap<>();
+   }
+
+   /**
+    * @return the next value of the internal sequence; the first call returns 1
+    */
+   public long nextCorrelationID() {
+      return sequence.incrementAndGet();
+   }
+
+   /**
+    * Stores the packet under the value of {@link Packet#getCorrelationID()}.
+    *
+    * @param packet the request to remember
+    * @return always {@code true}
+    */
+   public boolean add(Packet packet) {
+      this.store.put(packet.getCorrelationID(), packet);
+      return true;
+   }
+
+   /**
+    * Removes the packet stored under the given correlation ID.
+    *
+    * @param correlationID key of the entry to remove
+    * @return whatever the underlying map returns for the removal
+    */
+   public Packet remove(long correlationID) {
+      return store.remove(correlationID);
+   }
+
+   /**
+    * Looks up the request matching the response's correlation ID, removes it from the
+    * cache and hands both packets to the response handler. Nothing happens when no
+    * request is cached for that ID.
+    * <p>
+    * When no handler has been set yet the cache is left untouched, so the pending
+    * request is neither lost nor does the call fail with a {@link NullPointerException}.
+    *
+    * @param response the response packet carrying the correlation ID
+    */
+   public void handleResponse(Packet response) {
+      // Read the field once so the null check and the call use the same reference.
+      final ResponseHandler handler = responseHandler;
+      if (handler == null) {
+         return;
+      }
+      final Packet request = remove(response.getCorrelationID());
+      if (request != null) {
+         handler.responseHandler(request, response);
+      }
+   }
+
+   /**
+    * Answers every cached request with an {@link ActiveMQExceptionMessage_V2} wrapping
+    * the given exception, going through {@link #handleResponse(Packet)} for each key.
+    *
+    * @param exception the exception to report for each cached request
+    */
+   public void errorAll(ActiveMQException exception) {
+      ConcurrentLongHashSet keys = store.keysLongHashSet();
+      keys.forEach(correlationID -> handleResponse(new ActiveMQExceptionMessage_V2(correlationID, exception)));
+   }
+
+   /**
+    * @param responseHandler the handler notified by {@link #handleResponse(Packet)}
+    */
+   public void setResponseHandler(ResponseHandler responseHandler) {
+      this.responseHandler = responseHandler;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java
index da34d2e..51637f3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java
@@ -23,7 +23,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class ActiveMQExceptionMessage extends PacketImpl {
- private ActiveMQException exception;
+ protected ActiveMQException exception;
// Static --------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java
new file mode 100644
index 0000000..661a040
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+/**
+ * Exception packet that additionally carries a correlation ID, which is written after
+ * the fields encoded by {@link ActiveMQExceptionMessage}.
+ */
+public class ActiveMQExceptionMessage_V2 extends ActiveMQExceptionMessage {
+
+   private long correlationID;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param correlationID value later returned by {@link #getCorrelationID()}
+    * @param exception     exception passed on to the superclass constructor
+    */
+   public ActiveMQExceptionMessage_V2(final long correlationID, final ActiveMQException exception) {
+      super(exception);
+      this.correlationID = correlationID;
+   }
+
+   public ActiveMQExceptionMessage_V2() {
+      super();
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public boolean isResponse() {
+      return true;
+   }
+
+   /**
+    * Writes the superclass fields followed by the correlation ID as a long.
+    */
+   @Override
+   public void encodeRest(final ActiveMQBuffer buffer) {
+      super.encodeRest(buffer);
+      buffer.writeLong(correlationID);
+   }
+
+   /**
+    * Reads the superclass fields, then the correlation ID. The ID is only read when at
+    * least {@link DataConstants#SIZE_LONG} readable bytes remain; otherwise the field
+    * keeps its current value.
+    */
+   @Override
+   public void decodeRest(final ActiveMQBuffer buffer) {
+      super.decodeRest(buffer);
+      if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
+         correlationID = buffer.readLong();
+      }
+   }
+
+   @Override
+   public final boolean isResponseAsync() {
+      return true;
+   }
+
+   @Override
+   public long getCorrelationID() {
+      return this.correlationID;
+   }
+
+   /**
+    * Includes the correlation ID, consistent with the other {@code _V2} packets.
+    */
+   @Override
+   public String toString() {
+      return getParentString() + ", exception= " + exception + ", correlationID=" + correlationID + "]";
+   }
+
+   @Override
+   public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj) {
+         return true;
+      }
+      if (!super.equals(obj)) {
+         return false;
+      }
+      if (!(obj instanceof ActiveMQExceptionMessage_V2)) {
+         return false;
+      }
+      ActiveMQExceptionMessage_V2 other = (ActiveMQExceptionMessage_V2) obj;
+      return correlationID == other.correlationID;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
index a98f888..8c84a9b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
@@ -65,6 +65,7 @@ public class CreateAddressMessage extends PacketImpl {
return address;
}
+ @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java
index 2ebf147..985d5f4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java
@@ -100,6 +100,7 @@ public class CreateQueueMessage extends PacketImpl {
return temporary;
}
+ @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
index af25ae9..3c072e0 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
@@ -80,6 +80,7 @@ public class CreateSharedQueueMessage extends PacketImpl {
return filterString;
}
+ @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
new file mode 100644
index 0000000..e3453af
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+/**
+ * Null response that additionally carries a correlation ID, written after the fields
+ * encoded by {@link NullResponseMessage}.
+ */
+public class NullResponseMessage_V2 extends NullResponseMessage {
+
+   private long correlationID;
+
+   public NullResponseMessage_V2() {
+      super();
+   }
+
+   /**
+    * @param correlationID value later returned by {@link #getCorrelationID()}
+    */
+   public NullResponseMessage_V2(final long correlationID) {
+      super();
+      this.correlationID = correlationID;
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public final boolean isResponse() {
+      return true;
+   }
+
+   @Override
+   public final boolean isResponseAsync() {
+      return true;
+   }
+
+   @Override
+   public long getCorrelationID() {
+      return correlationID;
+   }
+
+   /**
+    * Writes the superclass fields followed by the correlation ID as a long.
+    */
+   @Override
+   public void encodeRest(final ActiveMQBuffer buffer) {
+      super.encodeRest(buffer);
+      buffer.writeLong(correlationID);
+   }
+
+   /**
+    * Reads the superclass fields, then the correlation ID when enough bytes remain.
+    */
+   @Override
+   public void decodeRest(final ActiveMQBuffer buffer) {
+      super.decodeRest(buffer);
+      if (buffer.readableBytes() < DataConstants.SIZE_LONG) {
+         // Nothing left to read: keep the current correlation ID.
+         return;
+      }
+      correlationID = buffer.readLong();
+   }
+
+   @Override
+   public String toString() {
+      final StringBuilder text = new StringBuilder(getParentString());
+      text.append(", correlationID=").append(correlationID).append("]");
+      return text.toString();
+   }
+
+   @Override
+   public int hashCode() {
+      // Long.hashCode(v) is defined as (int) (v ^ (v >>> 32)).
+      return 31 * super.hashCode() + Long.hashCode(correlationID);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj) {
+         return true;
+      }
+      return super.equals(obj)
+         && obj instanceof NullResponseMessage_V2
+         && correlationID == ((NullResponseMessage_V2) obj).correlationID;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
index 542c34c..67d9f67 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
@@ -51,6 +51,7 @@ public class SessionAcknowledgeMessage extends PacketImpl {
return messageID;
}
+ @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
index f09beeb..e07b50c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
@@ -71,6 +71,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
return browseOnly;
}
+ @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
index 7d06081..3164c23 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
@@ -60,6 +60,7 @@ public class SessionIndividualAcknowledgeMessage extends PacketImpl {
return messageID;
}
+ @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
index 26eedd7..4105b11 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
@@ -26,10 +26,10 @@ import org.apache.activemq.artemis.utils.DataConstants;
*/
public class SessionSendContinuationMessage extends SessionContinuationMessage {
- private boolean requiresResponse;
+ protected boolean requiresResponse;
// Used on confirmation handling
- private Message message;
+ protected Message message;
/**
* In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession}
* <br>
@@ -43,7 +43,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
/**
* to be sent on the last package
*/
- private long messageBodySize = -1;
+ protected long messageBodySize = -1;
// Static --------------------------------------------------------
@@ -54,6 +54,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
handler = null;
}
+ /**
+ * Constructor for subclasses: forwards {@code type} to the superclass constructor
+ * and leaves the acknowledgement handler unset.
+ *
+ * @param type value passed to the superclass constructor
+ */
+ protected SessionSendContinuationMessage(byte type) {
+ super(type);
+ handler = null;
+ }
+
/**
* @param body
* @param continues
@@ -72,11 +77,31 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
this.messageBodySize = messageBodySize;
}
+ /**
+ * Constructor for subclasses: same field assignments as the public constructor,
+ * but with an explicit {@code type} forwarded to the superclass.
+ *
+ * @param type value passed to the superclass constructor
+ * @param message stored in the {@code message} field
+ * @param body passed to the superclass constructor
+ * @param continues passed to the superclass constructor
+ * @param requiresResponse value later returned by {@code isRequiresResponse()}
+ * @param messageBodySize stored in the {@code messageBodySize} field
+ * @param handler stored in the {@code handler} field
+ */
+ protected SessionSendContinuationMessage(final byte type,
+ final Message message,
+ final byte[] body,
+ final boolean continues,
+ final boolean requiresResponse,
+ final long messageBodySize,
+ SendAcknowledgementHandler handler) {
+ super(type, body, continues);
+ this.requiresResponse = requiresResponse;
+ this.message = message;
+ this.handler = handler;
+ this.messageBodySize = messageBodySize;
+ }
+
// Public --------------------------------------------------------
/**
* @return the requiresResponse
*/
+ @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java
new file mode 100644
index 0000000..2a3071c
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+/**
+ * A {@link SessionSendContinuationMessage} that additionally carries a correlation ID,
+ * written after the fields encoded by the superclass.
+ */
+public class SessionSendContinuationMessage_V2 extends SessionSendContinuationMessage {
+
+   private long correlationID;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionSendContinuationMessage_V2() {
+      super();
+   }
+
+   /**
+    * All arguments are passed unchanged to the superclass constructor.
+    *
+    * @param message          the message this continuation belongs to
+    * @param body             the bytes carried by this packet
+    * @param continues        continuation flag forwarded to the superclass
+    * @param requiresResponse value later returned by {@code isRequiresResponse()}
+    * @param messageBodySize  body size forwarded to the superclass
+    * @param handler          acknowledgement handler forwarded to the superclass
+    */
+   public SessionSendContinuationMessage_V2(final Message message,
+                                            final byte[] body,
+                                            final boolean continues,
+                                            final boolean requiresResponse,
+                                            final long messageBodySize,
+                                            SendAcknowledgementHandler handler) {
+      super(message, body, continues, requiresResponse, messageBodySize, handler);
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the superclass estimate plus room for the correlation ID
+    */
+   @Override
+   public int expectedEncodeSize() {
+      return super.expectedEncodeSize() + DataConstants.SIZE_LONG;
+   }
+
+   /**
+    * Writes the superclass fields followed by the correlation ID as a long.
+    */
+   @Override
+   public void encodeRest(final ActiveMQBuffer buffer) {
+      super.encodeRest(buffer);
+      buffer.writeLong(correlationID);
+   }
+
+   /**
+    * Reads the superclass fields, then the correlation ID. The ID is only read when at
+    * least {@link DataConstants#SIZE_LONG} readable bytes remain; otherwise the field
+    * keeps its current value.
+    */
+   @Override
+   public void decodeRest(final ActiveMQBuffer buffer) {
+      super.decodeRest(buffer);
+      if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
+         correlationID = buffer.readLong();
+      }
+   }
+
+   @Override
+   public long getCorrelationID() {
+      return this.correlationID;
+   }
+
+   @Override
+   public void setCorrelationID(long correlationID) {
+      this.correlationID = correlationID;
+   }
+
+   @Override
+   public boolean isResponseAsync() {
+      return true;
+   }
+
+   @Override
+   public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
+      return result;
+   }
+
+   @Override
+   public String toString() {
+      // StringBuilder with chained appends: same text as before, without the
+      // synchronized StringBuffer and the temporary strings built inside append().
+      return new StringBuilder(getParentString())
+         .append(", continues=").append(continues)
+         .append(", message=").append(message)
+         .append(", messageBodySize=").append(messageBodySize)
+         .append(", requiresResponse=").append(requiresResponse)
+         .append(", correlationID=").append(correlationID)
+         .append("]")
+         .toString();
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj) {
+         return true;
+      }
+      if (!super.equals(obj)) {
+         return false;
+      }
+      if (!(obj instanceof SessionSendContinuationMessage_V2)) {
+         return false;
+      }
+      SessionSendContinuationMessage_V2 other = (SessionSendContinuationMessage_V2) obj;
+      return correlationID == other.correlationID;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
index b56ae30..e8dbdc1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
@@ -21,6 +21,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.utils.DataConstants;
public class SessionSendMessage extends MessagePacket {
@@ -37,6 +38,22 @@ public class SessionSendMessage extends MessagePacket {
private final transient SendAcknowledgementHandler handler;
/** This will be using the CoreMessage because it is meant for the core-protocol */
+ protected SessionSendMessage(final byte id,
+ final ICoreMessage message,
+ final boolean requiresResponse,
+ final SendAcknowledgementHandler handler) {
+ // Constructor for subclasses: id and message go to the superclass constructor,
+ // the handler and the requiresResponse flag are stored on this instance.
+ super(id, message);
+ this.handler = handler;
+ this.requiresResponse = requiresResponse;
+ }
+
+ /**
+ * Constructor for subclasses: forwards {@code id} and {@code message} to the
+ * superclass constructor and leaves the acknowledgement handler unset.
+ *
+ * @param id value passed to the superclass constructor
+ * @param message message passed to the superclass constructor
+ */
+ protected SessionSendMessage(final byte id,
+ final CoreMessage message) {
+ super(id, message);
+ this.handler = null;
+ }
+
+ /** This will be using the CoreMessage because it is meant for the core-protocol */
public SessionSendMessage(final ICoreMessage message,
final boolean requiresResponse,
final SendAcknowledgementHandler handler) {
@@ -52,6 +69,7 @@ public class SessionSendMessage extends MessagePacket {
// Public --------------------------------------------------------
+ @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
@@ -62,7 +80,7 @@ public class SessionSendMessage extends MessagePacket {
@Override
public int expectedEncodeSize() {
- return message.getEncodeSize() + PACKET_HEADERS_SIZE + 1;
+ return message.getEncodeSize() + PACKET_HEADERS_SIZE + fieldsEncodeSize();
}
@Override
@@ -75,13 +93,16 @@ public class SessionSendMessage extends MessagePacket {
public void decodeRest(final ActiveMQBuffer buffer) {
// Buffer comes in after having read standard headers and positioned at Beginning of body part
- ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1);
+ ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), fieldsEncodeSize());
receiveMessage(messageBuffer);
- buffer.readerIndex(buffer.capacity() - 1);
+ buffer.readerIndex(buffer.capacity() - fieldsEncodeSize());
requiresResponse = buffer.readBoolean();
+ }
+ /**
+ * Number of bytes taken by this packet's own fields, which decodeRest reads from the
+ * end of the buffer (capacity minus this size). Here that is the single
+ * requiresResponse boolean; subclasses that add fields override this to add their size.
+ *
+ * @return {@code DataConstants.SIZE_BOOLEAN}
+ */
+ protected int fieldsEncodeSize() {
+ return DataConstants.SIZE_BOOLEAN;
}
protected void receiveMessage(ByteBuf messageBuffer) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java
new file mode 100644
index 0000000..63c9a34
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+/**
+ * A {@link SessionSendMessage} that additionally carries a correlation ID, written as a
+ * long after the fields encoded by the superclass.
+ */
+public class SessionSendMessage_V2 extends SessionSendMessage {
+
+   private long correlationID;
+
+   /** This will be using the CoreMessage because it is meant for the core-protocol */
+   public SessionSendMessage_V2(final ICoreMessage message,
+                                final boolean requiresResponse,
+                                final SendAcknowledgementHandler handler) {
+      super(SESS_SEND, message, requiresResponse, handler);
+   }
+
+   public SessionSendMessage_V2(final CoreMessage message) {
+      super(SESS_SEND, message);
+   }
+
+   /**
+    * Writes the superclass fields followed by the correlation ID as a long.
+    */
+   @Override
+   public void encodeRest(ActiveMQBuffer buffer) {
+      super.encodeRest(buffer);
+      buffer.writeLong(correlationID);
+   }
+
+   /**
+    * Lets the superclass decode its part, then reads the correlation ID.
+    * <p>
+    * The read is unconditional: {@link #fieldsEncodeSize()} already reserves the long,
+    * so it must stay in step with what this method consumes after the superclass call.
+    */
+   @Override
+   public void decodeRest(final ActiveMQBuffer buffer) {
+      super.decodeRest(buffer);
+      correlationID = buffer.readLong();
+   }
+
+   /**
+    * @return the superclass field size plus {@link DataConstants#SIZE_LONG} for the
+    *         correlation ID
+    */
+   @Override
+   protected int fieldsEncodeSize() {
+      return super.fieldsEncodeSize() + DataConstants.SIZE_LONG;
+   }
+
+   @Override
+   public long getCorrelationID() {
+      return this.correlationID;
+   }
+
+   @Override
+   public void setCorrelationID(long correlationID) {
+      this.correlationID = correlationID;
+   }
+
+   @Override
+   public boolean isResponseAsync() {
+      return true;
+   }
+
+   @Override
+   public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
+      return result;
+   }
+
+   @Override
+   public String toString() {
+      // StringBuilder with chained appends: same text as before, without the
+      // synchronized StringBuffer and the temporary strings built inside append().
+      return new StringBuilder(getParentString())
+         .append(", correlationID=").append(correlationID)
+         .append(", requiresResponse=").append(super.isRequiresResponse())
+         .append("]")
+         .toString();
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj) {
+         return true;
+      }
+      if (!super.equals(obj)) {
+         return false;
+      }
+      if (!(obj instanceof SessionSendMessage_V2)) {
+         return false;
+      }
+      SessionSendMessage_V2 other = (SessionSendMessage_V2) obj;
+      return correlationID == other.correlationID;
+   }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
index 086b851..f88e0c8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
@@ -21,11 +21,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class SessionXAResponseMessage extends PacketImpl {
- private boolean error;
+ protected boolean error;
- private int responseCode;
+ protected int responseCode;
- private String message;
+ protected String message;
public SessionXAResponseMessage(final boolean isError, final int responseCode, final String message) {
super(SESS_XA_RESP);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java
new file mode 100644
index 0000000..4e949bd
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+/**
+ * XA response that additionally carries the correlationID of the request it answers.
+ */
+public class SessionXAResponseMessage_V2 extends SessionXAResponseMessage {
+
+ // Encoded as one long after the parent's fields; stays 0 if never set or decoded.
+ private long correlationID;
+
+ // Builds a response carrying the given correlation id plus the parent's XA result fields.
+ public SessionXAResponseMessage_V2(final long correlationID, final boolean isError, final int responseCode, final String message) {
+ super(isError, responseCode, message);
+ this.correlationID = correlationID;
+ }
+
+ // No-arg form; presumably populated afterwards through decodeRest -- confirm with the decoder.
+ public SessionXAResponseMessage_V2() {
+ super();
+ }
+
+ // Public --------------------------------------------------------
+
+ @Override
+ public long getCorrelationID() {
+ return correlationID;
+ }
+
+ // Writes the parent's fields first, then appends the correlation id as one long.
+ @Override
+ public void encodeRest(final ActiveMQBuffer buffer) {
+ super.encodeRest(buffer);
+ buffer.writeLong(correlationID);
+ }
+
+ // Mirror of encodeRest; the id is read only when at least one long is still
+ // readable, so a payload lacking the trailing field leaves correlationID untouched.
+ @Override
+ public void decodeRest(final ActiveMQBuffer buffer) {
+ super.decodeRest(buffer);
+ if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
+ correlationID = buffer.readLong();
+ }
+ }
+
+ @Override
+ public final boolean isResponse() {
+ return true;
+ }
+
+ @Override
+ public final boolean isResponseAsync() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ // Method-local buffer, so the unsynchronized StringBuilder is sufficient.
+ StringBuilder text = new StringBuilder(getParentString());
+ text.append(", error=").append(error).append(", message=").append(message);
+ text.append(", responseCode=").append(responseCode);
+ text.append(", correlationID=").append(correlationID).append("]");
+ return text.toString();
+ }
+
+ // Parent hash combined with the correlation id folded from 64 to 32 bits.
+ @Override
+ public int hashCode() {
+ return 31 * super.hashCode() + (int) (correlationID ^ (correlationID >>> 32));
+ }
+
+ // Equal when the parent's check passes, obj is a V2 XA response, and the ids match.
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!super.equals(obj) || !(obj instanceof SessionXAResponseMessage_V2)) {
+ return false;
+ }
+ return correlationID == ((SessionXAResponseMessage_V2) obj).correlationID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-core-client/src/main/resources/activemq-version.properties
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/resources/activemq-version.properties b/artemis-core-client/src/main/resources/activemq-version.properties
index a39b422..ff65ff9 100644
--- a/artemis-core-client/src/main/resources/activemq-version.properties
+++ b/artemis-core-client/src/main/resources/activemq-version.properties
@@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
activemq.version.microVersion=${activemq.version.microVersion}
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
activemq.version.versionTag=${activemq.version.versionTag}
-activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129
+activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index ae1d270..fc15d5e 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -601,6 +601,36 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
}
@Override
+ public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) {
+ // Reports a failed send to the completion listener: stream/bytes messages are
+ // reset first, then ActiveMQ exceptions are converted before onException is invoked.
+ if (jmsMessage instanceof StreamMessage) {
+ try {
+ ((StreamMessage) jmsMessage).reset();
+ } catch (JMSException ignored) { // HORNETQ-1209 XXX ignore?
+ }
+ }
+ if (jmsMessage instanceof BytesMessage) {
+ try {
+ ((BytesMessage) jmsMessage).reset();
+ } catch (JMSException ignored) { // HORNETQ-1209 XXX ignore?
+ }
+ }
+ try {
+ producer.connection.getThreadAwareContext().setCurrentThread(true);
+ Exception reported = exception;
+ if (exception instanceof ActiveMQException) {
+ reported = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException) exception);
+ } else if (exception instanceof ActiveMQInterruptedException) {
+ reported = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
+ }
+ completionListener.onException(jmsMessage, reported);
+ } finally {
+ producer.connection.getThreadAwareContext().clearCurrentThread(true);
+ }
+ }
+
+ @Override
public String toString() {
return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")";
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index d38f45f..0428abe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@@ -90,8 +91,10 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
if (connection.isVersionBeforeAddressChange()) {
sendMessage = new SessionSendMessage_1X(new CoreMessage(this.coreMessageObjectPools));
- } else {
+ } else if (connection.isVersionBeforeAsyncResponseChange()) {
sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools));
+ } else {
+ sendMessage = new SessionSendMessage_V2(new CoreMessage(this.coreMessageObjectPools));
}
sendMessage.decode(in);
[3/4] activemq-artemis git commit: ARTEMIS-1545 Support JMS 2.0
Completion Listener for Exceptions
Posted by cl...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 37564b5..f5756f2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueu
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
@@ -76,11 +77,13 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
@@ -313,7 +316,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = message.isRequiresResponse();
sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
if (requiresResponse) {
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
}
break;
}
@@ -342,7 +345,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = request.isRequiresResponse();
session.createAddress(request.getAddress(), request.getRoutingTypes(), request.isAutoCreated());
if (requiresResponse) {
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
}
break;
}
@@ -351,7 +354,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = request.isRequiresResponse();
session.createQueue(request.getAddress(), request.getQueueName(), RoutingType.MULTICAST, request.getFilterString(), request.isTemporary(), request.isDurable());
if (requiresResponse) {
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
}
break;
}
@@ -361,7 +364,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(),
request.isExclusive(), request.isLastValue(), request.isAutoCreated());
if (requiresResponse) {
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
}
break;
}
@@ -373,7 +376,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString());
}
if (requiresResponse) {
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
}
break;
}
@@ -385,7 +388,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue());
}
if (requiresResponse) {
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
}
break;
}
@@ -393,7 +396,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = true;
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet;
session.deleteQueue(request.getQueueName());
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
break;
}
case SESS_QUEUEQUERY: {
@@ -453,62 +456,62 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_COMMIT: {
requiresResponse = true;
session.commit();
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
break;
}
case SESS_ROLLBACK: {
requiresResponse = true;
session.rollback(((RollbackMessage) packet).isConsiderLastMessageAsDelivered());
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
break;
}
case SESS_XA_COMMIT: {
requiresResponse = true;
SessionXACommitMessage message = (SessionXACommitMessage) packet;
session.xaCommit(message.getXid(), message.isOnePhase());
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_END: {
requiresResponse = true;
SessionXAEndMessage message = (SessionXAEndMessage) packet;
session.xaEnd(message.getXid());
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_FORGET: {
requiresResponse = true;
SessionXAForgetMessage message = (SessionXAForgetMessage) packet;
session.xaForget(message.getXid());
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_JOIN: {
requiresResponse = true;
SessionXAJoinMessage message = (SessionXAJoinMessage) packet;
session.xaJoin(message.getXid());
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_RESUME: {
requiresResponse = true;
SessionXAResumeMessage message = (SessionXAResumeMessage) packet;
session.xaResume(message.getXid());
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_ROLLBACK: {
requiresResponse = true;
SessionXARollbackMessage message = (SessionXARollbackMessage) packet;
session.xaRollback(message.getXid());
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_START: {
requiresResponse = true;
SessionXAStartMessage message = (SessionXAStartMessage) packet;
session.xaStart(message.getXid());
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_FAILED: {
@@ -521,14 +524,14 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_XA_SUSPEND: {
requiresResponse = true;
session.xaSuspend();
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_PREPARE: {
requiresResponse = true;
SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet;
session.xaPrepare(message.getXid());
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_INDOUBT_XIDS: {
@@ -557,14 +560,14 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_STOP: {
requiresResponse = true;
session.stop();
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
break;
}
case SESS_CLOSE: {
requiresResponse = true;
session.close(false);
// removeConnectionListeners();
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
flush = true;
closeChannel = true;
break;
@@ -574,7 +577,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = message.isRequiresResponse();
session.individualAcknowledge(message.getConsumerID(), message.getMessageID());
if (requiresResponse) {
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
}
break;
}
@@ -582,7 +585,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = true;
SessionConsumerCloseMessage message = (SessionConsumerCloseMessage) packet;
session.closeConsumer(message.getConsumerID());
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
break;
}
case SESS_FORCE_CONSUMER_DELIVERY: {
@@ -591,7 +594,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
break;
}
case PacketImpl.SESS_ADD_METADATA: {
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet;
session.addMetaData(message.getKey(), message.getData());
break;
@@ -600,7 +603,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = true;
SessionAddMetaDataMessageV2 message = (SessionAddMetaDataMessageV2) packet;
if (message.isRequiresConfirmations()) {
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
}
session.addMetaData(message.getKey(), message.getData());
break;
@@ -609,7 +612,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = true;
SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage) packet;
if (session.addUniqueMetaData(message.getKey(), message.getData())) {
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
} else {
response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(), message.getData()));
}
@@ -617,15 +620,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
}
} catch (ActiveMQIOErrorException e) {
- response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
+ response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) {
- response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+ response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
- response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
+ response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQException e) {
- response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+ response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (Throwable t) {
- response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+ response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
}
sendResponse(packet, response, flush, closeChannel);
} finally {
@@ -633,6 +636,26 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
}
+ private Packet createNullResponseMessage(Packet packet) {
+ // The V2 form echoes the request's correlation id; the plain form is used for non-async
+ // packets and for connections where isVersionBeforeAsyncResponseChange() holds.
+ final boolean legacy = !packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange();
+ if (legacy) {
+ return new NullResponseMessage();
+ }
+ return new NullResponseMessage_V2(packet.getCorrelationID());
+ }
+
+ private Packet createSessionXAResponseMessage(Packet packet) {
+ // Success (XA_OK) reply; async requests get the V2 form echoing their correlation id.
+ // NOTE(review): unlike createNullResponseMessage there is no connection-version
+ // check on this path -- confirm that is intended.
+ if (!packet.isResponseAsync()) {
+ return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ }
+ return new SessionXAResponseMessage_V2(packet.getCorrelationID(), false, XAResource.XA_OK, null);
+ }
+
private void onSessionAcknowledge(Packet packet) {
this.storageManager.setContext(session.getSessionContext());
try {
@@ -643,18 +666,18 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = message.isRequiresResponse();
this.session.acknowledge(message.getConsumerID(), message.getMessageID());
if (requiresResponse) {
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
}
} catch (ActiveMQIOErrorException e) {
- response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
+ response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) {
- response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+ response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
- response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
+ response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQException e) {
- response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+ response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (Throwable t) {
- response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+ response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
}
sendResponse(packet, response, false, false);
} finally {
@@ -672,18 +695,18 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = message.isRequiresResponse();
this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage()), this.direct);
if (requiresResponse) {
- response = new NullResponseMessage();
+ response = createNullResponseMessage(packet);
}
} catch (ActiveMQIOErrorException e) {
- response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
+ response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) {
- response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+ response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
- response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
+ response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQException e) {
- response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+ response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (Throwable t) {
- response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+ response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
}
sendResponse(packet, response, false, false);
} finally {
@@ -700,15 +723,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet;
session.requestProducerCredits(message.getAddress(), message.getCredits());
} catch (ActiveMQIOErrorException e) {
- response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
+ response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) {
- response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+ response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
- response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
+ response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQException e) {
- response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+ response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (Throwable t) {
- response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+ response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
}
sendResponse(packet, response, false, false);
} finally {
@@ -725,15 +748,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet;
session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
} catch (ActiveMQIOErrorException e) {
- response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
+ response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) {
- response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+ response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
- response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
+ response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQException e) {
- response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+ response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (Throwable t) {
- response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+ response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
}
sendResponse(packet, response, false, false);
} finally {
@@ -742,50 +765,68 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
- private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(ActiveMQIOErrorException e,
+ private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(Packet packet,
+ ActiveMQIOErrorException e,
boolean requiresResponse,
Packet response,
ServerSession session) {
session.markTXFailed(e);
if (requiresResponse) {
logger.debug("Sending exception to client", e);
- response = new ActiveMQExceptionMessage(e);
+ response = convertToExceptionPacket(packet, e);
} else {
ActiveMQServerLogger.LOGGER.caughtException(e);
}
return response;
}
- private static Packet onActiveMQXAExceptionWhileHandlePacket(ActiveMQXAException e,
+ private static Packet onActiveMQXAExceptionWhileHandlePacket(Packet packet,
+ ActiveMQXAException e,
boolean requiresResponse,
Packet response) {
if (requiresResponse) {
logger.debug("Sending exception to client", e);
- response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
+ if (packet.isResponseAsync()) {
+ response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), true, e.errorCode, e.getMessage());
+ } else {
+ response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
+ }
} else {
ActiveMQServerLogger.LOGGER.caughtXaException(e);
}
return response;
}
- private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(ActiveMQQueueMaxConsumerLimitReached e,
+ private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(Packet packet,
+ ActiveMQQueueMaxConsumerLimitReached e,
boolean requiresResponse,
Packet response) {
if (requiresResponse) {
logger.debug("Sending exception to client", e);
- response = new ActiveMQExceptionMessage(e);
+ response = convertToExceptionPacket(packet, e);
} else {
ActiveMQServerLogger.LOGGER.caughtException(e);
}
return response;
}
- private static Packet onActiveMQExceptionWhileHandlePacket(ActiveMQException e,
+ private static Packet convertToExceptionPacket(Packet packet, ActiveMQException e) {
+ // Wraps e for the client: requests that report an async response receive the
+ // V2 message, built with their correlation id; all others get the plain
+ // exception message.
+ if (!packet.isResponseAsync()) {
+ return new ActiveMQExceptionMessage(e);
+ }
+ return new ActiveMQExceptionMessage_V2(packet.getCorrelationID(), e);
+ }
+
+ private static Packet onActiveMQExceptionWhileHandlePacket(Packet packet,
+ ActiveMQException e,
boolean requiresResponse,
Packet response) {
if (requiresResponse) {
logger.debug("Sending exception to client", e);
- response = new ActiveMQExceptionMessage(e);
+ response = convertToExceptionPacket(packet, e);
} else {
if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
logger.debug("Caught exception", e);
@@ -796,7 +837,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
return response;
}
- private static Packet onCatchThrowableWhileHandlePacket(Throwable t,
+ private static Packet onCatchThrowableWhileHandlePacket(Packet packet,
+ Throwable t,
boolean requiresResponse,
Packet response,
ServerSession session) {
@@ -805,7 +847,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
ActiveMQServerLogger.LOGGER.sendingUnexpectedExceptionToClient(t);
ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException();
activeMQInternalErrorException.initCause(t);
- response = new ActiveMQExceptionMessage(activeMQInternalErrorException);
+ response = convertToExceptionPacket(packet, activeMQInternalErrorException);
} else {
ActiveMQServerLogger.LOGGER.caughtException(t);
}
@@ -827,12 +869,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
public void onError(final int errorCode, final String errorMessage) {
ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
- ActiveMQExceptionMessage exceptionMessage = new ActiveMQExceptionMessage(ActiveMQExceptionType.createException(errorCode, errorMessage));
-
- doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel);
+ Packet exceptionPacket = convertToExceptionPacket(confirmPacket, ActiveMQExceptionType.createException(errorCode, errorMessage));
+ doConfirmAndResponse(confirmPacket, exceptionPacket, flush, closeChannel);
if (logger.isTraceEnabled()) {
- logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionMessage);
+ logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionPacket);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 36e5c33..7acfe0d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -126,7 +126,7 @@
<activemq.version.majorVersion>1</activemq.version.majorVersion>
<activemq.version.minorVersion>0</activemq.version.minorVersion>
<activemq.version.microVersion>0</activemq.version.microVersion>
- <activemq.version.incrementingVersion>129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
+ <activemq.version.incrementingVersion>130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
index e4afb5b..c7ed869 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
@@ -315,6 +316,11 @@ public class BackupSyncDelay implements Interceptor {
}
@Override
+ public void setResponseHandler(ResponseHandler handler) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void flushConfirmations() {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
index d3951f2..3020310 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
@@ -167,7 +167,24 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase {
@Override
public void onException(Message message, Exception exception) {
- // TODO Auto-generated method stub
+ latch.countDown();
+ try {
+ switch (call) {
+ case 0:
+ context.rollback();
+ break;
+ case 1:
+ context.commit();
+ break;
+ case 2:
+ context.close();
+ break;
+ default:
+ throw new IllegalArgumentException("call code " + call);
+ }
+ } catch (Exception error1) {
+ this.error = error1;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
index e83d815..0fd469c 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
@@ -16,12 +16,23 @@
*/
package org.apache.activemq.artemis.jms.tests;
+import static org.junit.Assert.fail;
+
+import javax.jms.CompletionListener;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSSecurityException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
@@ -68,9 +79,9 @@ public class SecurityTest extends JMSTestCase {
}
- /**
- * Login with no user, no password Should allow login (equivalent to guest)
- */
+ /**
+ * Login with no user, no password Should allow login (equivalent to guest)
+ */
@Test
public void testLoginNoUserNoPassword() throws Exception {
createConnection();
@@ -170,6 +181,71 @@ public class SecurityTest extends JMSTestCase {
}
}
+ /**
+ * Login with a valid user and password,
+ * but try to send to an address the user is not authorised for - Persistent.
+ * The send should not be allowed and should throw an exception.
+ */
+ @Test
+ public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception {
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ Connection connection = connectionFactory.createConnection("guest", "guest");
+ Session session = connection.createSession();
+ Destination destination = session.createQueue("guest.cannot.send");
+ MessageProducer messageProducer = session.createProducer(destination);
+ try {
+ messageProducer.send(session.createTextMessage("hello"));
+ fail("JMSSecurityException expected as guest is not allowed to send");
+ } catch (JMSSecurityException activeMQSecurityException) {
+ //pass
+ }
+ connection.close();
+ }
+
+ /**
+ * Login with a valid user and password,
+ * but try to send to an address the user is not authorised for - Non Persistent.
+ * Should have the same behaviour as Persistent, with an exception on send.
+ */
+ @Test
+ public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent() throws Exception {
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ connectionFactory.setConfirmationWindowSize(100);
+ connectionFactory.setBlockOnDurableSend(false);
+ connectionFactory.setBlockOnNonDurableSend(false);
+ Connection connection = connectionFactory.createConnection("guest", "guest");
+ Session session = connection.createSession();
+ Destination destination = session.createQueue("guest.cannot.send");
+ MessageProducer messageProducer = session.createProducer(destination);
+ messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ try {
+ AtomicReference<Exception> e = new AtomicReference<>();
+ // messageProducer.send(session.createTextMessage("hello"));
+
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ messageProducer.send(session.createTextMessage("hello"), new CompletionListener() {
+ @Override
+ public void onCompletion(Message message) {
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onException(Message message, Exception exception) {
+ e.set(exception);
+ countDownLatch.countDown();
+ }
+ });
+ countDownLatch.await(10, TimeUnit.SECONDS);
+ if (e.get() != null) {
+ throw e.get();
+ }
+ fail("JMSSecurityException expected as guest is not allowed to send");
+ } catch (JMSSecurityException activeMQSecurityException) {
+ activeMQSecurityException.printStackTrace();
+ }
+ connection.close();
+ }
+
/* Now some client id tests */
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e4ba48a3/tests/jms-tests/src/test/resources/broker.xml
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/resources/broker.xml b/tests/jms-tests/src/test/resources/broker.xml
index 733e8c3..644ce83 100644
--- a/tests/jms-tests/src/test/resources/broker.xml
+++ b/tests/jms-tests/src/test/resources/broker.xml
@@ -54,6 +54,16 @@
<permission type="browse" roles="guest,def"/>
<permission type="send" roles="guest,def"/>
</security-setting>
+
+ <security-setting match="guest.cannot.send">
+ <permission type="createDurableQueue" roles="guest,def"/>
+ <permission type="deleteDurableQueue" roles="guest,def"/>
+ <permission type="createNonDurableQueue" roles="guest,def"/>
+ <permission type="deleteNonDurableQueue" roles="guest,def"/>
+ <permission type="consume" roles="guest,def"/>
+ <permission type="browse" roles="guest,def"/>
+ <permission type="send" roles="def"/>
+ </security-setting>
</security-settings>
</core>
</configuration>
\ No newline at end of file
[2/4] activemq-artemis git commit: ARTEMIS-1545 refactor & rework a few incompatible pieces
Posted by cl...@apache.org.
ARTEMIS-1545 refactor & rework a few incompatible pieces
The existing commit for ARTEMIS-1545 broke bridges and large messages. This
commit fixes those and refactors the solution a bit to make it clearer.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a28b4fb3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a28b4fb3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a28b4fb3
Branch: refs/heads/master
Commit: a28b4fb34eb3cc178dd611d0cb2acc51d6b7a965
Parents: e4ba48a
Author: Justin Bertram <jb...@apache.org>
Authored: Tue Jul 17 10:53:21 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 27 17:29:18 2018 -0400
----------------------------------------------------------------------
.../core/client/SendAcknowledgementHandler.java | 8 +-
.../core/protocol/core/ResponseHandler.java | 6 +-
.../core/impl/ActiveMQSessionContext.java | 37 +-
.../core/protocol/core/impl/ChannelImpl.java | 15 +-
.../core/protocol/core/impl/PacketImpl.java | 5 +-
.../core/protocol/core/impl/ResponseCache.java | 6 +-
.../protocol/core/impl/ChannelImplTest.java | 512 +++++++++++++++++++
.../jms/client/ActiveMQMessageProducer.java | 92 ++--
.../core/ServerSessionPacketHandler.java | 3 +-
.../artemis/jms/tests/SecurityTest.java | 113 +++-
10 files changed, 722 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a28b4fb3/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
index 0f47536..ad45a5f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
@@ -43,9 +43,11 @@ public interface SendAcknowledgementHandler {
void sendAcknowledged(Message message);
default void sendFailed(Message message, Exception e) {
- //This is to keep old behaviour that would ack even if error,
- // if anyone custom implemented this interface but doesnt update.
- sendAcknowledged(message);
+ /**
+ * By default ignore failures to preserve compatibility with existing implementations.
+ * If the message makes it to the broker and a failure occurs sendAcknowledged() will
+ * still be invoked just like it always was.
+ */
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a28b4fb3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
index 21e9879..f96ef13 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
@@ -17,14 +17,14 @@
package org.apache.activemq.artemis.core.protocol.core;
/**
- * A CommandConfirmationHandler is used by the channel to confirm confirmations of packets.
+ * A ResponseHandler is used by the channel to handle async responses.
*/
public interface ResponseHandler {
/**
- * called by channel after a confirmation has been received.
+ * called by channel after an async response has been received.
*
* @param packet the packet confirmed
*/
- void responseHandler(Packet packet, Packet response);
+ void handleResponse(Packet packet, Packet response);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a28b4fb3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 3c0647f..7306072 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -171,11 +171,7 @@ public class ActiveMQSessionContext extends SessionContext {
sessionChannel.setHandler(handler);
if (confirmationWindow >= 0) {
- if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
- sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
- } else {
- sessionChannel.setResponseHandler(responseHandler);
- }
+ setHandlers();
}
}
@@ -192,16 +188,24 @@ public class ActiveMQSessionContext extends SessionContext {
this.killed = true;
}
+ private void setHandlers() {
+ sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
+
+ if (!sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
+ sessionChannel.setResponseHandler(responseHandler);
+ }
+ }
+
private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() {
@Override
public void commandConfirmed(Packet packet) {
- responseHandler.responseHandler(packet, null);
+ responseHandler.handleResponse(packet, null);
}
};
private final ResponseHandler responseHandler = new ResponseHandler() {
@Override
- public void responseHandler(Packet packet, Packet response) {
+ public void handleResponse(Packet packet, Packet response) {
final ActiveMQException activeMQException;
if (response != null && response.getType() == PacketImpl.EXCEPTION) {
ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response;
@@ -232,7 +236,7 @@ public class ActiveMQSessionContext extends SessionContext {
if (exception == null) {
sendAckHandler.sendAcknowledged(message);
} else {
- handler.sendFailed(message, exception);
+ sendAckHandler.sendFailed(message, exception);
}
}
}
@@ -272,11 +276,8 @@ public class ActiveMQSessionContext extends SessionContext {
@Override
public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) {
- if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
- sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
- } else {
- sessionChannel.setResponseHandler(responseHandler);
- }
+ setHandlers();
+
this.sendAckHandler = handler;
}
@@ -946,12 +947,12 @@ public class ActiveMQSessionContext extends SessionContext {
boolean lastChunk,
byte[] chunk,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
- final boolean requiresResponse = lastChunk || confirmationWindow != -1;
+ final boolean requiresResponse = lastChunk && sendBlocking;
final SessionSendContinuationMessage chunkPacket;
if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
} else {
- chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
+ chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler);
}
final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
//perform a weak form of flow control to avoid OOM on tight loops
@@ -969,11 +970,7 @@ public class ActiveMQSessionContext extends SessionContext {
}
if (requiresResponse) {
// When sending it blocking, only the last chunk will be blocking.
- if (sendBlocking) {
- channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
- } else {
- channel.send(chunkPacket);
- }
+ channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
} else {
channel.send(chunkPacket);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a28b4fb3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 9cb2a83..61268d6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -253,6 +253,10 @@ public final class ChannelImpl implements Channel {
this.transferring = transferring;
}
+ protected ResponseCache getCache() {
+ return responseAsyncCache;
+ }
+
/**
* @param timeoutMsg message to log on blocking call failover timeout
*/
@@ -316,7 +320,7 @@ public final class ChannelImpl implements Channel {
checkReconnectID(reconnectID);
//We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
- //As the send could block if the response cache is cannot add, preventing responses to be handled.
+ //As the send could block if the response cache cannot add, preventing responses to be handled.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
while (!responseAsyncCache.add(packet)) {
try {
@@ -426,7 +430,7 @@ public final class ChannelImpl implements Channel {
throw new ActiveMQInterruptedException(e);
}
- if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket) {
+ if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket && !response.isResponseAsync()) {
ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace"));
}
@@ -642,7 +646,7 @@ public final class ChannelImpl implements Channel {
}
}
- public void handleResponse(Packet packet) {
+ public void handleAsyncResponse(Packet packet) {
if (responseAsyncCache != null && packet.isResponseAsync()) {
responseAsyncCache.handleResponse(packet);
}
@@ -700,7 +704,7 @@ public final class ChannelImpl implements Channel {
if (packet.isResponse()) {
confirm(packet);
- handleResponse(packet);
+ handleAsyncResponse(packet);
lock.lock();
try {
@@ -752,6 +756,9 @@ public final class ChannelImpl implements Channel {
if (commandConfirmationHandler != null) {
commandConfirmationHandler.commandConfirmed(packet);
}
+ if (responseAsyncCache != null) {
+ responseAsyncCache.handleResponse(packet);
+ }
}
firstStoredCommandID += numberToClear;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a28b4fb3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 470e3ae..0168a47 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -31,7 +31,8 @@ public class PacketImpl implements Packet {
// 2.0.0
public static final int ADDRESSING_CHANGE_VERSION = 129;
- public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130;
+
+ // 2.7.0
public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130;
@@ -430,7 +431,7 @@ public class PacketImpl implements Packet {
}
protected String getParentString() {
- return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", packetObject=" + this.getClass().getSimpleName();
+ return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", responseAsync=" + isResponseAsync() + ", requiresResponse=" + isRequiresResponse() + ", correlationID=" + getCorrelationID() + ", packetObject=" + this.getClass().getSimpleName();
}
private int stringEncodeSize(final String str) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a28b4fb3/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
index f9e8538..8ee73d7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
@@ -53,7 +53,7 @@ public class ResponseCache {
long correlationID = response.getCorrelationID();
Packet packet = remove(correlationID);
if (packet != null) {
- responseHandler.responseHandler(packet, response);
+ responseHandler.handleResponse(packet, response);
}
}
@@ -67,4 +67,8 @@ public class ResponseCache {
public void setResponseHandler(ResponseHandler responseHandler) {
this.responseHandler = responseHandler;
}
+
+ public int size() {
+ return this.store.size();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a28b4fb3/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
new file mode 100644
index 0000000..416c911
--- /dev/null
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
@@ -0,0 +1,512 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl;
+
+import javax.security.auth.Subject;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFutureListener;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.protocol.core.Channel;
+import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ChannelImplTest {
+
+ ChannelImpl channel;
+
+ @Before
+ public void setUp() {
+ channel = new ChannelImpl(new CoreRR(), 1, 4000, null);
+ }
+
+ @Test
+ public void testCorrelation() {
+
+ AtomicInteger handleResponseCount = new AtomicInteger();
+
+ RequestPacket requestPacket = new RequestPacket((byte) 1);
+ setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
+
+ channel.send(requestPacket);
+
+ assertEquals(1, channel.getCache().size());
+
+ ResponsePacket responsePacket = new ResponsePacket((byte) 1);
+ responsePacket.setCorrelationID(requestPacket.getCorrelationID());
+
+ channel.handlePacket(responsePacket);
+
+ assertEquals(1, handleResponseCount.get());
+ assertEquals(0, channel.getCache().size());
+ }
+
+ private void setResponseHandlerAsPerActiveMQSessionContext(ResponseHandler responseHandler) {
+ channel.setResponseHandler(responseHandler);
+ channel.setCommandConfirmationHandler(wrapAsPerActiveMQSessionContext(responseHandler));
+ }
+
+ private CommandConfirmationHandler wrapAsPerActiveMQSessionContext(ResponseHandler responseHandler) {
+ return new CommandConfirmationHandler() {
+ @Override
+ public void commandConfirmed(Packet packet) {
+ responseHandler.handleResponse(packet, null);
+ }
+ };
+ }
+
+ @Test
+ public void testPacketsConfirmedMessage() {
+
+ AtomicInteger handleResponseCount = new AtomicInteger();
+
+ RequestPacket requestPacket = new RequestPacket((byte) 1);
+ setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
+
+ channel.send(requestPacket);
+
+ PacketsConfirmedMessage responsePacket = new PacketsConfirmedMessage((byte) 2);
+
+ channel.handlePacket(responsePacket);
+
+ assertEquals(0, channel.getCache().size());
+ }
+
+ class RequestPacket extends PacketImpl {
+
+ private long id;
+
+ RequestPacket(byte type) {
+ super(type);
+ }
+
+ @Override
+ public boolean isRequiresResponse() {
+ return true;
+ }
+
+ @Override
+ public boolean isResponseAsync() {
+ return true;
+ }
+
+ @Override
+ public long getCorrelationID() {
+ return id;
+ }
+
+ @Override
+ public void setCorrelationID(long id) {
+ this.id = id;
+ }
+
+ @Override
+ public int getPacketSize() {
+ return 0;
+ }
+ }
+
+ class ResponsePacket extends PacketImpl {
+
+ private long id;
+
+ ResponsePacket(byte type) {
+ super(type);
+ }
+
+ @Override
+ public boolean isResponseAsync() {
+ return true;
+ }
+
+ @Override
+ public boolean isResponse() {
+ return true;
+ }
+
+ @Override
+ public long getCorrelationID() {
+ return id;
+ }
+
+ @Override
+ public void setCorrelationID(long id) {
+ this.id = id;
+ }
+
+ @Override
+ public int getPacketSize() {
+ return 0;
+ }
+ }
+
+ /**
+ * No-op {@link CoreRemotingConnection} stub.
+ * <p>
+ * Every method either does nothing or returns a fixed default (0, false or null), with three
+ * exceptions: {@link #createTransportBuffer(int)} returns a real buffer,
+ * {@link #getTransportConnection()} returns a no-op Connection instead of null, and
+ * {@link #isClient()} returns true.
+ */
+ class CoreRR implements CoreRemotingConnection {
+
+ @Override
+ public int getChannelVersion() {
+ return 0;
+ }
+
+ @Override
+ public void setChannelVersion(int clientVersion) {
+
+ }
+
+ @Override
+ public Channel getChannel(long channelID, int confWindowSize) {
+ return null;
+ }
+
+ @Override
+ public void putChannel(long channelID, Channel channel) {
+
+ }
+
+ @Override
+ public boolean removeChannel(long channelID) {
+ return false;
+ }
+
+ @Override
+ public long generateChannelID() {
+ return 0;
+ }
+
+ @Override
+ public void syncIDGeneratorSequence(long id) {
+
+ }
+
+ @Override
+ public long getIDGeneratorSequence() {
+ return 0;
+ }
+
+ @Override
+ public long getBlockingCallTimeout() {
+ return 0;
+ }
+
+ @Override
+ public long getBlockingCallFailoverTimeout() {
+ return 0;
+ }
+
+ @Override
+ public Object getTransferLock() {
+ return null;
+ }
+
+ @Override
+ public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
+ return null;
+ }
+
+ @Override
+ public boolean blockUntilWritable(int size, long timeout) {
+ return false;
+ }
+
+ @Override
+ public Object getID() {
+ return null;
+ }
+
+ @Override
+ public long getCreationTime() {
+ return 0;
+ }
+
+ @Override
+ public String getRemoteAddress() {
+ return null;
+ }
+
+ @Override
+ public void scheduledFlush() {
+
+ }
+
+ @Override
+ public void addFailureListener(FailureListener listener) {
+
+ }
+
+ @Override
+ public boolean removeFailureListener(FailureListener listener) {
+ return false;
+ }
+
+ @Override
+ public void addCloseListener(CloseListener listener) {
+
+ }
+
+ @Override
+ public boolean removeCloseListener(CloseListener listener) {
+ return false;
+ }
+
+ @Override
+ public List<CloseListener> removeCloseListeners() {
+ return null;
+ }
+
+ @Override
+ public void setCloseListeners(List<CloseListener> listeners) {
+
+ }
+
+ @Override
+ public List<FailureListener> getFailureListeners() {
+ return null;
+ }
+
+ @Override
+ public List<FailureListener> removeFailureListeners() {
+ return null;
+ }
+
+ @Override
+ public void setFailureListeners(List<FailureListener> listeners) {
+
+ }
+
+ // Unlike the other stubs, this returns a usable buffer: a ChannelBufferWrapper around
+ // Unpooled.buffer(size).
+ @Override
+ public ActiveMQBuffer createTransportBuffer(int size) {
+ return new ChannelBufferWrapper(Unpooled.buffer(size));
+ }
+
+ @Override
+ public void fail(ActiveMQException me) {
+
+ }
+
+ @Override
+ public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
+
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+
+ // Returns a new anonymous Connection on every call; all of its methods are no-ops that
+ // return null or false (including its own createTransportBuffer, which returns null).
+ @Override
+ public Connection getTransportConnection() {
+ return new Connection() {
+ @Override
+ public ActiveMQBuffer createTransportBuffer(int size) {
+ return null;
+ }
+
+ @Override
+ public RemotingConnection getProtocolConnection() {
+ return null;
+ }
+
+ @Override
+ public void setProtocolConnection(RemotingConnection connection) {
+
+ }
+
+ @Override
+ public boolean isWritable(ReadyListener listener) {
+ return false;
+ }
+
+ @Override
+ public void fireReady(boolean ready) {
+
+ }
+
+ @Override
+ public void setAutoRead(boolean autoRead) {
+
+ }
+
+ @Override
+ public Object getID() {
+ return null;
+ }
+
+ @Override
+ public void write(ActiveMQBuffer buffer, boolean flush, boolean batched) {
+
+ }
+
+ @Override
+ public void write(ActiveMQBuffer buffer,
+ boolean flush,
+ boolean batched,
+ ChannelFutureListener futureListener) {
+
+ }
+
+ @Override
+ public void write(ActiveMQBuffer buffer) {
+
+ }
+
+ @Override
+ public void forceClose() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public String getRemoteAddress() {
+ return null;
+ }
+
+ @Override
+ public String getLocalAddress() {
+ return null;
+ }
+
+ @Override
+ public void checkFlushBatchBuffer() {
+
+ }
+
+ @Override
+ public TransportConfiguration getConnectorConfig() {
+ return null;
+ }
+
+ @Override
+ public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
+ return null;
+ }
+
+ @Override
+ public boolean isUsingProtocolHandling() {
+ return false;
+ }
+
+ @Override
+ public boolean isSameTarget(TransportConfiguration... configs) {
+ return false;
+ }
+ };
+ }
+
+ // The only boolean method of this stub that returns true.
+ @Override
+ public boolean isClient() {
+ return true;
+ }
+
+ @Override
+ public boolean isDestroyed() {
+ return false;
+ }
+
+ @Override
+ public void disconnect(boolean criticalError) {
+
+ }
+
+ @Override
+ public void disconnect(String scaleDownNodeID, boolean criticalError) {
+
+ }
+
+ @Override
+ public boolean checkDataReceived() {
+ return false;
+ }
+
+ @Override
+ public void flush() {
+
+ }
+
+ @Override
+ public boolean isWritable(ReadyListener callback) {
+ return false;
+ }
+
+ @Override
+ public void killMessage(SimpleString nodeID) {
+
+ }
+
+ @Override
+ public boolean isSupportReconnect() {
+ return false;
+ }
+
+ @Override
+ public boolean isSupportsFlowControl() {
+ return false;
+ }
+
+ @Override
+ public Subject getSubject() {
+ return null;
+ }
+
+ @Override
+ public String getProtocolName() {
+ return null;
+ }
+
+ @Override
+ public void setClientID(String cID) {
+
+ }
+
+ @Override
+ public String getClientID() {
+ return null;
+ }
+
+ @Override
+ public String getTransportLocalAddress() {
+ return null;
+ }
+
+ @Override
+ public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
+
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a28b4fb3/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index fc15d5e..ee4223c 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -34,6 +34,8 @@ import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
@@ -564,6 +566,14 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
private final ActiveMQMessageProducer producer;
/**
+ * Guards against this SendAcknowledgementHandler being invoked more than once, which can happen
+ * when subsequent packet confirmations arrive on the same connection. The flag starts out true
+ * and is cleared after the first notification. A new CompletionListenerWrapper is created for
+ * each message sent, so once cleared the wrapped CompletionListener is never notified again.
+ */
+ private final AtomicBoolean active = new AtomicBoolean(true);
+
+ /**
* @param jmsMessage
* @param producer
*/
@@ -577,56 +587,62 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
+ /**
+ * Notifies the wrapped CompletionListener that the send completed, at most once per wrapper.
+ * Stream and bytes messages are reset first so the listener can read them from the start.
+ *
+ * @param clientMessage the acknowledged core message (not used; the JMS message is passed on)
+ */
@Override
public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) {
- if (jmsMessage instanceof StreamMessage) {
- try {
- ((StreamMessage) jmsMessage).reset();
- } catch (JMSException e) {
- // HORNETQ-1209 XXX ignore?
+ // Claim the single notification atomically: a separate get() followed by a later set(false)
+ // would let concurrent or re-entrant callers both pass the check.
+ if (active.compareAndSet(true, false)) {
+ if (jmsMessage instanceof StreamMessage) {
+ try {
+ ((StreamMessage) jmsMessage).reset();
+ } catch (JMSException e) {
+ // HORNETQ-1209 XXX ignore?
+ }
}
- }
- if (jmsMessage instanceof BytesMessage) {
- try {
- ((BytesMessage) jmsMessage).reset();
- } catch (JMSException e) {
- // HORNETQ-1209 XXX ignore?
+ if (jmsMessage instanceof BytesMessage) {
+ try {
+ ((BytesMessage) jmsMessage).reset();
+ } catch (JMSException e) {
+ // HORNETQ-1209 XXX ignore?
+ }
}
- }
- try {
- producer.connection.getThreadAwareContext().setCurrentThread(true);
- completionListener.onCompletion(jmsMessage);
- } finally {
- producer.connection.getThreadAwareContext().clearCurrentThread(true);
+ try {
+ producer.connection.getThreadAwareContext().setCurrentThread(true);
+ completionListener.onCompletion(jmsMessage);
+ } finally {
+ producer.connection.getThreadAwareContext().clearCurrentThread(true);
+ }
}
}
+ /**
+ * Notifies the wrapped CompletionListener that the send failed, at most once per wrapper.
+ * Stream and bytes messages are reset first; ActiveMQ exceptions are converted to their JMS
+ * equivalents before being handed to the listener.
+ *
+ * @param clientMessage the core message that failed (not used; the JMS message is passed on)
+ * @param exception the failure reported for the send
+ */
@Override
public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) {
- if (jmsMessage instanceof StreamMessage) {
- try {
- ((StreamMessage) jmsMessage).reset();
- } catch (JMSException e) {
- // HORNETQ-1209 XXX ignore?
+ // Claim the single notification atomically: a separate get() followed by a later set(false)
+ // would let concurrent or re-entrant callers both pass the check.
+ if (active.compareAndSet(true, false)) {
+ if (jmsMessage instanceof StreamMessage) {
+ try {
+ ((StreamMessage) jmsMessage).reset();
+ } catch (JMSException e) {
+ // HORNETQ-1209 XXX ignore?
+ }
}
- }
- if (jmsMessage instanceof BytesMessage) {
- try {
- ((BytesMessage) jmsMessage).reset();
- } catch (JMSException e) {
- // HORNETQ-1209 XXX ignore?
+ if (jmsMessage instanceof BytesMessage) {
+ try {
+ ((BytesMessage) jmsMessage).reset();
+ } catch (JMSException e) {
+ // HORNETQ-1209 XXX ignore?
+ }
}
- }
- try {
- producer.connection.getThreadAwareContext().setCurrentThread(true);
- if (exception instanceof ActiveMQException) {
- exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException)exception);
- } else if (exception instanceof ActiveMQInterruptedException) {
- exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
+ try {
+ producer.connection.getThreadAwareContext().setCurrentThread(true);
+ if (exception instanceof ActiveMQException) {
+ exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException) exception);
+ } else if (exception instanceof ActiveMQInterruptedException) {
+ exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
+ }
+ completionListener.onException(jmsMessage, exception);
+ } finally {
+ producer.connection.getThreadAwareContext().clearCurrentThread(true);
}
- completionListener.onException(jmsMessage, exception);
- } finally {
- producer.connection.getThreadAwareContext().clearCurrentThread(true);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a28b4fb3/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index f5756f2..16a87d8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -893,7 +893,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
final Packet response,
final boolean flush,
final boolean closeChannel) {
- if (confirmPacket != null) {
+ // don't confirm if the response is an exception
+ if (confirmPacket != null && (response == null || (response != null && response.getType() != PacketImpl.EXCEPTION))) {
channel.confirm(confirmPacket);
if (flush) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a28b4fb3/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
index 0fd469c..3ba58f0 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
@@ -18,12 +18,15 @@ package org.apache.activemq.artemis.jms.tests;
import static org.junit.Assert.fail;
+import javax.jms.BytesMessage;
import javax.jms.CompletionListener;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
+import javax.jms.JMSContext;
+import javax.jms.JMSProducer;
import javax.jms.JMSSecurityException;
import javax.jms.Message;
import javax.jms.MessageProducer;
@@ -33,6 +36,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
@@ -188,10 +193,14 @@ public class SecurityTest extends JMSTestCase {
*/
@Test
public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception {
+ SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
+ if (getJmsServer().locateQueue(queueName) == null) {
+ getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+ }
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection("guest", "guest");
Session session = connection.createSession();
- Destination destination = session.createQueue("guest.cannot.send");
+ Destination destination = session.createQueue(queueName.toString());
MessageProducer messageProducer = session.createProducer(destination);
try {
messageProducer.send(session.createTextMessage("hello"));
@@ -209,18 +218,21 @@ public class SecurityTest extends JMSTestCase {
*/
@Test
public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent() throws Exception {
+ SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
+ if (getJmsServer().locateQueue(queueName) == null) {
+ getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+ }
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connectionFactory.setConfirmationWindowSize(100);
connectionFactory.setBlockOnDurableSend(false);
connectionFactory.setBlockOnNonDurableSend(false);
Connection connection = connectionFactory.createConnection("guest", "guest");
Session session = connection.createSession();
- Destination destination = session.createQueue("guest.cannot.send");
+ Destination destination = session.createQueue(queueName.toString());
MessageProducer messageProducer = session.createProducer(destination);
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
try {
AtomicReference<Exception> e = new AtomicReference<>();
- // messageProducer.send(session.createTextMessage("hello"));
CountDownLatch countDownLatch = new CountDownLatch(1);
messageProducer.send(session.createTextMessage("hello"), new CompletionListener() {
@@ -242,6 +254,101 @@ public class SecurityTest extends JMSTestCase {
fail("JMSSecurityException expected as guest is not allowed to send");
} catch (JMSSecurityException activeMQSecurityException) {
activeMQSecurityException.printStackTrace();
+ } finally {
+ connection.close();
+ }
+ }
+
+ /**
+ * Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using JMS 2 API.
+ * <p>
+ * Sends asynchronously as "guest" and expects the CompletionListener to receive a
+ * JMSSecurityException; the test fails if the listener reports success or is never called.
+ */
+ @Test
+ public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistentJMS2() throws Exception {
+ SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
+ if (getJmsServer().locateQueue(queueName) == null) {
+ getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+ }
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ connectionFactory.setConfirmationWindowSize(100);
+ connectionFactory.setBlockOnDurableSend(false);
+ connectionFactory.setBlockOnNonDurableSend(false);
+ JMSContext context = connectionFactory.createContext("guest", "guest");
+ Destination destination = context.createQueue(queueName.toString());
+ JMSProducer messageProducer = context.createProducer();
+ messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ try {
+ AtomicReference<Exception> e = new AtomicReference<>();
+
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ messageProducer.setAsync(new CompletionListener() {
+ @Override
+ public void onCompletion(Message message) {
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onException(Message message, Exception exception) {
+ e.set(exception);
+ countDownLatch.countDown();
+ }
+ });
+ messageProducer.send(destination, context.createTextMessage("hello"));
+ // await() returns false on timeout; report that explicitly instead of falling through
+ // to the "JMSSecurityException expected" failure, which would misstate what went wrong.
+ if (!countDownLatch.await(10, TimeUnit.SECONDS)) {
+ fail("Timed out waiting for the CompletionListener to be notified");
+ }
+ if (e.get() != null) {
+ throw e.get();
+ }
+ fail("JMSSecurityException expected as guest is not allowed to send");
+ } catch (JMSSecurityException activeMQSecurityException) {
+ activeMQSecurityException.printStackTrace();
+ } finally {
+ context.close();
+ }
+ }
+
+ /**
+ * Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using a large message.
+ * <p>
+ * The minimum large-message size is lowered to 1024 bytes and a 10 KiB BytesMessage is sent, so
+ * the send exceeds that threshold. The connection is closed in a finally block so it is released
+ * even when the test fails.
+ */
+ @Test
+ public void testLoginValidUserAndPasswordButNotAuthorisedToSendLargeNonPersistent() throws Exception {
+ SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
+ if (getJmsServer().locateQueue(queueName) == null) {
+ getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+ }
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ connectionFactory.setConfirmationWindowSize(100);
+ connectionFactory.setBlockOnDurableSend(false);
+ connectionFactory.setBlockOnNonDurableSend(false);
+ connectionFactory.setMinLargeMessageSize(1024);
+ Connection connection = connectionFactory.createConnection("guest", "guest");
+ Session session = connection.createSession();
+ Destination destination = session.createQueue(queueName.toString());
+ MessageProducer messageProducer = session.createProducer(destination);
+ messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ try {
+ AtomicReference<Exception> e = new AtomicReference<>();
+
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(new byte[10 * 1024]);
+ messageProducer.send(message, new CompletionListener() {
+ @Override
+ public void onCompletion(Message message) {
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onException(Message message, Exception exception) {
+ e.set(exception);
+ countDownLatch.countDown();
+ }
+ });
+ // await() returns false on timeout; report that explicitly instead of falling through
+ // to the "JMSSecurityException expected" failure, which would misstate what went wrong.
+ if (!countDownLatch.await(10, TimeUnit.SECONDS)) {
+ fail("Timed out waiting for the CompletionListener to be notified");
+ }
+ if (e.get() != null) {
+ throw e.get();
+ }
+ fail("JMSSecurityException expected as guest is not allowed to send");
+ } catch (JMSSecurityException activeMQSecurityException) {
+ activeMQSecurityException.printStackTrace();
+ } finally {
+ connection.close();
}
- connection.close();
}