You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/27 21:41:27 UTC

[1/3] activemq-artemis git commit: ARTEMIS-1545 Support JMS 2.0 Completion Listener for Exceptions

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x a7dbd5711 -> f4734868a


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 37564b5..f5756f2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueu
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
@@ -76,11 +77,13 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
@@ -313,7 +316,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   requiresResponse = message.isRequiresResponse();
                   sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
                   if (requiresResponse) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   break;
                }
@@ -342,7 +345,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   requiresResponse = request.isRequiresResponse();
                   session.createAddress(request.getAddress(), request.getRoutingTypes(), request.isAutoCreated());
                   if (requiresResponse) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   break;
                }
@@ -351,7 +354,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   requiresResponse = request.isRequiresResponse();
                   session.createQueue(request.getAddress(), request.getQueueName(), RoutingType.MULTICAST, request.getFilterString(), request.isTemporary(), request.isDurable());
                   if (requiresResponse) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   break;
                }
@@ -361,7 +364,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(),
                                       request.isExclusive(), request.isLastValue(), request.isAutoCreated());
                   if (requiresResponse) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   break;
                }
@@ -373,7 +376,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                      session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString());
                   }
                   if (requiresResponse) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   break;
                }
@@ -385,7 +388,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                      session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue());
                   }
                   if (requiresResponse) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   break;
                }
@@ -393,7 +396,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   requiresResponse = true;
                   SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet;
                   session.deleteQueue(request.getQueueName());
-                  response = new NullResponseMessage();
+                  response = createNullResponseMessage(packet);
                   break;
                }
                case SESS_QUEUEQUERY: {
@@ -453,62 +456,62 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                case SESS_COMMIT: {
                   requiresResponse = true;
                   session.commit();
-                  response = new NullResponseMessage();
+                  response = createNullResponseMessage(packet);
                   break;
                }
                case SESS_ROLLBACK: {
                   requiresResponse = true;
                   session.rollback(((RollbackMessage) packet).isConsiderLastMessageAsDelivered());
-                  response = new NullResponseMessage();
+                  response = createNullResponseMessage(packet);
                   break;
                }
                case SESS_XA_COMMIT: {
                   requiresResponse = true;
                   SessionXACommitMessage message = (SessionXACommitMessage) packet;
                   session.xaCommit(message.getXid(), message.isOnePhase());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_END: {
                   requiresResponse = true;
                   SessionXAEndMessage message = (SessionXAEndMessage) packet;
                   session.xaEnd(message.getXid());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_FORGET: {
                   requiresResponse = true;
                   SessionXAForgetMessage message = (SessionXAForgetMessage) packet;
                   session.xaForget(message.getXid());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_JOIN: {
                   requiresResponse = true;
                   SessionXAJoinMessage message = (SessionXAJoinMessage) packet;
                   session.xaJoin(message.getXid());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_RESUME: {
                   requiresResponse = true;
                   SessionXAResumeMessage message = (SessionXAResumeMessage) packet;
                   session.xaResume(message.getXid());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_ROLLBACK: {
                   requiresResponse = true;
                   SessionXARollbackMessage message = (SessionXARollbackMessage) packet;
                   session.xaRollback(message.getXid());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_START: {
                   requiresResponse = true;
                   SessionXAStartMessage message = (SessionXAStartMessage) packet;
                   session.xaStart(message.getXid());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_FAILED: {
@@ -521,14 +524,14 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                case SESS_XA_SUSPEND: {
                   requiresResponse = true;
                   session.xaSuspend();
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_PREPARE: {
                   requiresResponse = true;
                   SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet;
                   session.xaPrepare(message.getXid());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                  response = createSessionXAResponseMessage(packet);
                   break;
                }
                case SESS_XA_INDOUBT_XIDS: {
@@ -557,14 +560,14 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                case SESS_STOP: {
                   requiresResponse = true;
                   session.stop();
-                  response = new NullResponseMessage();
+                  response = createNullResponseMessage(packet);
                   break;
                }
                case SESS_CLOSE: {
                   requiresResponse = true;
                   session.close(false);
                   // removeConnectionListeners();
-                  response = new NullResponseMessage();
+                  response = createNullResponseMessage(packet);
                   flush = true;
                   closeChannel = true;
                   break;
@@ -574,7 +577,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   requiresResponse = message.isRequiresResponse();
                   session.individualAcknowledge(message.getConsumerID(), message.getMessageID());
                   if (requiresResponse) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   break;
                }
@@ -582,7 +585,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   requiresResponse = true;
                   SessionConsumerCloseMessage message = (SessionConsumerCloseMessage) packet;
                   session.closeConsumer(message.getConsumerID());
-                  response = new NullResponseMessage();
+                  response = createNullResponseMessage(packet);
                   break;
                }
                case SESS_FORCE_CONSUMER_DELIVERY: {
@@ -591,7 +594,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   break;
                }
                case PacketImpl.SESS_ADD_METADATA: {
-                  response = new NullResponseMessage();
+                  response = createNullResponseMessage(packet);
                   SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet;
                   session.addMetaData(message.getKey(), message.getData());
                   break;
@@ -600,7 +603,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   requiresResponse = true;
                   SessionAddMetaDataMessageV2 message = (SessionAddMetaDataMessageV2) packet;
                   if (message.isRequiresConfirmations()) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   }
                   session.addMetaData(message.getKey(), message.getData());
                   break;
@@ -609,7 +612,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   requiresResponse = true;
                   SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage) packet;
                   if (session.addUniqueMetaData(message.getKey(), message.getData())) {
-                     response = new NullResponseMessage();
+                     response = createNullResponseMessage(packet);
                   } else {
                      response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(), message.getData()));
                   }
@@ -617,15 +620,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                }
             }
          } catch (ActiveMQIOErrorException e) {
-            response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
+            response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
          } catch (ActiveMQXAException e) {
-            response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+            response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
          } catch (ActiveMQQueueMaxConsumerLimitReached e) {
-            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
+            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
          } catch (ActiveMQException e) {
-            response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+            response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
          } catch (Throwable t) {
-            response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+            response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
          }
          sendResponse(packet, response, flush, closeChannel);
       } finally {
@@ -633,6 +636,26 @@ public class ServerSessionPacketHandler implements ChannelHandler {
       }
    }
 
+   private Packet createNullResponseMessage(Packet packet) {
+      final Packet response;
+      if (!packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange()) {
+         response = new NullResponseMessage();
+      } else {
+         response = new NullResponseMessage_V2(packet.getCorrelationID());
+      }
+      return response;
+   }
+
+   private Packet createSessionXAResponseMessage(Packet packet) {
+      Packet response;
+      if (packet.isResponseAsync()) {
+         response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), false, XAResource.XA_OK, null);
+      } else {
+         response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+      }
+      return response;
+   }
+
    private void onSessionAcknowledge(Packet packet) {
       this.storageManager.setContext(session.getSessionContext());
       try {
@@ -643,18 +666,18 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             requiresResponse = message.isRequiresResponse();
             this.session.acknowledge(message.getConsumerID(), message.getMessageID());
             if (requiresResponse) {
-               response = new NullResponseMessage();
+               response = createNullResponseMessage(packet);
             }
          } catch (ActiveMQIOErrorException e) {
-            response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
+            response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
          } catch (ActiveMQXAException e) {
-            response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+            response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
          } catch (ActiveMQQueueMaxConsumerLimitReached e) {
-            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
+            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
          } catch (ActiveMQException e) {
-            response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+            response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
          } catch (Throwable t) {
-            response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+            response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
          }
          sendResponse(packet, response, false, false);
       } finally {
@@ -672,18 +695,18 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             requiresResponse = message.isRequiresResponse();
             this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage()), this.direct);
             if (requiresResponse) {
-               response = new NullResponseMessage();
+               response = createNullResponseMessage(packet);
             }
          } catch (ActiveMQIOErrorException e) {
-            response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
+            response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
          } catch (ActiveMQXAException e) {
-            response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+            response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
          } catch (ActiveMQQueueMaxConsumerLimitReached e) {
-            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
+            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
          } catch (ActiveMQException e) {
-            response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+            response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
          } catch (Throwable t) {
-            response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+            response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
          }
          sendResponse(packet, response, false, false);
       } finally {
@@ -700,15 +723,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet;
             session.requestProducerCredits(message.getAddress(), message.getCredits());
          } catch (ActiveMQIOErrorException e) {
-            response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
+            response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
          } catch (ActiveMQXAException e) {
-            response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+            response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
          } catch (ActiveMQQueueMaxConsumerLimitReached e) {
-            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
+            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
          } catch (ActiveMQException e) {
-            response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+            response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
          } catch (Throwable t) {
-            response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+            response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
          }
          sendResponse(packet, response, false, false);
       } finally {
@@ -725,15 +748,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet;
             session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
          } catch (ActiveMQIOErrorException e) {
-            response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
+            response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
          } catch (ActiveMQXAException e) {
-            response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+            response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
          } catch (ActiveMQQueueMaxConsumerLimitReached e) {
-            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
+            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
          } catch (ActiveMQException e) {
-            response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+            response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
          } catch (Throwable t) {
-            response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+            response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
          }
          sendResponse(packet, response, false, false);
       } finally {
@@ -742,50 +765,68 @@ public class ServerSessionPacketHandler implements ChannelHandler {
    }
 
 
-   private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(ActiveMQIOErrorException e,
+   private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(Packet packet,
+                                                                     ActiveMQIOErrorException e,
                                                                      boolean requiresResponse,
                                                                      Packet response,
                                                                      ServerSession session) {
       session.markTXFailed(e);
       if (requiresResponse) {
          logger.debug("Sending exception to client", e);
-         response = new ActiveMQExceptionMessage(e);
+         response = convertToExceptionPacket(packet, e);
       } else {
          ActiveMQServerLogger.LOGGER.caughtException(e);
       }
       return response;
    }
 
-   private static Packet onActiveMQXAExceptionWhileHandlePacket(ActiveMQXAException e,
+   private static Packet onActiveMQXAExceptionWhileHandlePacket(Packet packet,
+                                                                ActiveMQXAException e,
                                                                 boolean requiresResponse,
                                                                 Packet response) {
       if (requiresResponse) {
          logger.debug("Sending exception to client", e);
-         response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
+         if (packet.isResponseAsync()) {
+            response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), true, e.errorCode, e.getMessage());
+         } else {
+            response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
+         }
       } else {
          ActiveMQServerLogger.LOGGER.caughtXaException(e);
       }
       return response;
    }
 
-   private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(ActiveMQQueueMaxConsumerLimitReached e,
+   private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(Packet packet,
+                                                                                 ActiveMQQueueMaxConsumerLimitReached e,
                                                                                  boolean requiresResponse,
                                                                                  Packet response) {
       if (requiresResponse) {
          logger.debug("Sending exception to client", e);
-         response = new ActiveMQExceptionMessage(e);
+         response = convertToExceptionPacket(packet, e);
       } else {
          ActiveMQServerLogger.LOGGER.caughtException(e);
       }
       return response;
    }
 
-   private static Packet onActiveMQExceptionWhileHandlePacket(ActiveMQException e,
+   private static Packet convertToExceptionPacket(Packet packet, ActiveMQException e) {
+      Packet response;
+      if (packet.isResponseAsync()) {
+         response = new ActiveMQExceptionMessage_V2(packet.getCorrelationID(), e);
+      } else {
+         response = new ActiveMQExceptionMessage(e);
+      }
+      return response;
+   }
+
+   private static Packet onActiveMQExceptionWhileHandlePacket(Packet packet,
+                                                              ActiveMQException e,
                                                               boolean requiresResponse,
                                                               Packet response) {
       if (requiresResponse) {
          logger.debug("Sending exception to client", e);
-         response = new ActiveMQExceptionMessage(e);
+         response = convertToExceptionPacket(packet, e);
       } else {
          if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
             logger.debug("Caught exception", e);
@@ -796,7 +837,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
       return response;
    }
 
-   private static Packet onCatchThrowableWhileHandlePacket(Throwable t,
+   private static Packet onCatchThrowableWhileHandlePacket(Packet packet,
+                                                           Throwable t,
                                                            boolean requiresResponse,
                                                            Packet response,
                                                            ServerSession session) {
@@ -805,7 +847,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
          ActiveMQServerLogger.LOGGER.sendingUnexpectedExceptionToClient(t);
          ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException();
          activeMQInternalErrorException.initCause(t);
-         response = new ActiveMQExceptionMessage(activeMQInternalErrorException);
+         response = convertToExceptionPacket(packet, activeMQInternalErrorException);
       } else {
          ActiveMQServerLogger.LOGGER.caughtException(t);
       }
@@ -827,12 +869,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
          public void onError(final int errorCode, final String errorMessage) {
             ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
 
-            ActiveMQExceptionMessage exceptionMessage = new ActiveMQExceptionMessage(ActiveMQExceptionType.createException(errorCode, errorMessage));
-
-            doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel);
+            Packet exceptionPacket = convertToExceptionPacket(confirmPacket, ActiveMQExceptionType.createException(errorCode, errorMessage));
+            doConfirmAndResponse(confirmPacket, exceptionPacket, flush, closeChannel);
 
             if (logger.isTraceEnabled()) {
-               logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionMessage);
+               logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionPacket);
             }
 
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6e26fa6..1f376d4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,7 +125,7 @@
       <activemq.version.majorVersion>1</activemq.version.majorVersion>
       <activemq.version.minorVersion>0</activemq.version.minorVersion>
       <activemq.version.microVersion>0</activemq.version.microVersion>
-      <activemq.version.incrementingVersion>129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
+      <activemq.version.incrementingVersion>130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
       <activemq.version.versionTag>${project.version}</activemq.version.versionTag>
       <ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
index e4afb5b..c7ed869 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
 import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
@@ -315,6 +316,11 @@ public class BackupSyncDelay implements Interceptor {
       }
 
       @Override
+      public void setResponseHandler(ResponseHandler handler) {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
       public void flushConfirmations() {
          throw new UnsupportedOperationException();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
index d3951f2..3020310 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
@@ -167,7 +167,24 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase {
 
       @Override
       public void onException(Message message, Exception exception) {
-         // TODO Auto-generated method stub
+         latch.countDown();
+         try {
+            switch (call) {
+               case 0:
+                  context.rollback();
+                  break;
+               case 1:
+                  context.commit();
+                  break;
+               case 2:
+                  context.close();
+                  break;
+               default:
+                  throw new IllegalArgumentException("call code " + call);
+            }
+         } catch (Exception error1) {
+            this.error = error1;
+         }
       }
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
index d7137ae..7e121f3 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
@@ -16,12 +16,23 @@
  */
 package org.apache.activemq.artemis.jms.tests;
 
+import static org.junit.Assert.fail;
+
+import javax.jms.CompletionListener;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
 import javax.jms.IllegalStateException;
 import javax.jms.JMSSecurityException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
 import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
@@ -67,9 +78,9 @@ public class SecurityTest extends JMSTestCase {
    }
 
 
-      /**
-       * Login with no user, no password Should allow login (equivalent to guest)
-       */
+   /**
+    * Login with no user, no password Should allow login (equivalent to guest)
+    */
    @Test
    public void testLoginNoUserNoPassword() throws Exception {
       createConnection();
@@ -169,6 +180,71 @@ public class SecurityTest extends JMSTestCase {
       }
    }
 
+   /**
+    * Login with valid user and password
+    * But try send to address not authorised - Persistent
+    * Should not allow and should throw exception
+    */
+   @Test
+   public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception {
+      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+      Connection connection = connectionFactory.createConnection("guest", "guest");
+      Session session = connection.createSession();
+      Destination destination = session.createQueue("guest.cannot.send");
+      MessageProducer messageProducer = session.createProducer(destination);
+      try {
+         messageProducer.send(session.createTextMessage("hello"));
+         fail("JMSSecurityException expected as guest is not allowed to send");
+      } catch (JMSSecurityException activeMQSecurityException) {
+         //pass
+      }
+      connection.close();
+   }
+
+   /**
+    * Login with valid user and password
+    * But try send to address not authorised - Non Persistent.
+    * Should have same behaviour as Persistent with exception on send.
+    */
+   @Test
+   public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent() throws Exception {
+      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+      connectionFactory.setConfirmationWindowSize(100);
+      connectionFactory.setBlockOnDurableSend(false);
+      connectionFactory.setBlockOnNonDurableSend(false);
+      Connection connection = connectionFactory.createConnection("guest", "guest");
+      Session session = connection.createSession();
+      Destination destination = session.createQueue("guest.cannot.send");
+      MessageProducer messageProducer = session.createProducer(destination);
+      messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+      try {
+         AtomicReference<Exception> e = new AtomicReference<>();
+         //        messageProducer.send(session.createTextMessage("hello"));
+
+         CountDownLatch countDownLatch = new CountDownLatch(1);
+         messageProducer.send(session.createTextMessage("hello"), new CompletionListener() {
+            @Override
+            public void onCompletion(Message message) {
+               countDownLatch.countDown();
+            }
+
+            @Override
+            public void onException(Message message, Exception exception) {
+               e.set(exception);
+               countDownLatch.countDown();
+            }
+         });
+         countDownLatch.await(10, TimeUnit.SECONDS);
+         if (e.get() != null) {
+            throw e.get();
+         }
+         fail("JMSSecurityException expected as guest is not allowed to send");
+      } catch (JMSSecurityException activeMQSecurityException) {
+         activeMQSecurityException.printStackTrace();
+      }
+      connection.close();
+   }
+
    /* Now some client id tests */
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/tests/jms-tests/src/test/resources/broker.xml
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/resources/broker.xml b/tests/jms-tests/src/test/resources/broker.xml
index 733e8c3..644ce83 100644
--- a/tests/jms-tests/src/test/resources/broker.xml
+++ b/tests/jms-tests/src/test/resources/broker.xml
@@ -54,6 +54,16 @@
             <permission type="browse" roles="guest,def"/>
             <permission type="send" roles="guest,def"/>
          </security-setting>
+
+         <security-setting match="guest.cannot.send">
+             <permission type="createDurableQueue" roles="guest,def"/>
+             <permission type="deleteDurableQueue" roles="guest,def"/>
+             <permission type="createNonDurableQueue" roles="guest,def"/>
+             <permission type="deleteNonDurableQueue" roles="guest,def"/>
+             <permission type="consume" roles="guest,def"/>
+             <permission type="browse" roles="guest,def"/>
+             <permission type="send" roles="def"/>
+         </security-setting>
      </security-settings>
    </core>
 </configuration>
\ No newline at end of file


[3/3] activemq-artemis git commit: ARTEMIS-1545 refactor & rework a few incompatible pieces

Posted by cl...@apache.org.
ARTEMIS-1545 refactor & rework a few incompatible pieces

Existing commit for ARTEMIS-1545 broke bridges and large messages. This
commit fixes those, and refactors the solution a bit to be more clear.

(cherry picked from commit a28b4fb34eb3cc178dd611d0cb2acc51d6b7a965)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f4734868
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f4734868
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f4734868

Branch: refs/heads/2.6.x
Commit: f4734868a5a07dfc6db533a96f9f8e01de5139c5
Parents: c9d8697
Author: Justin Bertram <jb...@apache.org>
Authored: Tue Jul 17 10:53:21 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 27 17:36:34 2018 -0400

----------------------------------------------------------------------
 .../core/client/SendAcknowledgementHandler.java |   8 +-
 .../core/protocol/core/ResponseHandler.java     |   6 +-
 .../core/impl/ActiveMQSessionContext.java       |  37 +-
 .../core/protocol/core/impl/ChannelImpl.java    |  15 +-
 .../core/protocol/core/impl/PacketImpl.java     |   5 +-
 .../core/protocol/core/impl/ResponseCache.java  |   6 +-
 .../protocol/core/impl/ChannelImplTest.java     | 512 +++++++++++++++++++
 .../jms/client/ActiveMQMessageProducer.java     |  92 ++--
 .../core/ServerSessionPacketHandler.java        |   3 +-
 .../artemis/jms/tests/SecurityTest.java         | 113 +++-
 10 files changed, 722 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
index 0f47536..ad45a5f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
@@ -43,9 +43,11 @@ public interface SendAcknowledgementHandler {
    void sendAcknowledged(Message message);
 
    default void sendFailed(Message message, Exception e) {
-      //This is to keep old behaviour that would ack even if error,
-      // if anyone custom implemented this interface but doesnt update.
-      sendAcknowledged(message);
+      /**
+       * By default ignore failures to preserve compatibility with existing implementations.
+       * If the message makes it to the broker and a failure occurs sendAcknowledge() will
+       * still be invoked just like it always was.
+       */
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
index 21e9879..f96ef13 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
@@ -17,14 +17,14 @@
 package org.apache.activemq.artemis.core.protocol.core;
 
 /**
- * A CommandConfirmationHandler is used by the channel to confirm confirmations of packets.
+ * A ResponseHandler is used by the channel to handle async responses.
  */
 public interface ResponseHandler {
 
    /**
-    * called by channel after a confirmation has been received.
+    * called by channel after an async response has been received.
     *
     * @param packet the packet confirmed
     */
-   void responseHandler(Packet packet, Packet response);
+   void handleResponse(Packet packet, Packet response);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index aec0fca..658bfcf 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -168,11 +168,7 @@ public class ActiveMQSessionContext extends SessionContext {
       sessionChannel.setHandler(handler);
 
       if (confirmationWindow >= 0) {
-         if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
-            sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
-         } else {
-            sessionChannel.setResponseHandler(responseHandler);
-         }
+         setHandlers();
       }
    }
 
@@ -189,16 +185,24 @@ public class ActiveMQSessionContext extends SessionContext {
       this.killed = true;
    }
 
+   private void setHandlers() {
+      sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
+
+      if (!sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
+         sessionChannel.setResponseHandler(responseHandler);
+      }
+   }
+
    private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() {
       @Override
       public void commandConfirmed(Packet packet) {
-         responseHandler.responseHandler(packet, null);
+         responseHandler.handleResponse(packet, null);
       }
    };
 
    private final ResponseHandler responseHandler = new ResponseHandler() {
       @Override
-      public void responseHandler(Packet packet, Packet response) {
+      public void handleResponse(Packet packet, Packet response) {
          final ActiveMQException activeMQException;
          if (response != null && response.getType() == PacketImpl.EXCEPTION) {
             ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response;
@@ -229,7 +233,7 @@ public class ActiveMQSessionContext extends SessionContext {
             if (exception == null) {
                sendAckHandler.sendAcknowledged(message);
             } else {
-               handler.sendFailed(message, exception);
+               sendAckHandler.sendFailed(message, exception);
             }
          }
       }
@@ -269,11 +273,8 @@ public class ActiveMQSessionContext extends SessionContext {
 
    @Override
    public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) {
-      if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
-         sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
-      } else {
-         sessionChannel.setResponseHandler(responseHandler);
-      }
+      setHandlers();
+
       this.sendAckHandler = handler;
    }
 
@@ -932,12 +933,12 @@ public class ActiveMQSessionContext extends SessionContext {
                                                          boolean lastChunk,
                                                          byte[] chunk,
                                                          SendAcknowledgementHandler messageHandler) throws ActiveMQException {
-      final boolean requiresResponse = lastChunk || confirmationWindow != -1;
+      final boolean requiresResponse = lastChunk && sendBlocking;
       final SessionSendContinuationMessage chunkPacket;
       if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
          chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
       } else {
-         chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
+         chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler);
       }
       final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
       //perform a weak form of flow control to avoid OOM on tight loops
@@ -955,11 +956,7 @@ public class ActiveMQSessionContext extends SessionContext {
          }
          if (requiresResponse) {
             // When sending it blocking, only the last chunk will be blocking.
-            if (sendBlocking) {
-               channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
-            } else {
-               channel.send(chunkPacket);
-            }
+            channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
          } else {
             channel.send(chunkPacket);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 9cb2a83..61268d6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -253,6 +253,10 @@ public final class ChannelImpl implements Channel {
       this.transferring = transferring;
    }
 
+   protected ResponseCache getCache() {
+      return responseAsyncCache;
+   }
+
    /**
     * @param timeoutMsg message to log on blocking call failover timeout
     */
@@ -316,7 +320,7 @@ public final class ChannelImpl implements Channel {
          checkReconnectID(reconnectID);
 
          //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
-         //As the send could block if the response cache is cannot add, preventing responses to be handled.
+         //As the send could block if the response cache cannot add, preventing responses to be handled.
          if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
             while (!responseAsyncCache.add(packet)) {
                try {
@@ -426,7 +430,7 @@ public final class ChannelImpl implements Channel {
                   throw new ActiveMQInterruptedException(e);
                }
 
-               if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket) {
+               if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket && !response.isResponseAsync()) {
                   ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace"));
                }
 
@@ -642,7 +646,7 @@ public final class ChannelImpl implements Channel {
       }
    }
 
-   public void handleResponse(Packet packet) {
+   public void handleAsyncResponse(Packet packet) {
       if (responseAsyncCache != null && packet.isResponseAsync()) {
          responseAsyncCache.handleResponse(packet);
       }
@@ -700,7 +704,7 @@ public final class ChannelImpl implements Channel {
          if (packet.isResponse()) {
             confirm(packet);
 
-            handleResponse(packet);
+            handleAsyncResponse(packet);
             lock.lock();
 
             try {
@@ -752,6 +756,9 @@ public final class ChannelImpl implements Channel {
          if (commandConfirmationHandler != null) {
             commandConfirmationHandler.commandConfirmed(packet);
          }
+         if (responseAsyncCache != null) {
+            responseAsyncCache.handleResponse(packet);
+         }
       }
 
       firstStoredCommandID += numberToClear;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 470e3ae..0168a47 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -31,7 +31,8 @@ public class PacketImpl implements Packet {
 
    // 2.0.0
    public static final int ADDRESSING_CHANGE_VERSION = 129;
-   public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130;
+
+   // 2.7.0
    public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130;
 
 
@@ -430,7 +431,7 @@ public class PacketImpl implements Packet {
    }
 
    protected String getParentString() {
-      return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", packetObject=" + this.getClass().getSimpleName();
+      return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", responseAsync=" + isResponseAsync() + ", requiresResponse=" + isRequiresResponse() + ", correlationID=" + getCorrelationID() + ", packetObject=" + this.getClass().getSimpleName();
    }
 
    private int stringEncodeSize(final String str) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
index f9e8538..8ee73d7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
@@ -53,7 +53,7 @@ public class ResponseCache {
       long correlationID = response.getCorrelationID();
       Packet packet = remove(correlationID);
       if (packet != null) {
-         responseHandler.responseHandler(packet, response);
+         responseHandler.handleResponse(packet, response);
       }
    }
 
@@ -67,4 +67,8 @@ public class ResponseCache {
    public void setResponseHandler(ResponseHandler responseHandler) {
       this.responseHandler = responseHandler;
    }
+
+   public int size() {
+      return this.store.size();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
new file mode 100644
index 0000000..416c911
--- /dev/null
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
@@ -0,0 +1,512 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl;
+
+import javax.security.auth.Subject;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFutureListener;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.protocol.core.Channel;
+import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ChannelImplTest {
+
+   ChannelImpl channel;
+
+   @Before
+   public void setUp() {
+      channel = new ChannelImpl(new CoreRR(), 1, 4000, null);
+   }
+
+   @Test
+   public void testCorrelation() {
+
+      AtomicInteger handleResponseCount = new AtomicInteger();
+
+      RequestPacket requestPacket = new RequestPacket((byte) 1);
+      setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
+
+      channel.send(requestPacket);
+
+      assertEquals(1, channel.getCache().size());
+
+      ResponsePacket responsePacket = new ResponsePacket((byte) 1);
+      responsePacket.setCorrelationID(requestPacket.getCorrelationID());
+
+      channel.handlePacket(responsePacket);
+
+      assertEquals(1, handleResponseCount.get());
+      assertEquals(0, channel.getCache().size());
+   }
+
+   private void setResponseHandlerAsPerActiveMQSessionContext(ResponseHandler responseHandler) {
+      channel.setResponseHandler(responseHandler);
+      channel.setCommandConfirmationHandler(wrapAsPerActiveMQSessionContext(responseHandler));
+   }
+
+   private CommandConfirmationHandler wrapAsPerActiveMQSessionContext(ResponseHandler responseHandler) {
+      return new CommandConfirmationHandler() {
+         @Override
+         public void commandConfirmed(Packet packet) {
+            responseHandler.handleResponse(packet, null);
+         }
+      };
+   }
+
+   @Test
+   public void testPacketsConfirmedMessage() {
+
+      AtomicInteger handleResponseCount = new AtomicInteger();
+
+      RequestPacket requestPacket = new RequestPacket((byte) 1);
+      setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
+
+      channel.send(requestPacket);
+
+      PacketsConfirmedMessage responsePacket = new PacketsConfirmedMessage((byte) 2);
+
+      channel.handlePacket(responsePacket);
+
+      assertEquals(0, channel.getCache().size());
+   }
+
+   class RequestPacket extends PacketImpl {
+
+      private long id;
+
+      RequestPacket(byte type) {
+         super(type);
+      }
+
+      @Override
+      public boolean isRequiresResponse() {
+         return true;
+      }
+
+      @Override
+      public boolean isResponseAsync() {
+         return true;
+      }
+
+      @Override
+      public long getCorrelationID() {
+         return id;
+      }
+
+      @Override
+      public void setCorrelationID(long id) {
+         this.id = id;
+      }
+
+      @Override
+      public int getPacketSize() {
+         return 0;
+      }
+   }
+
+   class ResponsePacket extends PacketImpl {
+
+      private long id;
+
+      ResponsePacket(byte type) {
+         super(type);
+      }
+
+      @Override
+      public boolean isResponseAsync() {
+         return true;
+      }
+
+      @Override
+      public boolean isResponse() {
+         return true;
+      }
+
+      @Override
+      public long getCorrelationID() {
+         return id;
+      }
+
+      @Override
+      public void setCorrelationID(long id) {
+         this.id = id;
+      }
+
+      @Override
+      public int getPacketSize() {
+         return 0;
+      }
+   }
+
+   class CoreRR implements CoreRemotingConnection {
+
+      @Override
+      public int getChannelVersion() {
+         return 0;
+      }
+
+      @Override
+      public void setChannelVersion(int clientVersion) {
+
+      }
+
+      @Override
+      public Channel getChannel(long channelID, int confWindowSize) {
+         return null;
+      }
+
+      @Override
+      public void putChannel(long channelID, Channel channel) {
+
+      }
+
+      @Override
+      public boolean removeChannel(long channelID) {
+         return false;
+      }
+
+      @Override
+      public long generateChannelID() {
+         return 0;
+      }
+
+      @Override
+      public void syncIDGeneratorSequence(long id) {
+
+      }
+
+      @Override
+      public long getIDGeneratorSequence() {
+         return 0;
+      }
+
+      @Override
+      public long getBlockingCallTimeout() {
+         return 0;
+      }
+
+      @Override
+      public long getBlockingCallFailoverTimeout() {
+         return 0;
+      }
+
+      @Override
+      public Object getTransferLock() {
+         return null;
+      }
+
+      @Override
+      public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
+         return null;
+      }
+
+      @Override
+      public boolean blockUntilWritable(int size, long timeout) {
+         return false;
+      }
+
+      @Override
+      public Object getID() {
+         return null;
+      }
+
+      @Override
+      public long getCreationTime() {
+         return 0;
+      }
+
+      @Override
+      public String getRemoteAddress() {
+         return null;
+      }
+
+      @Override
+      public void scheduledFlush() {
+
+      }
+
+      @Override
+      public void addFailureListener(FailureListener listener) {
+
+      }
+
+      @Override
+      public boolean removeFailureListener(FailureListener listener) {
+         return false;
+      }
+
+      @Override
+      public void addCloseListener(CloseListener listener) {
+
+      }
+
+      @Override
+      public boolean removeCloseListener(CloseListener listener) {
+         return false;
+      }
+
+      @Override
+      public List<CloseListener> removeCloseListeners() {
+         return null;
+      }
+
+      @Override
+      public void setCloseListeners(List<CloseListener> listeners) {
+
+      }
+
+      @Override
+      public List<FailureListener> getFailureListeners() {
+         return null;
+      }
+
+      @Override
+      public List<FailureListener> removeFailureListeners() {
+         return null;
+      }
+
+      @Override
+      public void setFailureListeners(List<FailureListener> listeners) {
+
+      }
+
+      @Override
+      public ActiveMQBuffer createTransportBuffer(int size) {
+         return new ChannelBufferWrapper(Unpooled.buffer(size));
+      }
+
+      @Override
+      public void fail(ActiveMQException me) {
+
+      }
+
+      @Override
+      public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
+
+      }
+
+      @Override
+      public void destroy() {
+
+      }
+
+      @Override
+      public Connection getTransportConnection() {
+         return new Connection() {
+            @Override
+            public ActiveMQBuffer createTransportBuffer(int size) {
+               return null;
+            }
+
+            @Override
+            public RemotingConnection getProtocolConnection() {
+               return null;
+            }
+
+            @Override
+            public void setProtocolConnection(RemotingConnection connection) {
+
+            }
+
+            @Override
+            public boolean isWritable(ReadyListener listener) {
+               return false;
+            }
+
+            @Override
+            public void fireReady(boolean ready) {
+
+            }
+
+            @Override
+            public void setAutoRead(boolean autoRead) {
+
+            }
+
+            @Override
+            public Object getID() {
+               return null;
+            }
+
+            @Override
+            public void write(ActiveMQBuffer buffer, boolean flush, boolean batched) {
+
+            }
+
+            @Override
+            public void write(ActiveMQBuffer buffer,
+                              boolean flush,
+                              boolean batched,
+                              ChannelFutureListener futureListener) {
+
+            }
+
+            @Override
+            public void write(ActiveMQBuffer buffer) {
+
+            }
+
+            @Override
+            public void forceClose() {
+
+            }
+
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public String getRemoteAddress() {
+               return null;
+            }
+
+            @Override
+            public String getLocalAddress() {
+               return null;
+            }
+
+            @Override
+            public void checkFlushBatchBuffer() {
+
+            }
+
+            @Override
+            public TransportConfiguration getConnectorConfig() {
+               return null;
+            }
+
+            @Override
+            public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
+               return null;
+            }
+
+            @Override
+            public boolean isUsingProtocolHandling() {
+               return false;
+            }
+
+            @Override
+            public boolean isSameTarget(TransportConfiguration... configs) {
+               return false;
+            }
+         };
+      }
+
+      @Override
+      public boolean isClient() {
+         return true;
+      }
+
+      @Override
+      public boolean isDestroyed() {
+         return false;
+      }
+
+      @Override
+      public void disconnect(boolean criticalError) {
+
+      }
+
+      @Override
+      public void disconnect(String scaleDownNodeID, boolean criticalError) {
+
+      }
+
+      @Override
+      public boolean checkDataReceived() {
+         return false;
+      }
+
+      @Override
+      public void flush() {
+
+      }
+
+      @Override
+      public boolean isWritable(ReadyListener callback) {
+         return false;
+      }
+
+      @Override
+      public void killMessage(SimpleString nodeID) {
+
+      }
+
+      @Override
+      public boolean isSupportReconnect() {
+         return false;
+      }
+
+      @Override
+      public boolean isSupportsFlowControl() {
+         return false;
+      }
+
+      @Override
+      public Subject getSubject() {
+         return null;
+      }
+
+      @Override
+      public String getProtocolName() {
+         return null;
+      }
+
+      @Override
+      public void setClientID(String cID) {
+
+      }
+
+      @Override
+      public String getClientID() {
+         return null;
+      }
+
+      @Override
+      public String getTransportLocalAddress() {
+         return null;
+      }
+
+      @Override
+      public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
+
+      }
+   }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index fc15d5e..ee4223c 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -34,6 +34,8 @@ import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
@@ -564,6 +566,14 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
       private final ActiveMQMessageProducer producer;
 
       /**
+       * It's possible that this SendAcknowledgementHandler might be called twice due to subsequent
+       * packet confirmations on the same connection. Using this boolean avoids that possibility.
+       * A new CompletionListenerWrapper is created for each message sent so once it's called once
+       * it will never be called again.
+       */
+      private AtomicBoolean active = new AtomicBoolean(true);
+
+      /**
        * @param jmsMessage
        * @param producer
        */
@@ -577,56 +587,62 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
 
       @Override
       public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) {
-         if (jmsMessage instanceof StreamMessage) {
-            try {
-               ((StreamMessage) jmsMessage).reset();
-            } catch (JMSException e) {
-               // HORNETQ-1209 XXX ignore?
+         if (active.get()) {
+            if (jmsMessage instanceof StreamMessage) {
+               try {
+                  ((StreamMessage) jmsMessage).reset();
+               } catch (JMSException e) {
+                  // HORNETQ-1209 XXX ignore?
+               }
             }
-         }
-         if (jmsMessage instanceof BytesMessage) {
-            try {
-               ((BytesMessage) jmsMessage).reset();
-            } catch (JMSException e) {
-               // HORNETQ-1209 XXX ignore?
+            if (jmsMessage instanceof BytesMessage) {
+               try {
+                  ((BytesMessage) jmsMessage).reset();
+               } catch (JMSException e) {
+                  // HORNETQ-1209 XXX ignore?
+               }
             }
-         }
 
-         try {
-            producer.connection.getThreadAwareContext().setCurrentThread(true);
-            completionListener.onCompletion(jmsMessage);
-         } finally {
-            producer.connection.getThreadAwareContext().clearCurrentThread(true);
+            try {
+               producer.connection.getThreadAwareContext().setCurrentThread(true);
+               completionListener.onCompletion(jmsMessage);
+            } finally {
+               producer.connection.getThreadAwareContext().clearCurrentThread(true);
+               active.set(false);
+            }
          }
       }
 
       @Override
       public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) {
-         if (jmsMessage instanceof StreamMessage) {
-            try {
-               ((StreamMessage) jmsMessage).reset();
-            } catch (JMSException e) {
-               // HORNETQ-1209 XXX ignore?
+         if (active.get()) {
+            if (jmsMessage instanceof StreamMessage) {
+               try {
+                  ((StreamMessage) jmsMessage).reset();
+               } catch (JMSException e) {
+                  // HORNETQ-1209 XXX ignore?
+               }
             }
-         }
-         if (jmsMessage instanceof BytesMessage) {
-            try {
-               ((BytesMessage) jmsMessage).reset();
-            } catch (JMSException e) {
-               // HORNETQ-1209 XXX ignore?
+            if (jmsMessage instanceof BytesMessage) {
+               try {
+                  ((BytesMessage) jmsMessage).reset();
+               } catch (JMSException e) {
+                  // HORNETQ-1209 XXX ignore?
+               }
             }
-         }
 
-         try {
-            producer.connection.getThreadAwareContext().setCurrentThread(true);
-            if (exception instanceof ActiveMQException) {
-               exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException)exception);
-            } else if (exception instanceof ActiveMQInterruptedException) {
-               exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
+            try {
+               producer.connection.getThreadAwareContext().setCurrentThread(true);
+               if (exception instanceof ActiveMQException) {
+                  exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException) exception);
+               } else if (exception instanceof ActiveMQInterruptedException) {
+                  exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
+               }
+               completionListener.onException(jmsMessage, exception);
+            } finally {
+               producer.connection.getThreadAwareContext().clearCurrentThread(true);
+               active.set(false);
             }
-            completionListener.onException(jmsMessage, exception);
-         } finally {
-            producer.connection.getThreadAwareContext().clearCurrentThread(true);
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index f5756f2..16a87d8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -893,7 +893,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                                      final Packet response,
                                      final boolean flush,
                                      final boolean closeChannel) {
-      if (confirmPacket != null) {
+      // don't confirm if the response is an exception
+      if (confirmPacket != null && (response == null || (response != null && response.getType() != PacketImpl.EXCEPTION))) {
          channel.confirm(confirmPacket);
 
          if (flush) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
index 7e121f3..851dbe0 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
@@ -18,12 +18,15 @@ package org.apache.activemq.artemis.jms.tests;
 
 import static org.junit.Assert.fail;
 
+import javax.jms.BytesMessage;
 import javax.jms.CompletionListener;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
+import javax.jms.JMSContext;
+import javax.jms.JMSProducer;
 import javax.jms.JMSSecurityException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
@@ -33,6 +36,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
 import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
@@ -187,10 +192,14 @@ public class SecurityTest extends JMSTestCase {
     */
    @Test
    public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception {
+      SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
+      if (getJmsServer().locateQueue(queueName) == null) {
+         getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+      }
       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
       Connection connection = connectionFactory.createConnection("guest", "guest");
       Session session = connection.createSession();
-      Destination destination = session.createQueue("guest.cannot.send");
+      Destination destination = session.createQueue(queueName.toString());
       MessageProducer messageProducer = session.createProducer(destination);
       try {
          messageProducer.send(session.createTextMessage("hello"));
@@ -208,18 +217,21 @@ public class SecurityTest extends JMSTestCase {
     */
    @Test
    public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent() throws Exception {
+      SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
+      if (getJmsServer().locateQueue(queueName) == null) {
+         getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+      }
       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
       connectionFactory.setConfirmationWindowSize(100);
       connectionFactory.setBlockOnDurableSend(false);
       connectionFactory.setBlockOnNonDurableSend(false);
       Connection connection = connectionFactory.createConnection("guest", "guest");
       Session session = connection.createSession();
-      Destination destination = session.createQueue("guest.cannot.send");
+      Destination destination = session.createQueue(queueName.toString());
       MessageProducer messageProducer = session.createProducer(destination);
       messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
       try {
          AtomicReference<Exception> e = new AtomicReference<>();
-         //        messageProducer.send(session.createTextMessage("hello"));
 
          CountDownLatch countDownLatch = new CountDownLatch(1);
          messageProducer.send(session.createTextMessage("hello"), new CompletionListener() {
@@ -241,6 +253,101 @@ public class SecurityTest extends JMSTestCase {
          fail("JMSSecurityException expected as guest is not allowed to send");
       } catch (JMSSecurityException activeMQSecurityException) {
          activeMQSecurityException.printStackTrace();
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using JMS 2 API.
+    */
+   @Test
+   public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistentJMS2() throws Exception {
+      SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
+      if (getJmsServer().locateQueue(queueName) == null) {
+         getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+      }
+      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+      connectionFactory.setConfirmationWindowSize(100);
+      connectionFactory.setBlockOnDurableSend(false);
+      connectionFactory.setBlockOnNonDurableSend(false);
+      JMSContext context = connectionFactory.createContext("guest", "guest");
+      Destination destination = context.createQueue(queueName.toString());
+      JMSProducer messageProducer = context.createProducer();
+      messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+      try {
+         AtomicReference<Exception> e = new AtomicReference<>();
+
+         CountDownLatch countDownLatch = new CountDownLatch(1);
+         messageProducer.setAsync(new CompletionListener() {
+            @Override
+            public void onCompletion(Message message) {
+               countDownLatch.countDown();
+            }
+
+            @Override
+            public void onException(Message message, Exception exception) {
+               e.set(exception);
+               countDownLatch.countDown();
+            }
+         });
+         messageProducer.send(destination, context.createTextMessage("hello"));
+         countDownLatch.await(10, TimeUnit.SECONDS);
+         if (e.get() != null) {
+            throw e.get();
+         }
+         fail("JMSSecurityException expected as guest is not allowed to send");
+      } catch (JMSSecurityException activeMQSecurityException) {
+         activeMQSecurityException.printStackTrace();
+      } finally {
+         context.close();
+      }
+   }
+
+   /**
+    * Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using a large message.
+    */
+   @Test
+   public void testLoginValidUserAndPasswordButNotAuthorisedToSendLargeNonPersistent() throws Exception {
+      SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
+      if (getJmsServer().locateQueue(queueName) == null) {
+         getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+      }
+      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+      connectionFactory.setConfirmationWindowSize(100);
+      connectionFactory.setBlockOnDurableSend(false);
+      connectionFactory.setBlockOnNonDurableSend(false);
+      connectionFactory.setMinLargeMessageSize(1024);
+      Connection connection = connectionFactory.createConnection("guest", "guest");
+      Session session = connection.createSession();
+      Destination destination = session.createQueue(queueName.toString());
+      MessageProducer messageProducer = session.createProducer(destination);
+      messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+      try {
+         AtomicReference<Exception> e = new AtomicReference<>();
+
+         CountDownLatch countDownLatch = new CountDownLatch(1);
+         BytesMessage message = session.createBytesMessage();
+         message.writeBytes(new byte[10 * 1024]);
+         messageProducer.send(message, new CompletionListener() {
+            @Override
+            public void onCompletion(Message message) {
+               countDownLatch.countDown();
+            }
+
+            @Override
+            public void onException(Message message, Exception exception) {
+               e.set(exception);
+               countDownLatch.countDown();
+            }
+         });
+         countDownLatch.await(10, TimeUnit.SECONDS);
+         if (e.get() != null) {
+            throw e.get();
+         }
+         fail("JMSSecurityException expected as guest is not allowed to send");
+      } catch (JMSSecurityException activeMQSecurityException) {
+         activeMQSecurityException.printStackTrace();
       }
       connection.close();
    }


[2/3] activemq-artemis git commit: ARTEMIS-1545 Support JMS 2.0 Completion Listener for Exceptions

Posted by cl...@apache.org.
ARTEMIS-1545 Support JMS 2.0 Completion Listener for Exceptions

(cherry picked from commit e4ba48a31193ac532404d93b37f29d2720f1a863)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c9d8697a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c9d8697a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c9d8697a

Branch: refs/heads/2.6.x
Commit: c9d8697a6cf5e4620970da878fc5ab4f8d9d148f
Parents: a7dbd57
Author: Michael André Pearce <mi...@me.com>
Authored: Thu Dec 14 07:47:30 2017 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 27 17:36:28 2018 -0400

----------------------------------------------------------------------
 .../core/client/SendAcknowledgementHandler.java |   7 +
 .../client/ActiveMQClientMessageBundle.java     |   3 +
 .../artemis/core/protocol/core/Channel.java     |   3 +
 .../protocol/core/CoreRemotingConnection.java   |   5 +
 .../artemis/core/protocol/core/Packet.java      |   8 +
 .../core/protocol/core/ResponseHandler.java     |  30 ++++
 .../core/impl/ActiveMQSessionContext.java       |  78 +++++++--
 .../core/protocol/core/impl/ChannelImpl.java    |  58 ++++++-
 .../core/protocol/core/impl/PacketDecoder.java  |  34 +++-
 .../core/protocol/core/impl/PacketImpl.java     |  21 +++
 .../core/protocol/core/impl/ResponseCache.java  |  70 ++++++++
 .../wireformat/ActiveMQExceptionMessage.java    |   2 +-
 .../wireformat/ActiveMQExceptionMessage_V2.java | 101 +++++++++++
 .../impl/wireformat/CreateAddressMessage.java   |   1 +
 .../impl/wireformat/CreateQueueMessage.java     |   1 +
 .../wireformat/CreateSharedQueueMessage.java    |   1 +
 .../impl/wireformat/NullResponseMessage_V2.java |  96 ++++++++++
 .../wireformat/SessionAcknowledgeMessage.java   |   1 +
 .../SessionCreateConsumerMessage.java           |   1 +
 .../SessionIndividualAcknowledgeMessage.java    |   1 +
 .../SessionSendContinuationMessage.java         |  31 +++-
 .../SessionSendContinuationMessage_V2.java      | 122 +++++++++++++
 .../impl/wireformat/SessionSendMessage.java     |  27 ++-
 .../impl/wireformat/SessionSendMessage_V2.java  | 104 +++++++++++
 .../wireformat/SessionXAResponseMessage.java    |   6 +-
 .../wireformat/SessionXAResponseMessage_V2.java | 102 +++++++++++
 .../main/resources/activemq-version.properties  |   2 +-
 .../jms/client/ActiveMQMessageProducer.java     |  30 ++++
 .../core/protocol/ServerPacketDecoder.java      |   5 +-
 .../core/ServerSessionPacketHandler.java        | 173 ++++++++++++-------
 pom.xml                                         |   2 +-
 .../cluster/util/BackupSyncDelay.java           |   6 +
 .../JmsProducerCompletionListenerTest.java      |  19 +-
 .../artemis/jms/tests/SecurityTest.java         |  82 ++++++++-
 tests/jms-tests/src/test/resources/broker.xml   |  10 ++
 35 files changed, 1134 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
index c164f6c..0f47536 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
@@ -41,4 +41,11 @@ public interface SendAcknowledgementHandler {
     * @param message message sent asynchronously
     */
    void sendAcknowledged(Message message);
+
+   default void sendFailed(Message message, Exception e) {
+      //This is to keep old behaviour that would ack even if error,
+      // if anyone custom implemented this interface but doesnt update.
+      sendAcknowledged(message);
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
index bb88e6d..e043ac9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
@@ -228,4 +228,7 @@ public interface ActiveMQClientMessageBundle {
 
    @Message(id = 119062, value = "Multi-packet transmission (e.g. Large Messages) interrupted because of a reconnection.")
    ActiveMQInterruptedException packetTransmissionInterrupted();
+
+   @Message(id = 119063, value = "Cannot send a packet while response cache is full.")
+   IllegalStateException cannotSendPacketWhilstResponseCacheFull();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
index 127a69a..56f8259 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
@@ -211,6 +211,9 @@ public interface Channel {
     */
    void setCommandConfirmationHandler(CommandConfirmationHandler handler);
 
+   void setResponseHandler(ResponseHandler handler);
+
+
    /**
     * flushes any confirmations on to the connection.
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
index b6a5d93..74d9847 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
@@ -36,6 +36,11 @@ public interface CoreRemotingConnection extends RemotingConnection {
       return  (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION);
    }
 
+   default boolean isVersionBeforeAsyncResponseChange() {
+      int version = getChannelVersion();
+      return  (version > 0 && version < PacketImpl.ASYNC_RESPONSE_CHANGE_VERSION);
+   }
+
    /**
     * Sets the client protocol used on the communication. This will determine if the client has
     * support for certain packet types

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
index 1f40314..b658090 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
@@ -41,6 +41,14 @@ public interface Packet {
       return INITIAL_PACKET_SIZE;
    }
 
+   boolean isRequiresResponse();
+
+   boolean isResponseAsync();
+
+   long getCorrelationID();
+
+   void setCorrelationID(long correlationID);
+
    /**
     * Returns the channel id of the channel that should handle this packet.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
new file mode 100644
index 0000000..21e9879
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core;
+
+/**
+ * A CommandConfirmationHandler is used by the channel to confirm confirmations of packets.
+ */
+public interface ResponseHandler {
+
+   /**
+    * called by channel after a confirmation has been received.
+    *
+    * @param packet the packet confirmed
+    */
+   void responseHandler(Packet packet, Packet response);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index cbbfcab..aec0fca 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
 import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
@@ -90,9 +91,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@@ -165,7 +168,11 @@ public class ActiveMQSessionContext extends SessionContext {
       sessionChannel.setHandler(handler);
 
       if (confirmationWindow >= 0) {
-         sessionChannel.setCommandConfirmationHandler(confirmationHandler);
+         if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
+            sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
+         } else {
+            sessionChannel.setResponseHandler(responseHandler);
+         }
       }
    }
 
@@ -182,28 +189,50 @@ public class ActiveMQSessionContext extends SessionContext {
       this.killed = true;
    }
 
-   private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() {
+   private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() {
+      @Override
+      public void commandConfirmed(Packet packet) {
+         responseHandler.responseHandler(packet, null);
+      }
+   };
+
+   private final ResponseHandler responseHandler = new ResponseHandler() {
       @Override
-      public void commandConfirmed(final Packet packet) {
+      public void responseHandler(Packet packet, Packet response) {
+         final ActiveMQException activeMQException;
+         if (response != null && response.getType() == PacketImpl.EXCEPTION) {
+            ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response;
+            activeMQException = exceptionResponseMessage.getException();
+         } else {
+            activeMQException = null;
+         }
+
          if (packet.getType() == PacketImpl.SESS_SEND) {
             SessionSendMessage ssm = (SessionSendMessage) packet;
-            callSendAck(ssm.getHandler(), ssm.getMessage());
+            callSendAck(ssm.getHandler(), ssm.getMessage(), activeMQException);
          } else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) {
             SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
             if (!scm.isContinues()) {
-               callSendAck(scm.getHandler(), scm.getMessage());
+               callSendAck(scm.getHandler(), scm.getMessage(), activeMQException);
             }
          }
       }
 
-      private void callSendAck(SendAcknowledgementHandler handler, final Message message) {
+      private void callSendAck(SendAcknowledgementHandler handler, final Message message, final Exception exception) {
          if (handler != null) {
-            handler.sendAcknowledged(message);
+            if (exception == null) {
+               handler.sendAcknowledged(message);
+            } else {
+               handler.sendFailed(message, exception);
+            }
          } else if (sendAckHandler != null) {
-            sendAckHandler.sendAcknowledged(message);
+            if (exception == null) {
+               sendAckHandler.sendAcknowledged(message);
+            } else {
+               handler.sendFailed(message, exception);
+            }
          }
       }
-
    };
 
    // Failover utility methods
@@ -240,7 +269,11 @@ public class ActiveMQSessionContext extends SessionContext {
 
    @Override
    public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) {
-      sessionChannel.setCommandConfirmationHandler(confirmationHandler);
+      if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
+         sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
+      } else {
+         sessionChannel.setResponseHandler(responseHandler);
+      }
       this.sendAckHandler = handler;
    }
 
@@ -468,13 +501,15 @@ public class ActiveMQSessionContext extends SessionContext {
                                boolean sendBlocking,
                                SendAcknowledgementHandler handler,
                                SimpleString defaultAddress) throws ActiveMQException {
-      SessionSendMessage packet;
+      final SessionSendMessage packet;
       if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
          packet = new SessionSendMessage_1X(msgI, sendBlocking, handler);
-      } else {
+      } else if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
          packet = new SessionSendMessage(msgI, sendBlocking, handler);
+      } else {
+         boolean responseRequired = confirmationWindow != -1 || sendBlocking;
+         packet = new SessionSendMessage_V2(msgI, responseRequired, handler);
       }
-
       if (sendBlocking) {
          sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
       } else {
@@ -890,15 +925,20 @@ public class ActiveMQSessionContext extends SessionContext {
       }
    }
 
-   private static int sendSessionSendContinuationMessage(Channel channel,
+   private int sendSessionSendContinuationMessage(Channel channel,
                                                          Message msgI,
                                                          long messageBodySize,
                                                          boolean sendBlocking,
                                                          boolean lastChunk,
                                                          byte[] chunk,
                                                          SendAcknowledgementHandler messageHandler) throws ActiveMQException {
-      final boolean requiresResponse = lastChunk && sendBlocking;
-      final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
+      final boolean requiresResponse = lastChunk || confirmationWindow != -1;
+      final SessionSendContinuationMessage chunkPacket;
+      if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
+         chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
+      } else {
+         chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
+      }
       final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
       //perform a weak form of flow control to avoid OOM on tight loops
       final CoreRemotingConnection connection = channel.getConnection();
@@ -915,7 +955,11 @@ public class ActiveMQSessionContext extends SessionContext {
          }
          if (requiresResponse) {
             // When sending it blocking, only the last chunk will be blocking.
-            channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
+            if (sendBlocking) {
+               channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
+            } else {
+               channel.send(chunkPacket);
+            }
          } else {
             channel.send(chunkPacket);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 4d73cf8..9cb2a83 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
 import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -96,6 +97,8 @@ public final class ChannelImpl implements Channel {
 
    private final java.util.Queue<Packet> resendCache;
 
+   private final ResponseCache responseAsyncCache;
+
    private int firstStoredCommandID;
 
    private final AtomicInteger lastConfirmedCommandID = new AtomicInteger(-1);
@@ -138,8 +141,10 @@ public final class ChannelImpl implements Channel {
 
       if (confWindowSize != -1) {
          resendCache = new ConcurrentLinkedQueue<>();
+         responseAsyncCache = new ResponseCache();
       } else {
          resendCache = null;
+         responseAsyncCache = null;
       }
 
       this.interceptors = interceptors;
@@ -211,7 +216,11 @@ public final class ChannelImpl implements Channel {
       lock.lock();
 
       try {
-         response = new ActiveMQExceptionMessage(ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause));
+         ActiveMQException activeMQException = ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause);
+         if (responseAsyncCache != null) {
+            responseAsyncCache.errorAll(activeMQException);
+         }
+         response = new ActiveMQExceptionMessage(activeMQException);
 
          sendCondition.signal();
       } finally {
@@ -270,6 +279,10 @@ public final class ChannelImpl implements Channel {
       synchronized (sendLock) {
          packet.setChannelID(id);
 
+         if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
+            packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
+         }
+
          if (logger.isTraceEnabled()) {
             logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
          }
@@ -291,6 +304,7 @@ public final class ChannelImpl implements Channel {
             if (resendCache != null && packet.isRequiresConfirmations()) {
                addResendPacket(packet);
             }
+
          } finally {
             lock.unlock();
          }
@@ -301,9 +315,30 @@ public final class ChannelImpl implements Channel {
 
          checkReconnectID(reconnectID);
 
+         //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
+         //As the send could block if the response cache is cannot add, preventing responses to be handled.
+         if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
+            while (!responseAsyncCache.add(packet)) {
+               try {
+                  Thread.sleep(1);
+               } catch (Exception e) {
+                  // Ignore
+               }
+            }
+         }
+
          // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
          // buffer is full, preventing any incoming buffers being handled and blocking failover
-         connection.getTransportConnection().write(buffer, flush, batch);
+         try {
+            connection.getTransportConnection().write(buffer, flush, batch);
+         } catch (Throwable t) {
+            //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
+            //The client would get still know about this as the exception bubbles up the call stack instead.
+            if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
+               responseAsyncCache.remove(packet.getCorrelationID());
+            }
+            throw t;
+         }
          return true;
       }
    }
@@ -478,6 +513,18 @@ public final class ChannelImpl implements Channel {
    }
 
    @Override
+   public void setResponseHandler(final ResponseHandler responseHandler) {
+      if (confWindowSize < 0) {
+         final String msg = "You can't set responseHandler on a connection with confirmation-window-size < 0." + " Look at the documentation for more information.";
+         if (logger.isTraceEnabled()) {
+            logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " " + msg);
+         }
+         throw new IllegalStateException(msg);
+      }
+      responseAsyncCache.setResponseHandler(responseHandler);
+   }
+
+   @Override
    public void setHandler(final ChannelHandler handler) {
       if (logger.isTraceEnabled()) {
          logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Setting handler on " + this + " as " + handler);
@@ -595,6 +642,12 @@ public final class ChannelImpl implements Channel {
       }
    }
 
+   public void handleResponse(Packet packet) {
+      if (responseAsyncCache != null && packet.isResponseAsync()) {
+         responseAsyncCache.handleResponse(packet);
+      }
+   }
+
    @Override
    public void confirm(final Packet packet) {
       if (resendCache != null && packet.isRequiresConfirmations()) {
@@ -647,6 +700,7 @@ public final class ChannelImpl implements Channel {
          if (packet.isResponse()) {
             confirm(packet);
 
+            handleResponse(packet);
             lock.lock();
 
             try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
index 5e46848..9a8166e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Disconnect
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Ping;
@@ -71,6 +72,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQue
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@@ -81,6 +83,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
@@ -88,6 +91,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
 
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY;
@@ -184,13 +188,25 @@ public abstract class PacketDecoder implements Serializable {
             break;
          }
          case EXCEPTION: {
-            packet = new ActiveMQExceptionMessage();
+            if (connection.isVersionBeforeAsyncResponseChange()) {
+               packet = new ActiveMQExceptionMessage();
+            } else {
+               packet = new ActiveMQExceptionMessage_V2();
+            }
             break;
          }
          case PACKETS_CONFIRMED: {
             packet = new PacketsConfirmedMessage();
             break;
          }
+         case NULL_RESPONSE: {
+            if (connection.isVersionBeforeAsyncResponseChange()) {
+               packet = new NullResponseMessage();
+            } else {
+               packet = new NullResponseMessage_V2();
+            }
+            break;
+         }
          case CREATESESSION: {
             packet = new CreateSessionMessage();
             break;
@@ -316,7 +332,11 @@ public abstract class PacketDecoder implements Serializable {
             break;
          }
          case SESS_XA_RESP: {
-            packet = new SessionXAResponseMessage();
+            if (connection.isVersionBeforeAsyncResponseChange()) {
+               packet = new SessionXAResponseMessage();
+            } else {
+               packet = new SessionXAResponseMessage_V2();
+            }
             break;
          }
          case SESS_XA_ROLLBACK: {
@@ -383,16 +403,16 @@ public abstract class PacketDecoder implements Serializable {
             packet = new SessionIndividualAcknowledgeMessage();
             break;
          }
-         case NULL_RESPONSE: {
-            packet = new NullResponseMessage();
-            break;
-         }
          case SESS_RECEIVE_CONTINUATION: {
             packet = new SessionReceiveContinuationMessage();
             break;
          }
          case SESS_SEND_CONTINUATION: {
-            packet = new SessionSendContinuationMessage();
+            if (connection.isVersionBeforeAsyncResponseChange()) {
+               packet = new SessionSendContinuationMessage();
+            } else {
+               packet = new SessionSendContinuationMessage_V2();
+            }
             break;
          }
          case SESS_PRODUCER_REQUEST_CREDITS: {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 87ba0c3..470e3ae 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -32,6 +32,7 @@ public class PacketImpl implements Packet {
    // 2.0.0
    public static final int ADDRESSING_CHANGE_VERSION = 129;
    public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130;
+   public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130;
 
 
    public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
@@ -272,6 +273,7 @@ public class PacketImpl implements Packet {
 
    public static final byte SESS_BINDINGQUERY_RESP_V4 = -15;
 
+
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type) {
@@ -439,5 +441,24 @@ public class PacketImpl implements Packet {
       return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0);
    }
 
+   @Override
+   public boolean isRequiresResponse() {
+      return false;
+   }
+
+   @Override
+   public boolean isResponseAsync() {
+      return false;
+   }
+
+   @Override
+   public long getCorrelationID() {
+      return -1;
+   }
+
+   @Override
+   public void setCorrelationID(long correlationID) {
+   }
+
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
new file mode 100644
index 0000000..f9e8538
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
+import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
+import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
+
+public class ResponseCache {
+
+   private final AtomicLong sequence = new AtomicLong(0);
+
+   private final ConcurrentLongHashMap<Packet> store;
+   private ResponseHandler responseHandler;
+
+   public ResponseCache() {
+      this.store = new ConcurrentLongHashMap<>();
+   }
+
+   public long nextCorrelationID() {
+      return sequence.incrementAndGet();
+   }
+
+   public boolean add(Packet packet) {
+      this.store.put(packet.getCorrelationID(), packet);
+      return true;
+   }
+
+   public Packet remove(long correlationID) {
+      return store.remove(correlationID);
+   }
+
+   public void handleResponse(Packet response) {
+      long correlationID = response.getCorrelationID();
+      Packet packet = remove(correlationID);
+      if (packet != null) {
+         responseHandler.responseHandler(packet, response);
+      }
+   }
+
+   public void errorAll(ActiveMQException exception) {
+      ConcurrentLongHashSet keys = store.keysLongHashSet();
+      keys.forEach(correlationID -> {
+         handleResponse(new ActiveMQExceptionMessage_V2(correlationID, exception));
+      });
+   }
+
+   public void setResponseHandler(ResponseHandler responseHandler) {
+      this.responseHandler = responseHandler;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java
index da34d2e..51637f3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java
@@ -23,7 +23,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 
 public class ActiveMQExceptionMessage extends PacketImpl {
 
-   private ActiveMQException exception;
+   protected ActiveMQException exception;
 
    // Static --------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java
new file mode 100644
index 0000000..661a040
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class ActiveMQExceptionMessage_V2 extends ActiveMQExceptionMessage {
+
+   private long correlationID;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ActiveMQExceptionMessage_V2(final long correlationID, final ActiveMQException exception) {
+      super(exception);
+      this.correlationID = correlationID;
+   }
+
+   public ActiveMQExceptionMessage_V2() {
+      super();
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public boolean isResponse() {
+      return true;
+   }
+
+   @Override
+   public void encodeRest(final ActiveMQBuffer buffer) {
+      super.encodeRest(buffer);
+      buffer.writeLong(correlationID);
+   }
+
+   @Override
+   public void decodeRest(final ActiveMQBuffer buffer) {
+      super.decodeRest(buffer);
+      if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
+         correlationID = buffer.readLong();
+      }
+   }
+
+   @Override
+   public final boolean isResponseAsync() {
+      return true;
+   }
+
+   @Override
+   public long getCorrelationID() {
+      return this.correlationID;
+   }
+
+   @Override
+   public String toString() {
+      return getParentString() + ", exception= " + exception + "]";
+   }
+
+   @Override
+   public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj) {
+         return true;
+      }
+      if (!super.equals(obj)) {
+         return false;
+      }
+      if (!(obj instanceof ActiveMQExceptionMessage_V2)) {
+         return false;
+      }
+      ActiveMQExceptionMessage_V2 other = (ActiveMQExceptionMessage_V2) obj;
+      if (correlationID != other.correlationID) {
+         return false;
+      }
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
index a98f888..8c84a9b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
@@ -65,6 +65,7 @@ public class CreateAddressMessage extends PacketImpl {
       return address;
    }
 
+   @Override
    public boolean isRequiresResponse() {
       return requiresResponse;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java
index 2ebf147..985d5f4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java
@@ -100,6 +100,7 @@ public class CreateQueueMessage extends PacketImpl {
       return temporary;
    }
 
+   @Override
    public boolean isRequiresResponse() {
       return requiresResponse;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
index af25ae9..3c072e0 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
@@ -80,6 +80,7 @@ public class CreateSharedQueueMessage extends PacketImpl {
       return filterString;
    }
 
+   @Override
    public boolean isRequiresResponse() {
       return requiresResponse;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
new file mode 100644
index 0000000..e3453af
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class NullResponseMessage_V2 extends NullResponseMessage {
+
+   private long correlationID;
+
+   public NullResponseMessage_V2(final long correlationID) {
+      super();
+      this.correlationID = correlationID;
+   }
+
+   public NullResponseMessage_V2() {
+      super();
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public long getCorrelationID() {
+      return correlationID;
+   }
+
+   @Override
+   public void encodeRest(final ActiveMQBuffer buffer) {
+      super.encodeRest(buffer);
+      buffer.writeLong(correlationID);
+   }
+
+   @Override
+   public void decodeRest(final ActiveMQBuffer buffer) {
+      super.decodeRest(buffer);
+      if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
+         correlationID = buffer.readLong();
+      }
+   }
+
+   @Override
+   public final boolean isResponse() {
+      return true;
+   }
+
+   @Override
+   public final boolean isResponseAsync() {
+      return true;
+   }
+
+   @Override
+   public String toString() {
+      return getParentString() + ", correlationID=" + correlationID + "]";
+   }
+
+   @Override
+   public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj) {
+         return true;
+      }
+      if (!super.equals(obj)) {
+         return false;
+      }
+      if (!(obj instanceof NullResponseMessage_V2)) {
+         return false;
+      }
+      NullResponseMessage_V2 other = (NullResponseMessage_V2) obj;
+      if (correlationID != other.correlationID) {
+         return false;
+      }
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
index 542c34c..67d9f67 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
@@ -51,6 +51,7 @@ public class SessionAcknowledgeMessage extends PacketImpl {
       return messageID;
    }
 
+   @Override
    public boolean isRequiresResponse() {
       return requiresResponse;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
index f09beeb..e07b50c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
@@ -71,6 +71,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
       return browseOnly;
    }
 
+   @Override
    public boolean isRequiresResponse() {
       return requiresResponse;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
index 7d06081..3164c23 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
@@ -60,6 +60,7 @@ public class SessionIndividualAcknowledgeMessage extends PacketImpl {
       return messageID;
    }
 
+   @Override
    public boolean isRequiresResponse() {
       return requiresResponse;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
index 26eedd7..4105b11 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
@@ -26,10 +26,10 @@ import org.apache.activemq.artemis.utils.DataConstants;
  */
 public class SessionSendContinuationMessage extends SessionContinuationMessage {
 
-   private boolean requiresResponse;
+   protected boolean requiresResponse;
 
    // Used on confirmation handling
-   private Message message;
+   protected Message message;
    /**
     * In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession}
     * <br>
@@ -43,7 +43,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
    /**
     * to be sent on the last package
     */
-   private long messageBodySize = -1;
+   protected long messageBodySize = -1;
 
    // Static --------------------------------------------------------
 
@@ -54,6 +54,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
       handler = null;
    }
 
+   protected SessionSendContinuationMessage(byte type) {
+      super(type);
+      handler = null;
+   }
+
    /**
     * @param body
     * @param continues
@@ -72,11 +77,31 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
       this.messageBodySize = messageBodySize;
    }
 
+   /**
+    * @param body
+    * @param continues
+    * @param requiresResponse
+    */
+   protected SessionSendContinuationMessage(final byte type,
+                                         final Message message,
+                                         final byte[] body,
+                                         final boolean continues,
+                                         final boolean requiresResponse,
+                                         final long messageBodySize,
+                                         SendAcknowledgementHandler handler) {
+      super(type, body, continues);
+      this.requiresResponse = requiresResponse;
+      this.message = message;
+      this.handler = handler;
+      this.messageBodySize = messageBodySize;
+   }
+
    // Public --------------------------------------------------------
 
    /**
     * @return the requiresResponse
     */
+   @Override
    public boolean isRequiresResponse() {
       return requiresResponse;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java
new file mode 100644
index 0000000..2a3071c
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+/**
+ * A SessionSendContinuationMessage<br>
+ */
+public class SessionSendContinuationMessage_V2 extends SessionSendContinuationMessage {
+
+   private long correlationID;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionSendContinuationMessage_V2() {
+      super();
+   }
+
+   /**
+    * @param body
+    * @param continues
+    * @param requiresResponse
+    */
+   public SessionSendContinuationMessage_V2(final Message message,
+                                            final byte[] body,
+                                            final boolean continues,
+                                            final boolean requiresResponse,
+                                            final long messageBodySize,
+                                            SendAcknowledgementHandler handler) {
+      super(message, body, continues, requiresResponse, messageBodySize, handler);
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public int expectedEncodeSize() {
+      return super.expectedEncodeSize() + DataConstants.SIZE_LONG;
+   }
+
+   @Override
+   public void encodeRest(final ActiveMQBuffer buffer) {
+      super.encodeRest(buffer);
+      buffer.writeLong(correlationID);
+   }
+
+   @Override
+   public void decodeRest(final ActiveMQBuffer buffer) {
+      super.decodeRest(buffer);
+      if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
+         correlationID = buffer.readLong();
+      }
+   }
+
+   @Override
+   public long getCorrelationID() {
+      return this.correlationID;
+   }
+
+   @Override
+   public void setCorrelationID(long correlationID) {
+      this.correlationID = correlationID;
+   }
+
+   @Override
+   public boolean isResponseAsync() {
+      return true;
+   }
+
+   @Override
+   public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
+      return result;
+   }
+
+   @Override
+   public String toString() {
+      StringBuffer buff = new StringBuffer(getParentString());
+      buff.append(", continues=" + continues);
+      buff.append(", message=" + message);
+      buff.append(", messageBodySize=" + messageBodySize);
+      buff.append(", requiresResponse=" + requiresResponse);
+      buff.append(", correlationID=" + correlationID);
+      buff.append("]");
+      return buff.toString();
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj)
+         return true;
+      if (!super.equals(obj))
+         return false;
+      if (!(obj instanceof SessionSendContinuationMessage_V2))
+         return false;
+      SessionSendContinuationMessage_V2 other = (SessionSendContinuationMessage_V2) obj;
+      if (correlationID != other.correlationID)
+         return false;
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
index b56ae30..e8dbdc1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
@@ -21,6 +21,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 public class SessionSendMessage extends MessagePacket {
 
@@ -37,6 +38,22 @@ public class SessionSendMessage extends MessagePacket {
    private final transient SendAcknowledgementHandler handler;
 
    /** This will be using the CoreMessage because it is meant for the core-protocol */
+   protected SessionSendMessage(final byte id,
+                             final ICoreMessage message,
+                             final boolean requiresResponse,
+                             final SendAcknowledgementHandler handler) {
+      super(id, message);
+      this.handler = handler;
+      this.requiresResponse = requiresResponse;
+   }
+
+   protected SessionSendMessage(final byte id,
+                                final CoreMessage message) {
+      super(id, message);
+      this.handler = null;
+   }
+
+   /** This will be using the CoreMessage because it is meant for the core-protocol */
    public SessionSendMessage(final ICoreMessage message,
                              final boolean requiresResponse,
                              final SendAcknowledgementHandler handler) {
@@ -52,6 +69,7 @@ public class SessionSendMessage extends MessagePacket {
 
    // Public --------------------------------------------------------
 
+   @Override
    public boolean isRequiresResponse() {
       return requiresResponse;
    }
@@ -62,7 +80,7 @@ public class SessionSendMessage extends MessagePacket {
 
    @Override
    public int expectedEncodeSize() {
-      return message.getEncodeSize() + PACKET_HEADERS_SIZE + 1;
+      return message.getEncodeSize() + PACKET_HEADERS_SIZE + fieldsEncodeSize();
    }
 
    @Override
@@ -75,13 +93,16 @@ public class SessionSendMessage extends MessagePacket {
    public void decodeRest(final ActiveMQBuffer buffer) {
       // Buffer comes in after having read standard headers and positioned at Beginning of body part
 
-      ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1);
+      ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), fieldsEncodeSize());
       receiveMessage(messageBuffer);
 
-      buffer.readerIndex(buffer.capacity() - 1);
+      buffer.readerIndex(buffer.capacity() - fieldsEncodeSize());
 
       requiresResponse = buffer.readBoolean();
+   }
 
+   protected int fieldsEncodeSize() {
+      return DataConstants.SIZE_BOOLEAN;
    }
 
    protected void receiveMessage(ByteBuf messageBuffer) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java
new file mode 100644
index 0000000..63c9a34
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class SessionSendMessage_V2 extends SessionSendMessage {
+
+   private long correlationID;
+
+   /** This will be using the CoreMessage because it is meant for the core-protocol */
+   public SessionSendMessage_V2(final ICoreMessage message,
+                                final boolean requiresResponse,
+                                final SendAcknowledgementHandler handler) {
+      super(SESS_SEND, message, requiresResponse, handler);
+   }
+
+   public SessionSendMessage_V2(final CoreMessage message) {
+      super(SESS_SEND, message);
+   }
+
+   @Override
+   public void encodeRest(ActiveMQBuffer buffer) {
+      super.encodeRest(buffer);
+      buffer.writeLong(correlationID);
+   }
+
+   @Override
+   public void decodeRest(final ActiveMQBuffer buffer) {
+      super.decodeRest(buffer);
+      correlationID = buffer.readLong();
+   }
+
+   @Override
+   protected int fieldsEncodeSize() {
+      return super.fieldsEncodeSize() + DataConstants.SIZE_LONG;
+   }
+
+   @Override
+   public long getCorrelationID() {
+      return this.correlationID;
+   }
+
+   @Override
+   public void setCorrelationID(long correlationID) {
+      this.correlationID = correlationID;
+   }
+
+   @Override
+   public boolean isResponseAsync() {
+      return true;
+   }
+
+   @Override
+   public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
+      return result;
+   }
+
+
+   @Override
+   public String toString() {
+      StringBuffer buff = new StringBuffer(getParentString());
+      buff.append(", correlationID=" + correlationID);
+      buff.append(", requiresResponse=" + super.isRequiresResponse());
+      buff.append("]");
+      return buff.toString();
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj)
+         return true;
+      if (!super.equals(obj))
+         return false;
+      if (!(obj instanceof SessionSendMessage_V2))
+         return false;
+      SessionSendMessage_V2 other = (SessionSendMessage_V2) obj;
+      if (correlationID != other.correlationID)
+         return false;
+      return true;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
index 086b851..f88e0c8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
@@ -21,11 +21,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 
 public class SessionXAResponseMessage extends PacketImpl {
 
-   private boolean error;
+   protected boolean error;
 
-   private int responseCode;
+   protected int responseCode;
 
-   private String message;
+   protected String message;
 
    public SessionXAResponseMessage(final boolean isError, final int responseCode, final String message) {
       super(SESS_XA_RESP);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java
new file mode 100644
index 0000000..4e949bd
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class SessionXAResponseMessage_V2 extends SessionXAResponseMessage {
+
+   private long correlationID;
+
+   public SessionXAResponseMessage_V2(final long correlationID, final boolean isError, final int responseCode, final String message) {
+      super(isError, responseCode, message);
+      this.correlationID = correlationID;
+   }
+
+   public SessionXAResponseMessage_V2() {
+      super();
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public long getCorrelationID() {
+      return correlationID;
+   }
+
+   @Override
+   public void encodeRest(final ActiveMQBuffer buffer) {
+      super.encodeRest(buffer);
+      buffer.writeLong(correlationID);
+   }
+
+   @Override
+   public void decodeRest(final ActiveMQBuffer buffer) {
+      super.decodeRest(buffer);
+      if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
+         correlationID = buffer.readLong();
+      }
+   }
+
+   @Override
+   public final boolean isResponse() {
+      return true;
+   }
+
+   @Override
+   public final boolean isResponseAsync() {
+      return true;
+   }
+
+   @Override
+   public String toString() {
+      StringBuffer buff = new StringBuffer(getParentString());
+      buff.append(", error=" + error);
+      buff.append(", message=" + message);
+      buff.append(", responseCode=" + responseCode);
+      buff.append(", correlationID=" + correlationID);
+      buff.append("]");
+      return buff.toString();
+   }
+
+   @Override
+   public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj) {
+         return true;
+      }
+      if (!super.equals(obj)) {
+         return false;
+      }
+      if (!(obj instanceof SessionXAResponseMessage_V2)) {
+         return false;
+      }
+      SessionXAResponseMessage_V2 other = (SessionXAResponseMessage_V2) obj;
+      if (correlationID != other.correlationID) {
+         return false;
+      }
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-core-client/src/main/resources/activemq-version.properties
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/resources/activemq-version.properties b/artemis-core-client/src/main/resources/activemq-version.properties
index a39b422..ff65ff9 100644
--- a/artemis-core-client/src/main/resources/activemq-version.properties
+++ b/artemis-core-client/src/main/resources/activemq-version.properties
@@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
 activemq.version.microVersion=${activemq.version.microVersion}
 activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
 activemq.version.versionTag=${activemq.version.versionTag}
-activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129
+activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index ae1d270..fc15d5e 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -601,6 +601,36 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
       }
 
       @Override
+      public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) {
+         if (jmsMessage instanceof StreamMessage) {
+            try {
+               ((StreamMessage) jmsMessage).reset();
+            } catch (JMSException e) {
+               // HORNETQ-1209 XXX ignore?
+            }
+         }
+         if (jmsMessage instanceof BytesMessage) {
+            try {
+               ((BytesMessage) jmsMessage).reset();
+            } catch (JMSException e) {
+               // HORNETQ-1209 XXX ignore?
+            }
+         }
+
+         try {
+            producer.connection.getThreadAwareContext().setCurrentThread(true);
+            if (exception instanceof ActiveMQException) {
+               exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException)exception);
+            } else if (exception instanceof ActiveMQInterruptedException) {
+               exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
+            }
+            completionListener.onException(jmsMessage, exception);
+         } finally {
+            producer.connection.getThreadAwareContext().clearCurrentThread(true);
+         }
+      }
+
+      @Override
       public String toString() {
          return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")";
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index d38f45f..0428abe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
 
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@@ -90,8 +91,10 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
 
       if (connection.isVersionBeforeAddressChange()) {
          sendMessage = new SessionSendMessage_1X(new CoreMessage(this.coreMessageObjectPools));
-      } else {
+      } else if (connection.isVersionBeforeAsyncResponseChange()) {
          sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools));
+      } else {
+         sendMessage = new SessionSendMessage_V2(new CoreMessage(this.coreMessageObjectPools));
       }
 
       sendMessage.decode(in);