You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/28 13:17:52 UTC
[2/2] activemq-artemis git commit: Reverting JMS Completion listener
on 2.6.x..
Reverting JMS Completion listener on 2.6.x..
This change is too big for a maintenance branch. Reverting it.
Revert "ARTEMIS-1545 refactor & rework a few incompatible pieces"
Revert "ARTEMIS-1545 Support JMS 2.0 Completion Listener for Exceptions"
This reverts commit c9d8697a6cf5e4620970da878fc5ab4f8d9d148f.
This reverts commit f4734868a5a07dfc6db533a96f9f8e01de5139c5.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2242d244
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2242d244
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2242d244
Branch: refs/heads/2.6.x
Commit: 2242d2447cdaaf3f7ad59f1b9c342035e9b8426e
Parents: da7fb89
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Sep 28 09:15:16 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Sep 28 09:16:58 2018 -0400
----------------------------------------------------------------------
.../core/client/SendAcknowledgementHandler.java | 9 -
.../client/ActiveMQClientMessageBundle.java | 3 -
.../artemis/core/protocol/core/Channel.java | 3 -
.../protocol/core/CoreRemotingConnection.java | 5 -
.../artemis/core/protocol/core/Packet.java | 8 -
.../core/protocol/core/ResponseHandler.java | 30 --
.../core/impl/ActiveMQSessionContext.java | 71 +--
.../core/protocol/core/impl/ChannelImpl.java | 67 +--
.../core/protocol/core/impl/PacketDecoder.java | 34 +-
.../core/protocol/core/impl/PacketImpl.java | 26 +-
.../core/protocol/core/impl/ResponseCache.java | 74 ---
.../wireformat/ActiveMQExceptionMessage.java | 2 +-
.../wireformat/ActiveMQExceptionMessage_V2.java | 101 ----
.../impl/wireformat/CreateAddressMessage.java | 1 -
.../impl/wireformat/CreateQueueMessage.java | 1 -
.../wireformat/CreateSharedQueueMessage.java | 1 -
.../impl/wireformat/NullResponseMessage_V2.java | 96 ----
.../wireformat/SessionAcknowledgeMessage.java | 1 -
.../SessionCreateConsumerMessage.java | 1 -
.../SessionIndividualAcknowledgeMessage.java | 1 -
.../SessionSendContinuationMessage.java | 31 +-
.../SessionSendContinuationMessage_V2.java | 122 -----
.../impl/wireformat/SessionSendMessage.java | 27 +-
.../impl/wireformat/SessionSendMessage_V2.java | 104 ----
.../wireformat/SessionXAResponseMessage.java | 6 +-
.../wireformat/SessionXAResponseMessage_V2.java | 102 ----
.../main/resources/activemq-version.properties | 2 +-
.../protocol/core/impl/ChannelImplTest.java | 512 -------------------
.../jms/client/ActiveMQMessageProducer.java | 76 +--
.../core/protocol/ServerPacketDecoder.java | 5 +-
.../core/ServerSessionPacketHandler.java | 176 +++----
pom.xml | 2 +-
.../cluster/util/BackupSyncDelay.java | 6 -
.../JmsProducerCompletionListenerTest.java | 19 +-
.../artemis/jms/tests/SecurityTest.java | 189 +------
tests/jms-tests/src/test/resources/broker.xml | 10 -
36 files changed, 126 insertions(+), 1798 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
index ad45a5f..c164f6c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
@@ -41,13 +41,4 @@ public interface SendAcknowledgementHandler {
* @param message message sent asynchronously
*/
void sendAcknowledged(Message message);
-
- default void sendFailed(Message message, Exception e) {
- /**
- * By default ignore failures to preserve compatibility with existing implementations.
- * If the message makes it to the broker and a failure occurs sendAcknowledge() will
- * still be invoked just like it always was.
- */
- }
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
index e043ac9..bb88e6d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
@@ -228,7 +228,4 @@ public interface ActiveMQClientMessageBundle {
@Message(id = 119062, value = "Multi-packet transmission (e.g. Large Messages) interrupted because of a reconnection.")
ActiveMQInterruptedException packetTransmissionInterrupted();
-
- @Message(id = 119063, value = "Cannot send a packet while response cache is full.")
- IllegalStateException cannotSendPacketWhilstResponseCacheFull();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
index 56f8259..127a69a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
@@ -211,9 +211,6 @@ public interface Channel {
*/
void setCommandConfirmationHandler(CommandConfirmationHandler handler);
- void setResponseHandler(ResponseHandler handler);
-
-
/**
* flushes any confirmations on to the connection.
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
index 74d9847..b6a5d93 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
@@ -36,11 +36,6 @@ public interface CoreRemotingConnection extends RemotingConnection {
return (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION);
}
- default boolean isVersionBeforeAsyncResponseChange() {
- int version = getChannelVersion();
- return (version > 0 && version < PacketImpl.ASYNC_RESPONSE_CHANGE_VERSION);
- }
-
/**
* Sets the client protocol used on the communication. This will determine if the client has
* support for certain packet types
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
index b658090..1f40314 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
@@ -41,14 +41,6 @@ public interface Packet {
return INITIAL_PACKET_SIZE;
}
- boolean isRequiresResponse();
-
- boolean isResponseAsync();
-
- long getCorrelationID();
-
- void setCorrelationID(long correlationID);
-
/**
* Returns the channel id of the channel that should handle this packet.
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
deleted file mode 100644
index f96ef13..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.core;
-
-/**
- * A ResponseHandler is used by the channel to handle async responses.
- */
-public interface ResponseHandler {
-
- /**
- * called by channel after an async response has been received.
- *
- * @param packet the packet confirmed
- */
- void handleResponse(Packet packet, Packet response);
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 658bfcf..cbbfcab 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -55,7 +55,6 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
-import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
@@ -91,11 +90,9 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@@ -168,7 +165,7 @@ public class ActiveMQSessionContext extends SessionContext {
sessionChannel.setHandler(handler);
if (confirmationWindow >= 0) {
- setHandlers();
+ sessionChannel.setCommandConfirmationHandler(confirmationHandler);
}
}
@@ -185,58 +182,28 @@ public class ActiveMQSessionContext extends SessionContext {
this.killed = true;
}
- private void setHandlers() {
- sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
-
- if (!sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
- sessionChannel.setResponseHandler(responseHandler);
- }
- }
-
- private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() {
+ private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() {
@Override
- public void commandConfirmed(Packet packet) {
- responseHandler.handleResponse(packet, null);
- }
- };
-
- private final ResponseHandler responseHandler = new ResponseHandler() {
- @Override
- public void handleResponse(Packet packet, Packet response) {
- final ActiveMQException activeMQException;
- if (response != null && response.getType() == PacketImpl.EXCEPTION) {
- ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response;
- activeMQException = exceptionResponseMessage.getException();
- } else {
- activeMQException = null;
- }
-
+ public void commandConfirmed(final Packet packet) {
if (packet.getType() == PacketImpl.SESS_SEND) {
SessionSendMessage ssm = (SessionSendMessage) packet;
- callSendAck(ssm.getHandler(), ssm.getMessage(), activeMQException);
+ callSendAck(ssm.getHandler(), ssm.getMessage());
} else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) {
SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
if (!scm.isContinues()) {
- callSendAck(scm.getHandler(), scm.getMessage(), activeMQException);
+ callSendAck(scm.getHandler(), scm.getMessage());
}
}
}
- private void callSendAck(SendAcknowledgementHandler handler, final Message message, final Exception exception) {
+ private void callSendAck(SendAcknowledgementHandler handler, final Message message) {
if (handler != null) {
- if (exception == null) {
- handler.sendAcknowledged(message);
- } else {
- handler.sendFailed(message, exception);
- }
+ handler.sendAcknowledged(message);
} else if (sendAckHandler != null) {
- if (exception == null) {
- sendAckHandler.sendAcknowledged(message);
- } else {
- sendAckHandler.sendFailed(message, exception);
- }
+ sendAckHandler.sendAcknowledged(message);
}
}
+
};
// Failover utility methods
@@ -273,8 +240,7 @@ public class ActiveMQSessionContext extends SessionContext {
@Override
public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) {
- setHandlers();
-
+ sessionChannel.setCommandConfirmationHandler(confirmationHandler);
this.sendAckHandler = handler;
}
@@ -502,15 +468,13 @@ public class ActiveMQSessionContext extends SessionContext {
boolean sendBlocking,
SendAcknowledgementHandler handler,
SimpleString defaultAddress) throws ActiveMQException {
- final SessionSendMessage packet;
+ SessionSendMessage packet;
if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
packet = new SessionSendMessage_1X(msgI, sendBlocking, handler);
- } else if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
- packet = new SessionSendMessage(msgI, sendBlocking, handler);
} else {
- boolean responseRequired = confirmationWindow != -1 || sendBlocking;
- packet = new SessionSendMessage_V2(msgI, responseRequired, handler);
+ packet = new SessionSendMessage(msgI, sendBlocking, handler);
}
+
if (sendBlocking) {
sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
} else {
@@ -926,7 +890,7 @@ public class ActiveMQSessionContext extends SessionContext {
}
}
- private int sendSessionSendContinuationMessage(Channel channel,
+ private static int sendSessionSendContinuationMessage(Channel channel,
Message msgI,
long messageBodySize,
boolean sendBlocking,
@@ -934,12 +898,7 @@ public class ActiveMQSessionContext extends SessionContext {
byte[] chunk,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
final boolean requiresResponse = lastChunk && sendBlocking;
- final SessionSendContinuationMessage chunkPacket;
- if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
- chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
- } else {
- chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler);
- }
+ final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
//perform a weak form of flow control to avoid OOM on tight loops
final CoreRemotingConnection connection = channel.getConnection();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 61268d6..4d73cf8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -36,7 +36,6 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
-import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -97,8 +96,6 @@ public final class ChannelImpl implements Channel {
private final java.util.Queue<Packet> resendCache;
- private final ResponseCache responseAsyncCache;
-
private int firstStoredCommandID;
private final AtomicInteger lastConfirmedCommandID = new AtomicInteger(-1);
@@ -141,10 +138,8 @@ public final class ChannelImpl implements Channel {
if (confWindowSize != -1) {
resendCache = new ConcurrentLinkedQueue<>();
- responseAsyncCache = new ResponseCache();
} else {
resendCache = null;
- responseAsyncCache = null;
}
this.interceptors = interceptors;
@@ -216,11 +211,7 @@ public final class ChannelImpl implements Channel {
lock.lock();
try {
- ActiveMQException activeMQException = ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause);
- if (responseAsyncCache != null) {
- responseAsyncCache.errorAll(activeMQException);
- }
- response = new ActiveMQExceptionMessage(activeMQException);
+ response = new ActiveMQExceptionMessage(ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause));
sendCondition.signal();
} finally {
@@ -253,10 +244,6 @@ public final class ChannelImpl implements Channel {
this.transferring = transferring;
}
- protected ResponseCache getCache() {
- return responseAsyncCache;
- }
-
/**
* @param timeoutMsg message to log on blocking call failover timeout
*/
@@ -283,10 +270,6 @@ public final class ChannelImpl implements Channel {
synchronized (sendLock) {
packet.setChannelID(id);
- if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
- packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
- }
-
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
}
@@ -308,7 +291,6 @@ public final class ChannelImpl implements Channel {
if (resendCache != null && packet.isRequiresConfirmations()) {
addResendPacket(packet);
}
-
} finally {
lock.unlock();
}
@@ -319,30 +301,9 @@ public final class ChannelImpl implements Channel {
checkReconnectID(reconnectID);
- //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
- //As the send could block if the response cache cannot add, preventing responses to be handled.
- if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
- while (!responseAsyncCache.add(packet)) {
- try {
- Thread.sleep(1);
- } catch (Exception e) {
- // Ignore
- }
- }
- }
-
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover
- try {
- connection.getTransportConnection().write(buffer, flush, batch);
- } catch (Throwable t) {
- //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
- //The client would get still know about this as the exception bubbles up the call stack instead.
- if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
- responseAsyncCache.remove(packet.getCorrelationID());
- }
- throw t;
- }
+ connection.getTransportConnection().write(buffer, flush, batch);
return true;
}
}
@@ -430,7 +391,7 @@ public final class ChannelImpl implements Channel {
throw new ActiveMQInterruptedException(e);
}
- if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket && !response.isResponseAsync()) {
+ if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket) {
ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace"));
}
@@ -517,18 +478,6 @@ public final class ChannelImpl implements Channel {
}
@Override
- public void setResponseHandler(final ResponseHandler responseHandler) {
- if (confWindowSize < 0) {
- final String msg = "You can't set responseHandler on a connection with confirmation-window-size < 0." + " Look at the documentation for more information.";
- if (logger.isTraceEnabled()) {
- logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " " + msg);
- }
- throw new IllegalStateException(msg);
- }
- responseAsyncCache.setResponseHandler(responseHandler);
- }
-
- @Override
public void setHandler(final ChannelHandler handler) {
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Setting handler on " + this + " as " + handler);
@@ -646,12 +595,6 @@ public final class ChannelImpl implements Channel {
}
}
- public void handleAsyncResponse(Packet packet) {
- if (responseAsyncCache != null && packet.isResponseAsync()) {
- responseAsyncCache.handleResponse(packet);
- }
- }
-
@Override
public void confirm(final Packet packet) {
if (resendCache != null && packet.isRequiresConfirmations()) {
@@ -704,7 +647,6 @@ public final class ChannelImpl implements Channel {
if (packet.isResponse()) {
confirm(packet);
- handleAsyncResponse(packet);
lock.lock();
try {
@@ -756,9 +698,6 @@ public final class ChannelImpl implements Channel {
if (commandConfirmationHandler != null) {
commandConfirmationHandler.commandConfirmed(packet);
}
- if (responseAsyncCache != null) {
- responseAsyncCache.handleResponse(packet);
- }
}
firstStoredCommandID += numberToClear;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
index 9a8166e..5e46848 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
@@ -39,7 +39,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Disconnect
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Ping;
@@ -72,7 +71,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQue
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@@ -83,7 +81,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
@@ -91,7 +88,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY;
@@ -188,25 +184,13 @@ public abstract class PacketDecoder implements Serializable {
break;
}
case EXCEPTION: {
- if (connection.isVersionBeforeAsyncResponseChange()) {
- packet = new ActiveMQExceptionMessage();
- } else {
- packet = new ActiveMQExceptionMessage_V2();
- }
+ packet = new ActiveMQExceptionMessage();
break;
}
case PACKETS_CONFIRMED: {
packet = new PacketsConfirmedMessage();
break;
}
- case NULL_RESPONSE: {
- if (connection.isVersionBeforeAsyncResponseChange()) {
- packet = new NullResponseMessage();
- } else {
- packet = new NullResponseMessage_V2();
- }
- break;
- }
case CREATESESSION: {
packet = new CreateSessionMessage();
break;
@@ -332,11 +316,7 @@ public abstract class PacketDecoder implements Serializable {
break;
}
case SESS_XA_RESP: {
- if (connection.isVersionBeforeAsyncResponseChange()) {
- packet = new SessionXAResponseMessage();
- } else {
- packet = new SessionXAResponseMessage_V2();
- }
+ packet = new SessionXAResponseMessage();
break;
}
case SESS_XA_ROLLBACK: {
@@ -403,16 +383,16 @@ public abstract class PacketDecoder implements Serializable {
packet = new SessionIndividualAcknowledgeMessage();
break;
}
+ case NULL_RESPONSE: {
+ packet = new NullResponseMessage();
+ break;
+ }
case SESS_RECEIVE_CONTINUATION: {
packet = new SessionReceiveContinuationMessage();
break;
}
case SESS_SEND_CONTINUATION: {
- if (connection.isVersionBeforeAsyncResponseChange()) {
- packet = new SessionSendContinuationMessage();
- } else {
- packet = new SessionSendContinuationMessage_V2();
- }
+ packet = new SessionSendContinuationMessage();
break;
}
case SESS_PRODUCER_REQUEST_CREDITS: {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 0168a47..87ba0c3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -31,9 +31,7 @@ public class PacketImpl implements Packet {
// 2.0.0
public static final int ADDRESSING_CHANGE_VERSION = 129;
-
- // 2.7.0
- public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130;
+ public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130;
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
@@ -274,7 +272,6 @@ public class PacketImpl implements Packet {
public static final byte SESS_BINDINGQUERY_RESP_V4 = -15;
-
// Static --------------------------------------------------------
public PacketImpl(final byte type) {
@@ -431,7 +428,7 @@ public class PacketImpl implements Packet {
}
protected String getParentString() {
- return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", responseAsync=" + isResponseAsync() + ", requiresResponse=" + isRequiresResponse() + ", correlationID=" + getCorrelationID() + ", packetObject=" + this.getClass().getSimpleName();
+ return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", packetObject=" + this.getClass().getSimpleName();
}
private int stringEncodeSize(final String str) {
@@ -442,24 +439,5 @@ public class PacketImpl implements Packet {
return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0);
}
- @Override
- public boolean isRequiresResponse() {
- return false;
- }
-
- @Override
- public boolean isResponseAsync() {
- return false;
- }
-
- @Override
- public long getCorrelationID() {
- return -1;
- }
-
- @Override
- public void setCorrelationID(long correlationID) {
- }
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
deleted file mode 100644
index 8ee73d7..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.core.impl;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.core.protocol.core.Packet;
-import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
-import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
-import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
-
-public class ResponseCache {
-
- private final AtomicLong sequence = new AtomicLong(0);
-
- private final ConcurrentLongHashMap<Packet> store;
- private ResponseHandler responseHandler;
-
- public ResponseCache() {
- this.store = new ConcurrentLongHashMap<>();
- }
-
- public long nextCorrelationID() {
- return sequence.incrementAndGet();
- }
-
- public boolean add(Packet packet) {
- this.store.put(packet.getCorrelationID(), packet);
- return true;
- }
-
- public Packet remove(long correlationID) {
- return store.remove(correlationID);
- }
-
- public void handleResponse(Packet response) {
- long correlationID = response.getCorrelationID();
- Packet packet = remove(correlationID);
- if (packet != null) {
- responseHandler.handleResponse(packet, response);
- }
- }
-
- public void errorAll(ActiveMQException exception) {
- ConcurrentLongHashSet keys = store.keysLongHashSet();
- keys.forEach(correlationID -> {
- handleResponse(new ActiveMQExceptionMessage_V2(correlationID, exception));
- });
- }
-
- public void setResponseHandler(ResponseHandler responseHandler) {
- this.responseHandler = responseHandler;
- }
-
- public int size() {
- return this.store.size();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java
index 51637f3..da34d2e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java
@@ -23,7 +23,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class ActiveMQExceptionMessage extends PacketImpl {
- protected ActiveMQException exception;
+ private ActiveMQException exception;
// Static --------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java
deleted file mode 100644
index 661a040..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.utils.DataConstants;
-
-public class ActiveMQExceptionMessage_V2 extends ActiveMQExceptionMessage {
-
- private long correlationID;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ActiveMQExceptionMessage_V2(final long correlationID, final ActiveMQException exception) {
- super(exception);
- this.correlationID = correlationID;
- }
-
- public ActiveMQExceptionMessage_V2() {
- super();
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public boolean isResponse() {
- return true;
- }
-
- @Override
- public void encodeRest(final ActiveMQBuffer buffer) {
- super.encodeRest(buffer);
- buffer.writeLong(correlationID);
- }
-
- @Override
- public void decodeRest(final ActiveMQBuffer buffer) {
- super.decodeRest(buffer);
- if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
- correlationID = buffer.readLong();
- }
- }
-
- @Override
- public final boolean isResponseAsync() {
- return true;
- }
-
- @Override
- public long getCorrelationID() {
- return this.correlationID;
- }
-
- @Override
- public String toString() {
- return getParentString() + ", exception= " + exception + "]";
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = super.hashCode();
- result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!super.equals(obj)) {
- return false;
- }
- if (!(obj instanceof ActiveMQExceptionMessage_V2)) {
- return false;
- }
- ActiveMQExceptionMessage_V2 other = (ActiveMQExceptionMessage_V2) obj;
- if (correlationID != other.correlationID) {
- return false;
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
index 8c84a9b..a98f888 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
@@ -65,7 +65,6 @@ public class CreateAddressMessage extends PacketImpl {
return address;
}
- @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java
index 985d5f4..2ebf147 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java
@@ -100,7 +100,6 @@ public class CreateQueueMessage extends PacketImpl {
return temporary;
}
- @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
index 3c072e0..af25ae9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
@@ -80,7 +80,6 @@ public class CreateSharedQueueMessage extends PacketImpl {
return filterString;
}
- @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
deleted file mode 100644
index e3453af..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.utils.DataConstants;
-
-public class NullResponseMessage_V2 extends NullResponseMessage {
-
- private long correlationID;
-
- public NullResponseMessage_V2(final long correlationID) {
- super();
- this.correlationID = correlationID;
- }
-
- public NullResponseMessage_V2() {
- super();
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public long getCorrelationID() {
- return correlationID;
- }
-
- @Override
- public void encodeRest(final ActiveMQBuffer buffer) {
- super.encodeRest(buffer);
- buffer.writeLong(correlationID);
- }
-
- @Override
- public void decodeRest(final ActiveMQBuffer buffer) {
- super.decodeRest(buffer);
- if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
- correlationID = buffer.readLong();
- }
- }
-
- @Override
- public final boolean isResponse() {
- return true;
- }
-
- @Override
- public final boolean isResponseAsync() {
- return true;
- }
-
- @Override
- public String toString() {
- return getParentString() + ", correlationID=" + correlationID + "]";
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = super.hashCode();
- result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!super.equals(obj)) {
- return false;
- }
- if (!(obj instanceof NullResponseMessage_V2)) {
- return false;
- }
- NullResponseMessage_V2 other = (NullResponseMessage_V2) obj;
- if (correlationID != other.correlationID) {
- return false;
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
index 67d9f67..542c34c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
@@ -51,7 +51,6 @@ public class SessionAcknowledgeMessage extends PacketImpl {
return messageID;
}
- @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
index e07b50c..f09beeb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
@@ -71,7 +71,6 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
return browseOnly;
}
- @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
index 3164c23..7d06081 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
@@ -60,7 +60,6 @@ public class SessionIndividualAcknowledgeMessage extends PacketImpl {
return messageID;
}
- @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
index 4105b11..26eedd7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
@@ -26,10 +26,10 @@ import org.apache.activemq.artemis.utils.DataConstants;
*/
public class SessionSendContinuationMessage extends SessionContinuationMessage {
- protected boolean requiresResponse;
+ private boolean requiresResponse;
// Used on confirmation handling
- protected Message message;
+ private Message message;
/**
* In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession}
* <br>
@@ -43,7 +43,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
/**
* to be sent on the last package
*/
- protected long messageBodySize = -1;
+ private long messageBodySize = -1;
// Static --------------------------------------------------------
@@ -54,11 +54,6 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
handler = null;
}
- protected SessionSendContinuationMessage(byte type) {
- super(type);
- handler = null;
- }
-
/**
* @param body
* @param continues
@@ -77,31 +72,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
this.messageBodySize = messageBodySize;
}
- /**
- * @param body
- * @param continues
- * @param requiresResponse
- */
- protected SessionSendContinuationMessage(final byte type,
- final Message message,
- final byte[] body,
- final boolean continues,
- final boolean requiresResponse,
- final long messageBodySize,
- SendAcknowledgementHandler handler) {
- super(type, body, continues);
- this.requiresResponse = requiresResponse;
- this.message = message;
- this.handler = handler;
- this.messageBodySize = messageBodySize;
- }
-
// Public --------------------------------------------------------
/**
* @return the requiresResponse
*/
- @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java
deleted file mode 100644
index 2a3071c..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
-import org.apache.activemq.artemis.utils.DataConstants;
-
-/**
- * A SessionSendContinuationMessage<br>
- */
-public class SessionSendContinuationMessage_V2 extends SessionSendContinuationMessage {
-
- private long correlationID;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionSendContinuationMessage_V2() {
- super();
- }
-
- /**
- * @param body
- * @param continues
- * @param requiresResponse
- */
- public SessionSendContinuationMessage_V2(final Message message,
- final byte[] body,
- final boolean continues,
- final boolean requiresResponse,
- final long messageBodySize,
- SendAcknowledgementHandler handler) {
- super(message, body, continues, requiresResponse, messageBodySize, handler);
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public int expectedEncodeSize() {
- return super.expectedEncodeSize() + DataConstants.SIZE_LONG;
- }
-
- @Override
- public void encodeRest(final ActiveMQBuffer buffer) {
- super.encodeRest(buffer);
- buffer.writeLong(correlationID);
- }
-
- @Override
- public void decodeRest(final ActiveMQBuffer buffer) {
- super.decodeRest(buffer);
- if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
- correlationID = buffer.readLong();
- }
- }
-
- @Override
- public long getCorrelationID() {
- return this.correlationID;
- }
-
- @Override
- public void setCorrelationID(long correlationID) {
- this.correlationID = correlationID;
- }
-
- @Override
- public boolean isResponseAsync() {
- return true;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = super.hashCode();
- result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
- return result;
- }
-
- @Override
- public String toString() {
- StringBuffer buff = new StringBuffer(getParentString());
- buff.append(", continues=" + continues);
- buff.append(", message=" + message);
- buff.append(", messageBodySize=" + messageBodySize);
- buff.append(", requiresResponse=" + requiresResponse);
- buff.append(", correlationID=" + correlationID);
- buff.append("]");
- return buff.toString();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (!super.equals(obj))
- return false;
- if (!(obj instanceof SessionSendContinuationMessage_V2))
- return false;
- SessionSendContinuationMessage_V2 other = (SessionSendContinuationMessage_V2) obj;
- if (correlationID != other.correlationID)
- return false;
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
index e8dbdc1..b56ae30 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
@@ -21,7 +21,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
-import org.apache.activemq.artemis.utils.DataConstants;
public class SessionSendMessage extends MessagePacket {
@@ -38,22 +37,6 @@ public class SessionSendMessage extends MessagePacket {
private final transient SendAcknowledgementHandler handler;
/** This will be using the CoreMessage because it is meant for the core-protocol */
- protected SessionSendMessage(final byte id,
- final ICoreMessage message,
- final boolean requiresResponse,
- final SendAcknowledgementHandler handler) {
- super(id, message);
- this.handler = handler;
- this.requiresResponse = requiresResponse;
- }
-
- protected SessionSendMessage(final byte id,
- final CoreMessage message) {
- super(id, message);
- this.handler = null;
- }
-
- /** This will be using the CoreMessage because it is meant for the core-protocol */
public SessionSendMessage(final ICoreMessage message,
final boolean requiresResponse,
final SendAcknowledgementHandler handler) {
@@ -69,7 +52,6 @@ public class SessionSendMessage extends MessagePacket {
// Public --------------------------------------------------------
- @Override
public boolean isRequiresResponse() {
return requiresResponse;
}
@@ -80,7 +62,7 @@ public class SessionSendMessage extends MessagePacket {
@Override
public int expectedEncodeSize() {
- return message.getEncodeSize() + PACKET_HEADERS_SIZE + fieldsEncodeSize();
+ return message.getEncodeSize() + PACKET_HEADERS_SIZE + 1;
}
@Override
@@ -93,16 +75,13 @@ public class SessionSendMessage extends MessagePacket {
public void decodeRest(final ActiveMQBuffer buffer) {
// Buffer comes in after having read standard headers and positioned at Beginning of body part
- ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), fieldsEncodeSize());
+ ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1);
receiveMessage(messageBuffer);
- buffer.readerIndex(buffer.capacity() - fieldsEncodeSize());
+ buffer.readerIndex(buffer.capacity() - 1);
requiresResponse = buffer.readBoolean();
- }
- protected int fieldsEncodeSize() {
- return DataConstants.SIZE_BOOLEAN;
}
protected void receiveMessage(ByteBuf messageBuffer) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java
deleted file mode 100644
index 63c9a34..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ICoreMessage;
-import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
-import org.apache.activemq.artemis.core.message.impl.CoreMessage;
-import org.apache.activemq.artemis.utils.DataConstants;
-
-public class SessionSendMessage_V2 extends SessionSendMessage {
-
- private long correlationID;
-
- /** This will be using the CoreMessage because it is meant for the core-protocol */
- public SessionSendMessage_V2(final ICoreMessage message,
- final boolean requiresResponse,
- final SendAcknowledgementHandler handler) {
- super(SESS_SEND, message, requiresResponse, handler);
- }
-
- public SessionSendMessage_V2(final CoreMessage message) {
- super(SESS_SEND, message);
- }
-
- @Override
- public void encodeRest(ActiveMQBuffer buffer) {
- super.encodeRest(buffer);
- buffer.writeLong(correlationID);
- }
-
- @Override
- public void decodeRest(final ActiveMQBuffer buffer) {
- super.decodeRest(buffer);
- correlationID = buffer.readLong();
- }
-
- @Override
- protected int fieldsEncodeSize() {
- return super.fieldsEncodeSize() + DataConstants.SIZE_LONG;
- }
-
- @Override
- public long getCorrelationID() {
- return this.correlationID;
- }
-
- @Override
- public void setCorrelationID(long correlationID) {
- this.correlationID = correlationID;
- }
-
- @Override
- public boolean isResponseAsync() {
- return true;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = super.hashCode();
- result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
- return result;
- }
-
-
- @Override
- public String toString() {
- StringBuffer buff = new StringBuffer(getParentString());
- buff.append(", correlationID=" + correlationID);
- buff.append(", requiresResponse=" + super.isRequiresResponse());
- buff.append("]");
- return buff.toString();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (!super.equals(obj))
- return false;
- if (!(obj instanceof SessionSendMessage_V2))
- return false;
- SessionSendMessage_V2 other = (SessionSendMessage_V2) obj;
- if (correlationID != other.correlationID)
- return false;
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
index f88e0c8..086b851 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
@@ -21,11 +21,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class SessionXAResponseMessage extends PacketImpl {
- protected boolean error;
+ private boolean error;
- protected int responseCode;
+ private int responseCode;
- protected String message;
+ private String message;
public SessionXAResponseMessage(final boolean isError, final int responseCode, final String message) {
super(SESS_XA_RESP);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java
deleted file mode 100644
index 4e949bd..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.utils.DataConstants;
-
-public class SessionXAResponseMessage_V2 extends SessionXAResponseMessage {
-
- private long correlationID;
-
- public SessionXAResponseMessage_V2(final long correlationID, final boolean isError, final int responseCode, final String message) {
- super(isError, responseCode, message);
- this.correlationID = correlationID;
- }
-
- public SessionXAResponseMessage_V2() {
- super();
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public long getCorrelationID() {
- return correlationID;
- }
-
- @Override
- public void encodeRest(final ActiveMQBuffer buffer) {
- super.encodeRest(buffer);
- buffer.writeLong(correlationID);
- }
-
- @Override
- public void decodeRest(final ActiveMQBuffer buffer) {
- super.decodeRest(buffer);
- if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
- correlationID = buffer.readLong();
- }
- }
-
- @Override
- public final boolean isResponse() {
- return true;
- }
-
- @Override
- public final boolean isResponseAsync() {
- return true;
- }
-
- @Override
- public String toString() {
- StringBuffer buff = new StringBuffer(getParentString());
- buff.append(", error=" + error);
- buff.append(", message=" + message);
- buff.append(", responseCode=" + responseCode);
- buff.append(", correlationID=" + correlationID);
- buff.append("]");
- return buff.toString();
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = super.hashCode();
- result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!super.equals(obj)) {
- return false;
- }
- if (!(obj instanceof SessionXAResponseMessage_V2)) {
- return false;
- }
- SessionXAResponseMessage_V2 other = (SessionXAResponseMessage_V2) obj;
- if (correlationID != other.correlationID) {
- return false;
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/main/resources/activemq-version.properties
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/resources/activemq-version.properties b/artemis-core-client/src/main/resources/activemq-version.properties
index ff65ff9..a39b422 100644
--- a/artemis-core-client/src/main/resources/activemq-version.properties
+++ b/artemis-core-client/src/main/resources/activemq-version.properties
@@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
activemq.version.microVersion=${activemq.version.microVersion}
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
activemq.version.versionTag=${activemq.version.versionTag}
-activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130
+activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
deleted file mode 100644
index 416c911..0000000
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
+++ /dev/null
@@ -1,512 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.core.impl;
-
-import javax.security.auth.Subject;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelFutureListener;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
-import org.apache.activemq.artemis.core.protocol.core.Channel;
-import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
-import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
-import org.apache.activemq.artemis.core.protocol.core.Packet;
-import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
-import org.apache.activemq.artemis.core.remoting.CloseListener;
-import org.apache.activemq.artemis.core.remoting.FailureListener;
-import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class ChannelImplTest {
-
- ChannelImpl channel;
-
- @Before
- public void setUp() {
- channel = new ChannelImpl(new CoreRR(), 1, 4000, null);
- }
-
- @Test
- public void testCorrelation() {
-
- AtomicInteger handleResponseCount = new AtomicInteger();
-
- RequestPacket requestPacket = new RequestPacket((byte) 1);
- setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
-
- channel.send(requestPacket);
-
- assertEquals(1, channel.getCache().size());
-
- ResponsePacket responsePacket = new ResponsePacket((byte) 1);
- responsePacket.setCorrelationID(requestPacket.getCorrelationID());
-
- channel.handlePacket(responsePacket);
-
- assertEquals(1, handleResponseCount.get());
- assertEquals(0, channel.getCache().size());
- }
-
- private void setResponseHandlerAsPerActiveMQSessionContext(ResponseHandler responseHandler) {
- channel.setResponseHandler(responseHandler);
- channel.setCommandConfirmationHandler(wrapAsPerActiveMQSessionContext(responseHandler));
- }
-
- private CommandConfirmationHandler wrapAsPerActiveMQSessionContext(ResponseHandler responseHandler) {
- return new CommandConfirmationHandler() {
- @Override
- public void commandConfirmed(Packet packet) {
- responseHandler.handleResponse(packet, null);
- }
- };
- }
-
- @Test
- public void testPacketsConfirmedMessage() {
-
- AtomicInteger handleResponseCount = new AtomicInteger();
-
- RequestPacket requestPacket = new RequestPacket((byte) 1);
- setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
-
- channel.send(requestPacket);
-
- PacketsConfirmedMessage responsePacket = new PacketsConfirmedMessage((byte) 2);
-
- channel.handlePacket(responsePacket);
-
- assertEquals(0, channel.getCache().size());
- }
-
- class RequestPacket extends PacketImpl {
-
- private long id;
-
- RequestPacket(byte type) {
- super(type);
- }
-
- @Override
- public boolean isRequiresResponse() {
- return true;
- }
-
- @Override
- public boolean isResponseAsync() {
- return true;
- }
-
- @Override
- public long getCorrelationID() {
- return id;
- }
-
- @Override
- public void setCorrelationID(long id) {
- this.id = id;
- }
-
- @Override
- public int getPacketSize() {
- return 0;
- }
- }
-
- class ResponsePacket extends PacketImpl {
-
- private long id;
-
- ResponsePacket(byte type) {
- super(type);
- }
-
- @Override
- public boolean isResponseAsync() {
- return true;
- }
-
- @Override
- public boolean isResponse() {
- return true;
- }
-
- @Override
- public long getCorrelationID() {
- return id;
- }
-
- @Override
- public void setCorrelationID(long id) {
- this.id = id;
- }
-
- @Override
- public int getPacketSize() {
- return 0;
- }
- }
-
- class CoreRR implements CoreRemotingConnection {
-
- @Override
- public int getChannelVersion() {
- return 0;
- }
-
- @Override
- public void setChannelVersion(int clientVersion) {
-
- }
-
- @Override
- public Channel getChannel(long channelID, int confWindowSize) {
- return null;
- }
-
- @Override
- public void putChannel(long channelID, Channel channel) {
-
- }
-
- @Override
- public boolean removeChannel(long channelID) {
- return false;
- }
-
- @Override
- public long generateChannelID() {
- return 0;
- }
-
- @Override
- public void syncIDGeneratorSequence(long id) {
-
- }
-
- @Override
- public long getIDGeneratorSequence() {
- return 0;
- }
-
- @Override
- public long getBlockingCallTimeout() {
- return 0;
- }
-
- @Override
- public long getBlockingCallFailoverTimeout() {
- return 0;
- }
-
- @Override
- public Object getTransferLock() {
- return null;
- }
-
- @Override
- public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
- return null;
- }
-
- @Override
- public boolean blockUntilWritable(int size, long timeout) {
- return false;
- }
-
- @Override
- public Object getID() {
- return null;
- }
-
- @Override
- public long getCreationTime() {
- return 0;
- }
-
- @Override
- public String getRemoteAddress() {
- return null;
- }
-
- @Override
- public void scheduledFlush() {
-
- }
-
- @Override
- public void addFailureListener(FailureListener listener) {
-
- }
-
- @Override
- public boolean removeFailureListener(FailureListener listener) {
- return false;
- }
-
- @Override
- public void addCloseListener(CloseListener listener) {
-
- }
-
- @Override
- public boolean removeCloseListener(CloseListener listener) {
- return false;
- }
-
- @Override
- public List<CloseListener> removeCloseListeners() {
- return null;
- }
-
- @Override
- public void setCloseListeners(List<CloseListener> listeners) {
-
- }
-
- @Override
- public List<FailureListener> getFailureListeners() {
- return null;
- }
-
- @Override
- public List<FailureListener> removeFailureListeners() {
- return null;
- }
-
- @Override
- public void setFailureListeners(List<FailureListener> listeners) {
-
- }
-
- @Override
- public ActiveMQBuffer createTransportBuffer(int size) {
- return new ChannelBufferWrapper(Unpooled.buffer(size));
- }
-
- @Override
- public void fail(ActiveMQException me) {
-
- }
-
- @Override
- public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
-
- }
-
- @Override
- public void destroy() {
-
- }
-
- @Override
- public Connection getTransportConnection() {
- return new Connection() {
- @Override
- public ActiveMQBuffer createTransportBuffer(int size) {
- return null;
- }
-
- @Override
- public RemotingConnection getProtocolConnection() {
- return null;
- }
-
- @Override
- public void setProtocolConnection(RemotingConnection connection) {
-
- }
-
- @Override
- public boolean isWritable(ReadyListener listener) {
- return false;
- }
-
- @Override
- public void fireReady(boolean ready) {
-
- }
-
- @Override
- public void setAutoRead(boolean autoRead) {
-
- }
-
- @Override
- public Object getID() {
- return null;
- }
-
- @Override
- public void write(ActiveMQBuffer buffer, boolean flush, boolean batched) {
-
- }
-
- @Override
- public void write(ActiveMQBuffer buffer,
- boolean flush,
- boolean batched,
- ChannelFutureListener futureListener) {
-
- }
-
- @Override
- public void write(ActiveMQBuffer buffer) {
-
- }
-
- @Override
- public void forceClose() {
-
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public String getRemoteAddress() {
- return null;
- }
-
- @Override
- public String getLocalAddress() {
- return null;
- }
-
- @Override
- public void checkFlushBatchBuffer() {
-
- }
-
- @Override
- public TransportConfiguration getConnectorConfig() {
- return null;
- }
-
- @Override
- public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
- return null;
- }
-
- @Override
- public boolean isUsingProtocolHandling() {
- return false;
- }
-
- @Override
- public boolean isSameTarget(TransportConfiguration... configs) {
- return false;
- }
- };
- }
-
- @Override
- public boolean isClient() {
- return true;
- }
-
- @Override
- public boolean isDestroyed() {
- return false;
- }
-
- @Override
- public void disconnect(boolean criticalError) {
-
- }
-
- @Override
- public void disconnect(String scaleDownNodeID, boolean criticalError) {
-
- }
-
- @Override
- public boolean checkDataReceived() {
- return false;
- }
-
- @Override
- public void flush() {
-
- }
-
- @Override
- public boolean isWritable(ReadyListener callback) {
- return false;
- }
-
- @Override
- public void killMessage(SimpleString nodeID) {
-
- }
-
- @Override
- public boolean isSupportReconnect() {
- return false;
- }
-
- @Override
- public boolean isSupportsFlowControl() {
- return false;
- }
-
- @Override
- public Subject getSubject() {
- return null;
- }
-
- @Override
- public String getProtocolName() {
- return null;
- }
-
- @Override
- public void setClientID(String cID) {
-
- }
-
- @Override
- public String getClientID() {
- return null;
- }
-
- @Override
- public String getTransportLocalAddress() {
- return null;
- }
-
- @Override
- public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
-
- }
- }
-
-}
\ No newline at end of file