You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/27 21:41:29 UTC

[3/3] activemq-artemis git commit: ARTEMIS-1545 refactor & rework a few incompatible pieces

ARTEMIS-1545 refactor & rework a few incompatible pieces

The existing commit for ARTEMIS-1545 broke bridges and large messages. This
commit fixes both and refactors the solution to make it clearer.

(cherry picked from commit a28b4fb34eb3cc178dd611d0cb2acc51d6b7a965)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f4734868
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f4734868
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f4734868

Branch: refs/heads/2.6.x
Commit: f4734868a5a07dfc6db533a96f9f8e01de5139c5
Parents: c9d8697
Author: Justin Bertram <jb...@apache.org>
Authored: Tue Jul 17 10:53:21 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 27 17:36:34 2018 -0400

----------------------------------------------------------------------
 .../core/client/SendAcknowledgementHandler.java |   8 +-
 .../core/protocol/core/ResponseHandler.java     |   6 +-
 .../core/impl/ActiveMQSessionContext.java       |  37 +-
 .../core/protocol/core/impl/ChannelImpl.java    |  15 +-
 .../core/protocol/core/impl/PacketImpl.java     |   5 +-
 .../core/protocol/core/impl/ResponseCache.java  |   6 +-
 .../protocol/core/impl/ChannelImplTest.java     | 512 +++++++++++++++++++
 .../jms/client/ActiveMQMessageProducer.java     |  92 ++--
 .../core/ServerSessionPacketHandler.java        |   3 +-
 .../artemis/jms/tests/SecurityTest.java         | 113 +++-
 10 files changed, 722 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
index 0f47536..ad45a5f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
@@ -43,9 +43,11 @@ public interface SendAcknowledgementHandler {
    void sendAcknowledged(Message message);
 
    default void sendFailed(Message message, Exception e) {
-      //This is to keep old behaviour that would ack even if error,
-      // if anyone custom implemented this interface but doesnt update.
-      sendAcknowledged(message);
+      /**
+       * By default ignore failures to preserve compatibility with existing implementations.
+       * If the message makes it to the broker and a failure occurs, sendAcknowledged() will
+       * still be invoked just like it always was.
+       */
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
index 21e9879..f96ef13 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java
@@ -17,14 +17,14 @@
 package org.apache.activemq.artemis.core.protocol.core;
 
 /**
- * A CommandConfirmationHandler is used by the channel to confirm confirmations of packets.
+ * A ResponseHandler is used by the channel to handle async responses.
  */
 public interface ResponseHandler {
 
    /**
-    * called by channel after a confirmation has been received.
+    * called by channel after an async response has been received.
     *
     * @param packet the packet confirmed
     */
-   void responseHandler(Packet packet, Packet response);
+   void handleResponse(Packet packet, Packet response);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index aec0fca..658bfcf 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -168,11 +168,7 @@ public class ActiveMQSessionContext extends SessionContext {
       sessionChannel.setHandler(handler);
 
       if (confirmationWindow >= 0) {
-         if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
-            sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
-         } else {
-            sessionChannel.setResponseHandler(responseHandler);
-         }
+         setHandlers();
       }
    }
 
@@ -189,16 +185,24 @@ public class ActiveMQSessionContext extends SessionContext {
       this.killed = true;
    }
 
+   private void setHandlers() {
+      sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
+
+      if (!sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
+         sessionChannel.setResponseHandler(responseHandler);
+      }
+   }
+
    private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() {
       @Override
       public void commandConfirmed(Packet packet) {
-         responseHandler.responseHandler(packet, null);
+         responseHandler.handleResponse(packet, null);
       }
    };
 
    private final ResponseHandler responseHandler = new ResponseHandler() {
       @Override
-      public void responseHandler(Packet packet, Packet response) {
+      public void handleResponse(Packet packet, Packet response) {
          final ActiveMQException activeMQException;
          if (response != null && response.getType() == PacketImpl.EXCEPTION) {
             ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response;
@@ -229,7 +233,7 @@ public class ActiveMQSessionContext extends SessionContext {
             if (exception == null) {
                sendAckHandler.sendAcknowledged(message);
             } else {
-               handler.sendFailed(message, exception);
+               sendAckHandler.sendFailed(message, exception);
             }
          }
       }
@@ -269,11 +273,8 @@ public class ActiveMQSessionContext extends SessionContext {
 
    @Override
    public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) {
-      if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
-         sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
-      } else {
-         sessionChannel.setResponseHandler(responseHandler);
-      }
+      setHandlers();
+
       this.sendAckHandler = handler;
    }
 
@@ -932,12 +933,12 @@ public class ActiveMQSessionContext extends SessionContext {
                                                          boolean lastChunk,
                                                          byte[] chunk,
                                                          SendAcknowledgementHandler messageHandler) throws ActiveMQException {
-      final boolean requiresResponse = lastChunk || confirmationWindow != -1;
+      final boolean requiresResponse = lastChunk && sendBlocking;
       final SessionSendContinuationMessage chunkPacket;
       if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
          chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
       } else {
-         chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
+         chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler);
       }
       final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
       //perform a weak form of flow control to avoid OOM on tight loops
@@ -955,11 +956,7 @@ public class ActiveMQSessionContext extends SessionContext {
          }
          if (requiresResponse) {
             // When sending it blocking, only the last chunk will be blocking.
-            if (sendBlocking) {
-               channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
-            } else {
-               channel.send(chunkPacket);
-            }
+            channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
          } else {
             channel.send(chunkPacket);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 9cb2a83..61268d6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -253,6 +253,10 @@ public final class ChannelImpl implements Channel {
       this.transferring = transferring;
    }
 
+   protected ResponseCache getCache() {
+      return responseAsyncCache;
+   }
+
    /**
     * @param timeoutMsg message to log on blocking call failover timeout
     */
@@ -316,7 +320,7 @@ public final class ChannelImpl implements Channel {
          checkReconnectID(reconnectID);
 
          //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
-         //As the send could block if the response cache is cannot add, preventing responses to be handled.
+         //As the send could block if the response cache cannot add, preventing responses to be handled.
          if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
             while (!responseAsyncCache.add(packet)) {
                try {
@@ -426,7 +430,7 @@ public final class ChannelImpl implements Channel {
                   throw new ActiveMQInterruptedException(e);
                }
 
-               if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket) {
+               if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket && !response.isResponseAsync()) {
                   ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace"));
                }
 
@@ -642,7 +646,7 @@ public final class ChannelImpl implements Channel {
       }
    }
 
-   public void handleResponse(Packet packet) {
+   public void handleAsyncResponse(Packet packet) {
       if (responseAsyncCache != null && packet.isResponseAsync()) {
          responseAsyncCache.handleResponse(packet);
       }
@@ -700,7 +704,7 @@ public final class ChannelImpl implements Channel {
          if (packet.isResponse()) {
             confirm(packet);
 
-            handleResponse(packet);
+            handleAsyncResponse(packet);
             lock.lock();
 
             try {
@@ -752,6 +756,9 @@ public final class ChannelImpl implements Channel {
          if (commandConfirmationHandler != null) {
             commandConfirmationHandler.commandConfirmed(packet);
          }
+         if (responseAsyncCache != null) {
+            responseAsyncCache.handleResponse(packet);
+         }
       }
 
       firstStoredCommandID += numberToClear;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 470e3ae..0168a47 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -31,7 +31,8 @@ public class PacketImpl implements Packet {
 
    // 2.0.0
    public static final int ADDRESSING_CHANGE_VERSION = 129;
-   public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130;
+
+   // 2.7.0
    public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130;
 
 
@@ -430,7 +431,7 @@ public class PacketImpl implements Packet {
    }
 
    protected String getParentString() {
-      return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", packetObject=" + this.getClass().getSimpleName();
+      return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", responseAsync=" + isResponseAsync() + ", requiresResponse=" + isRequiresResponse() + ", correlationID=" + getCorrelationID() + ", packetObject=" + this.getClass().getSimpleName();
    }
 
    private int stringEncodeSize(final String str) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
index f9e8538..8ee73d7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java
@@ -53,7 +53,7 @@ public class ResponseCache {
       long correlationID = response.getCorrelationID();
       Packet packet = remove(correlationID);
       if (packet != null) {
-         responseHandler.responseHandler(packet, response);
+         responseHandler.handleResponse(packet, response);
       }
    }
 
@@ -67,4 +67,8 @@ public class ResponseCache {
    public void setResponseHandler(ResponseHandler responseHandler) {
       this.responseHandler = responseHandler;
    }
+
+   public int size() {
+      return this.store.size();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
new file mode 100644
index 0000000..416c911
--- /dev/null
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
@@ -0,0 +1,512 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl;
+
+import javax.security.auth.Subject;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFutureListener;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.protocol.core.Channel;
+import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ChannelImplTest {
+
+   ChannelImpl channel;
+
+   @Before
+   public void setUp() {
+      channel = new ChannelImpl(new CoreRR(), 1, 4000, null);
+   }
+
+   @Test
+   public void testCorrelation() {
+
+      AtomicInteger handleResponseCount = new AtomicInteger();
+
+      RequestPacket requestPacket = new RequestPacket((byte) 1);
+      setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
+
+      channel.send(requestPacket);
+
+      assertEquals(1, channel.getCache().size());
+
+      ResponsePacket responsePacket = new ResponsePacket((byte) 1);
+      responsePacket.setCorrelationID(requestPacket.getCorrelationID());
+
+      channel.handlePacket(responsePacket);
+
+      assertEquals(1, handleResponseCount.get());
+      assertEquals(0, channel.getCache().size());
+   }
+
+   private void setResponseHandlerAsPerActiveMQSessionContext(ResponseHandler responseHandler) {
+      channel.setResponseHandler(responseHandler);
+      channel.setCommandConfirmationHandler(wrapAsPerActiveMQSessionContext(responseHandler));
+   }
+
+   private CommandConfirmationHandler wrapAsPerActiveMQSessionContext(ResponseHandler responseHandler) {
+      return new CommandConfirmationHandler() {
+         @Override
+         public void commandConfirmed(Packet packet) {
+            responseHandler.handleResponse(packet, null);
+         }
+      };
+   }
+
+   @Test
+   public void testPacketsConfirmedMessage() {
+
+      AtomicInteger handleResponseCount = new AtomicInteger();
+
+      RequestPacket requestPacket = new RequestPacket((byte) 1);
+      setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
+
+      channel.send(requestPacket);
+
+      PacketsConfirmedMessage responsePacket = new PacketsConfirmedMessage((byte) 2);
+
+      channel.handlePacket(responsePacket);
+
+      assertEquals(0, channel.getCache().size());
+   }
+
+   class RequestPacket extends PacketImpl {
+
+      private long id;
+
+      RequestPacket(byte type) {
+         super(type);
+      }
+
+      @Override
+      public boolean isRequiresResponse() {
+         return true;
+      }
+
+      @Override
+      public boolean isResponseAsync() {
+         return true;
+      }
+
+      @Override
+      public long getCorrelationID() {
+         return id;
+      }
+
+      @Override
+      public void setCorrelationID(long id) {
+         this.id = id;
+      }
+
+      @Override
+      public int getPacketSize() {
+         return 0;
+      }
+   }
+
+   class ResponsePacket extends PacketImpl {
+
+      private long id;
+
+      ResponsePacket(byte type) {
+         super(type);
+      }
+
+      @Override
+      public boolean isResponseAsync() {
+         return true;
+      }
+
+      @Override
+      public boolean isResponse() {
+         return true;
+      }
+
+      @Override
+      public long getCorrelationID() {
+         return id;
+      }
+
+      @Override
+      public void setCorrelationID(long id) {
+         this.id = id;
+      }
+
+      @Override
+      public int getPacketSize() {
+         return 0;
+      }
+   }
+
+   class CoreRR implements CoreRemotingConnection {
+
+      @Override
+      public int getChannelVersion() {
+         return 0;
+      }
+
+      @Override
+      public void setChannelVersion(int clientVersion) {
+
+      }
+
+      @Override
+      public Channel getChannel(long channelID, int confWindowSize) {
+         return null;
+      }
+
+      @Override
+      public void putChannel(long channelID, Channel channel) {
+
+      }
+
+      @Override
+      public boolean removeChannel(long channelID) {
+         return false;
+      }
+
+      @Override
+      public long generateChannelID() {
+         return 0;
+      }
+
+      @Override
+      public void syncIDGeneratorSequence(long id) {
+
+      }
+
+      @Override
+      public long getIDGeneratorSequence() {
+         return 0;
+      }
+
+      @Override
+      public long getBlockingCallTimeout() {
+         return 0;
+      }
+
+      @Override
+      public long getBlockingCallFailoverTimeout() {
+         return 0;
+      }
+
+      @Override
+      public Object getTransferLock() {
+         return null;
+      }
+
+      @Override
+      public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
+         return null;
+      }
+
+      @Override
+      public boolean blockUntilWritable(int size, long timeout) {
+         return false;
+      }
+
+      @Override
+      public Object getID() {
+         return null;
+      }
+
+      @Override
+      public long getCreationTime() {
+         return 0;
+      }
+
+      @Override
+      public String getRemoteAddress() {
+         return null;
+      }
+
+      @Override
+      public void scheduledFlush() {
+
+      }
+
+      @Override
+      public void addFailureListener(FailureListener listener) {
+
+      }
+
+      @Override
+      public boolean removeFailureListener(FailureListener listener) {
+         return false;
+      }
+
+      @Override
+      public void addCloseListener(CloseListener listener) {
+
+      }
+
+      @Override
+      public boolean removeCloseListener(CloseListener listener) {
+         return false;
+      }
+
+      @Override
+      public List<CloseListener> removeCloseListeners() {
+         return null;
+      }
+
+      @Override
+      public void setCloseListeners(List<CloseListener> listeners) {
+
+      }
+
+      @Override
+      public List<FailureListener> getFailureListeners() {
+         return null;
+      }
+
+      @Override
+      public List<FailureListener> removeFailureListeners() {
+         return null;
+      }
+
+      @Override
+      public void setFailureListeners(List<FailureListener> listeners) {
+
+      }
+
+      @Override
+      public ActiveMQBuffer createTransportBuffer(int size) {
+         return new ChannelBufferWrapper(Unpooled.buffer(size));
+      }
+
+      @Override
+      public void fail(ActiveMQException me) {
+
+      }
+
+      @Override
+      public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
+
+      }
+
+      @Override
+      public void destroy() {
+
+      }
+
+      @Override
+      public Connection getTransportConnection() {
+         return new Connection() {
+            @Override
+            public ActiveMQBuffer createTransportBuffer(int size) {
+               return null;
+            }
+
+            @Override
+            public RemotingConnection getProtocolConnection() {
+               return null;
+            }
+
+            @Override
+            public void setProtocolConnection(RemotingConnection connection) {
+
+            }
+
+            @Override
+            public boolean isWritable(ReadyListener listener) {
+               return false;
+            }
+
+            @Override
+            public void fireReady(boolean ready) {
+
+            }
+
+            @Override
+            public void setAutoRead(boolean autoRead) {
+
+            }
+
+            @Override
+            public Object getID() {
+               return null;
+            }
+
+            @Override
+            public void write(ActiveMQBuffer buffer, boolean flush, boolean batched) {
+
+            }
+
+            @Override
+            public void write(ActiveMQBuffer buffer,
+                              boolean flush,
+                              boolean batched,
+                              ChannelFutureListener futureListener) {
+
+            }
+
+            @Override
+            public void write(ActiveMQBuffer buffer) {
+
+            }
+
+            @Override
+            public void forceClose() {
+
+            }
+
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public String getRemoteAddress() {
+               return null;
+            }
+
+            @Override
+            public String getLocalAddress() {
+               return null;
+            }
+
+            @Override
+            public void checkFlushBatchBuffer() {
+
+            }
+
+            @Override
+            public TransportConfiguration getConnectorConfig() {
+               return null;
+            }
+
+            @Override
+            public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
+               return null;
+            }
+
+            @Override
+            public boolean isUsingProtocolHandling() {
+               return false;
+            }
+
+            @Override
+            public boolean isSameTarget(TransportConfiguration... configs) {
+               return false;
+            }
+         };
+      }
+
+      @Override
+      public boolean isClient() {
+         return true;
+      }
+
+      @Override
+      public boolean isDestroyed() {
+         return false;
+      }
+
+      @Override
+      public void disconnect(boolean criticalError) {
+
+      }
+
+      @Override
+      public void disconnect(String scaleDownNodeID, boolean criticalError) {
+
+      }
+
+      @Override
+      public boolean checkDataReceived() {
+         return false;
+      }
+
+      @Override
+      public void flush() {
+
+      }
+
+      @Override
+      public boolean isWritable(ReadyListener callback) {
+         return false;
+      }
+
+      @Override
+      public void killMessage(SimpleString nodeID) {
+
+      }
+
+      @Override
+      public boolean isSupportReconnect() {
+         return false;
+      }
+
+      @Override
+      public boolean isSupportsFlowControl() {
+         return false;
+      }
+
+      @Override
+      public Subject getSubject() {
+         return null;
+      }
+
+      @Override
+      public String getProtocolName() {
+         return null;
+      }
+
+      @Override
+      public void setClientID(String cID) {
+
+      }
+
+      @Override
+      public String getClientID() {
+         return null;
+      }
+
+      @Override
+      public String getTransportLocalAddress() {
+         return null;
+      }
+
+      @Override
+      public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
+
+      }
+   }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index fc15d5e..ee4223c 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -34,6 +34,8 @@ import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
@@ -564,6 +566,14 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
       private final ActiveMQMessageProducer producer;
 
       /**
+       * It's possible that this SendAcknowledgementHandler might be called twice due to subsequent
+       * packet confirmations on the same connection. Using this boolean avoids that possibility.
+       * A new CompletionListenerWrapper is created for each message sent, so once it has been
+       * called it will never be called again.
+       */
+      private AtomicBoolean active = new AtomicBoolean(true);
+
+      /**
        * @param jmsMessage
        * @param producer
        */
@@ -577,56 +587,62 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
 
       @Override
       public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) {
-         if (jmsMessage instanceof StreamMessage) {
-            try {
-               ((StreamMessage) jmsMessage).reset();
-            } catch (JMSException e) {
-               // HORNETQ-1209 XXX ignore?
+         if (active.get()) {
+            if (jmsMessage instanceof StreamMessage) {
+               try {
+                  ((StreamMessage) jmsMessage).reset();
+               } catch (JMSException e) {
+                  // HORNETQ-1209 XXX ignore?
+               }
             }
-         }
-         if (jmsMessage instanceof BytesMessage) {
-            try {
-               ((BytesMessage) jmsMessage).reset();
-            } catch (JMSException e) {
-               // HORNETQ-1209 XXX ignore?
+            if (jmsMessage instanceof BytesMessage) {
+               try {
+                  ((BytesMessage) jmsMessage).reset();
+               } catch (JMSException e) {
+                  // HORNETQ-1209 XXX ignore?
+               }
             }
-         }
 
-         try {
-            producer.connection.getThreadAwareContext().setCurrentThread(true);
-            completionListener.onCompletion(jmsMessage);
-         } finally {
-            producer.connection.getThreadAwareContext().clearCurrentThread(true);
+            try {
+               producer.connection.getThreadAwareContext().setCurrentThread(true);
+               completionListener.onCompletion(jmsMessage);
+            } finally {
+               producer.connection.getThreadAwareContext().clearCurrentThread(true);
+               active.set(false);
+            }
          }
       }
 
       @Override
       public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) {
-         if (jmsMessage instanceof StreamMessage) {
-            try {
-               ((StreamMessage) jmsMessage).reset();
-            } catch (JMSException e) {
-               // HORNETQ-1209 XXX ignore?
+         if (active.get()) {
+            if (jmsMessage instanceof StreamMessage) {
+               try {
+                  ((StreamMessage) jmsMessage).reset();
+               } catch (JMSException e) {
+                  // HORNETQ-1209 XXX ignore?
+               }
             }
-         }
-         if (jmsMessage instanceof BytesMessage) {
-            try {
-               ((BytesMessage) jmsMessage).reset();
-            } catch (JMSException e) {
-               // HORNETQ-1209 XXX ignore?
+            if (jmsMessage instanceof BytesMessage) {
+               try {
+                  ((BytesMessage) jmsMessage).reset();
+               } catch (JMSException e) {
+                  // HORNETQ-1209 XXX ignore?
+               }
             }
-         }
 
-         try {
-            producer.connection.getThreadAwareContext().setCurrentThread(true);
-            if (exception instanceof ActiveMQException) {
-               exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException)exception);
-            } else if (exception instanceof ActiveMQInterruptedException) {
-               exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
+            try {
+               producer.connection.getThreadAwareContext().setCurrentThread(true);
+               if (exception instanceof ActiveMQException) {
+                  exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException) exception);
+               } else if (exception instanceof ActiveMQInterruptedException) {
+                  exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
+               }
+               completionListener.onException(jmsMessage, exception);
+            } finally {
+               producer.connection.getThreadAwareContext().clearCurrentThread(true);
+               active.set(false);
             }
-            completionListener.onException(jmsMessage, exception);
-         } finally {
-            producer.connection.getThreadAwareContext().clearCurrentThread(true);
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index f5756f2..16a87d8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -893,7 +893,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                                      final Packet response,
                                      final boolean flush,
                                      final boolean closeChannel) {
-      if (confirmPacket != null) {
+      // don't confirm if the response is an exception
+      if (confirmPacket != null && (response == null || (response != null && response.getType() != PacketImpl.EXCEPTION))) {
          channel.confirm(confirmPacket);
 
          if (flush) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f4734868/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
index 7e121f3..851dbe0 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java
@@ -18,12 +18,15 @@ package org.apache.activemq.artemis.jms.tests;
 
 import static org.junit.Assert.fail;
 
+import javax.jms.BytesMessage;
 import javax.jms.CompletionListener;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
+import javax.jms.JMSContext;
+import javax.jms.JMSProducer;
 import javax.jms.JMSSecurityException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
@@ -33,6 +36,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
 import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
@@ -187,10 +192,14 @@ public class SecurityTest extends JMSTestCase {
     */
    @Test
    public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception {
+      SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
+      if (getJmsServer().locateQueue(queueName) == null) {
+         getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+      }
       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
       Connection connection = connectionFactory.createConnection("guest", "guest");
       Session session = connection.createSession();
-      Destination destination = session.createQueue("guest.cannot.send");
+      Destination destination = session.createQueue(queueName.toString());
       MessageProducer messageProducer = session.createProducer(destination);
       try {
          messageProducer.send(session.createTextMessage("hello"));
@@ -208,18 +217,21 @@ public class SecurityTest extends JMSTestCase {
     */
    @Test
    public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent() throws Exception {
+      SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
+      if (getJmsServer().locateQueue(queueName) == null) {
+         getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+      }
       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
       connectionFactory.setConfirmationWindowSize(100);
       connectionFactory.setBlockOnDurableSend(false);
       connectionFactory.setBlockOnNonDurableSend(false);
       Connection connection = connectionFactory.createConnection("guest", "guest");
       Session session = connection.createSession();
-      Destination destination = session.createQueue("guest.cannot.send");
+      Destination destination = session.createQueue(queueName.toString());
       MessageProducer messageProducer = session.createProducer(destination);
       messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
       try {
          AtomicReference<Exception> e = new AtomicReference<>();
-         //        messageProducer.send(session.createTextMessage("hello"));
 
          CountDownLatch countDownLatch = new CountDownLatch(1);
          messageProducer.send(session.createTextMessage("hello"), new CompletionListener() {
@@ -241,6 +253,101 @@ public class SecurityTest extends JMSTestCase {
          fail("JMSSecurityException expected as guest is not allowed to send");
       } catch (JMSSecurityException activeMQSecurityException) {
          activeMQSecurityException.printStackTrace();
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using JMS 2 API.
+    */
+   @Test
+   public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistentJMS2() throws Exception {
+      SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
+      if (getJmsServer().locateQueue(queueName) == null) {
+         getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+      }
+      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+      connectionFactory.setConfirmationWindowSize(100);
+      connectionFactory.setBlockOnDurableSend(false);
+      connectionFactory.setBlockOnNonDurableSend(false);
+      JMSContext context = connectionFactory.createContext("guest", "guest");
+      Destination destination = context.createQueue(queueName.toString());
+      JMSProducer messageProducer = context.createProducer();
+      messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+      try {
+         AtomicReference<Exception> e = new AtomicReference<>();
+
+         CountDownLatch countDownLatch = new CountDownLatch(1);
+         messageProducer.setAsync(new CompletionListener() {
+            @Override
+            public void onCompletion(Message message) {
+               countDownLatch.countDown();
+            }
+
+            @Override
+            public void onException(Message message, Exception exception) {
+               e.set(exception);
+               countDownLatch.countDown();
+            }
+         });
+         messageProducer.send(destination, context.createTextMessage("hello"));
+         countDownLatch.await(10, TimeUnit.SECONDS);
+         if (e.get() != null) {
+            throw e.get();
+         }
+         fail("JMSSecurityException expected as guest is not allowed to send");
+      } catch (JMSSecurityException activeMQSecurityException) {
+         activeMQSecurityException.printStackTrace();
+      } finally {
+         context.close();
+      }
+   }
+
+   /**
+    * Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using a large message.
+    */
+   @Test
+   public void testLoginValidUserAndPasswordButNotAuthorisedToSendLargeNonPersistent() throws Exception {
+      SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
+      if (getJmsServer().locateQueue(queueName) == null) {
+         getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+      }
+      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+      connectionFactory.setConfirmationWindowSize(100);
+      connectionFactory.setBlockOnDurableSend(false);
+      connectionFactory.setBlockOnNonDurableSend(false);
+      connectionFactory.setMinLargeMessageSize(1024);
+      Connection connection = connectionFactory.createConnection("guest", "guest");
+      Session session = connection.createSession();
+      Destination destination = session.createQueue(queueName.toString());
+      MessageProducer messageProducer = session.createProducer(destination);
+      messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+      try {
+         AtomicReference<Exception> e = new AtomicReference<>();
+
+         CountDownLatch countDownLatch = new CountDownLatch(1);
+         BytesMessage message = session.createBytesMessage();
+         message.writeBytes(new byte[10 * 1024]);
+         messageProducer.send(message, new CompletionListener() {
+            @Override
+            public void onCompletion(Message message) {
+               countDownLatch.countDown();
+            }
+
+            @Override
+            public void onException(Message message, Exception exception) {
+               e.set(exception);
+               countDownLatch.countDown();
+            }
+         });
+         countDownLatch.await(10, TimeUnit.SECONDS);
+         if (e.get() != null) {
+            throw e.get();
+         }
+         fail("JMSSecurityException expected as guest is not allowed to send");
+      } catch (JMSSecurityException activeMQSecurityException) {
+         activeMQSecurityException.printStackTrace();
       }
       connection.close();
    }