You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/03/11 20:26:13 UTC

[1/2] activemq-artemis git commit: ARTEMIS-437 Large Message send should be interrupted during failover

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 212c16867 -> 4ba11c8bb


ARTEMIS-437 Large Message send should be interrupted during failover


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/26fe21ba
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/26fe21ba
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/26fe21ba

Branch: refs/heads/master
Commit: 26fe21baa4fed54d369b7090732081d1546b9638
Parents: 212c168
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Mar 11 13:09:07 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 11 13:13:32 2016 -0500

----------------------------------------------------------------------
 .../api/core/ActiveMQInterruptedException.java  |   4 +
 .../client/ActiveMQClientMessageBundle.java     |   4 +
 .../impl/ClientProducerCreditManagerImpl.java   |   2 +-
 .../core/client/impl/ClientProducerCredits.java |   2 +-
 .../client/impl/ClientProducerCreditsImpl.java  |   7 +-
 .../core/client/impl/ClientProducerImpl.java    |  59 ++---
 .../client/impl/LargeMessageControllerImpl.java |  15 +-
 .../artemis/core/protocol/core/Channel.java     |  27 ++
 .../core/impl/ActiveMQSessionContext.java       |   9 +-
 .../core/protocol/core/impl/ChannelImpl.java    |  38 ++-
 .../spi/core/remoting/SessionContext.java       |   3 +
 .../artemis/jms/client/ActiveMQConnection.java  |   4 +
 .../jms/client/ActiveMQMessageProducer.java     |   6 +
 .../LargeMessageOverReplicationTest.java        | 264 +++++++++++++++++++
 .../cluster/util/BackupSyncDelay.java           |  15 ++
 15 files changed, 406 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInterruptedException.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInterruptedException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInterruptedException.java
index 9a3ff7f..c8fd80b 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInterruptedException.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInterruptedException.java
@@ -26,4 +26,8 @@ public final class ActiveMQInterruptedException extends RuntimeException {
    public ActiveMQInterruptedException(Throwable cause) {
       super(cause);
    }
+
+   public ActiveMQInterruptedException(String message) {
+      super(message);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
index c23e9bb..08f51ae 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterceptorRejectedPacketException;
 import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageException;
 import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageInterruptedException;
 import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
@@ -230,4 +231,7 @@ public interface ActiveMQClientMessageBundle {
 
    @Message(id = 119061, value = "Cannot send a packet while channel is failing over.")
    IllegalStateException cannotSendPacketDuringFailover();
+
+   @Message(id = 119062, value = "Multi-packet transmission (e.g. Large Messages) interrupted because of a reconnection.")
+   ActiveMQInterruptedException packetTransmissionInterrupted();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
index 982ce29..32ada4f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
@@ -171,7 +171,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
       static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();
 
       @Override
-      public void acquireCredits(int credits) throws InterruptedException {
+      public void acquireCredits(int credits)  {
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
index 443d7e5..a97df92 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
@@ -21,7 +21,7 @@ import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 
 public interface ClientProducerCredits {
 
-   void acquireCredits(int credits) throws InterruptedException, ActiveMQException;
+   void acquireCredits(int credits) throws ActiveMQException;
 
    void receiveCredits(int credits);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
index f7cf98f..70fda67 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.client.impl;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
@@ -75,7 +76,7 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits {
    }
 
    @Override
-   public void acquireCredits(final int credits) throws InterruptedException, ActiveMQException {
+   public void acquireCredits(final int credits) throws ActiveMQException {
       checkCredits(credits);
 
       boolean tryAcquire;
@@ -94,6 +95,10 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits {
                   ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);
                }
             }
+            catch (InterruptedException interrupted) {
+               Thread.currentThread().interrupt();
+               throw new ActiveMQInterruptedException(interrupted);
+            }
             finally {
                this.blocked = false;
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index b963aac..b4aa196 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -23,14 +23,12 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
 import org.apache.activemq.artemis.core.message.BodyEncoder;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 import org.apache.activemq.artemis.utils.DeflaterReader;
 import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
@@ -286,20 +284,15 @@ public class ClientProducerImpl implements ClientProducerInternal {
                                    final boolean sendBlocking,
                                    final ClientProducerCredits theCredits,
                                    final SendAcknowledgementHandler handler) throws ActiveMQException {
-      try {
-         // This will block if credits are not available
+      // This will block if credits are not available
 
-         // Note, that for a large message, the encode size only includes the properties + headers
-         // Not the continuations, but this is ok since we are only interested in limiting the amount of
-         // data in *memory* and continuations go straight to the disk
+      // Note, that for a large message, the encode size only includes the properties + headers
+      // Not the continuations, but this is ok since we are only interested in limiting the amount of
+      // data in *memory* and continuations go straight to the disk
 
-         int creditSize = sessionContext.getCreditsOnSendingFull(msgI);
+      int creditSize = sessionContext.getCreditsOnSendingFull(msgI);
 
-         theCredits.acquireCredits(creditSize);
-      }
-      catch (InterruptedException e) {
-         throw new ActiveMQInterruptedException(e);
-      }
+      theCredits.acquireCredits(creditSize);
 
       sessionContext.sendFullMessage(msgI, sendBlocking, handler, address);
    }
@@ -352,12 +345,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
       // On the case of large messages we tried to send credits before but we would starve otherwise
       // we may find a way to improve the logic and always acquire the credits before
       // but that's the way it's been tested and been working ATM
-      try {
-         credits.acquireCredits(creditsUsed);
-      }
-      catch (InterruptedException e) {
-         throw new ActiveMQInterruptedException(e);
-      }
+      credits.acquireCredits(creditsUsed);
    }
 
    /**
@@ -379,6 +367,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
 
       final long bodySize = context.getLargeBodySize();
 
+      final int reconnectID = sessionContext.getReconnectID();
+
       context.open();
       try {
 
@@ -396,14 +386,9 @@ public class ClientProducerImpl implements ClientProducerInternal {
             lastChunk = pos >= bodySize;
             SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
 
-            int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
+            int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), reconnectID, messageHandler);
 
-            try {
-               credits.acquireCredits(creditsUsed);
-            }
-            catch (InterruptedException e) {
-               throw new ActiveMQInterruptedException(e);
-            }
+            credits.acquireCredits(creditsUsed);
          }
       }
       finally {
@@ -457,6 +442,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
 
       boolean headerSent = false;
 
+
+      int reconnectID = sessionContext.getReconnectID();
       while (!lastPacket) {
          byte[] buff = new byte[minLargeMessageSize];
 
@@ -485,8 +472,6 @@ public class ClientProducerImpl implements ClientProducerInternal {
 
          totalSize += pos;
 
-         final SessionSendContinuationMessage chunk;
-
          if (lastPacket) {
             if (!session.isCompressLargeMessages()) {
                messageSize.set(totalSize);
@@ -514,13 +499,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
                   headerSent = true;
                   sendInitialLargeMessageHeader(msgI, credits);
                }
-               int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, handler);
-               try {
-                  credits.acquireCredits(creditsSent);
-               }
-               catch (InterruptedException e) {
-                  throw new ActiveMQInterruptedException(e);
-               }
+               int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler);
+               credits.acquireCredits(creditsSent);
             }
          }
          else {
@@ -529,13 +509,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
                sendInitialLargeMessageHeader(msgI, credits);
             }
 
-            int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, handler);
-            try {
-               credits.acquireCredits(creditsSent);
-            }
-            catch (InterruptedException e) {
-               throw new ActiveMQInterruptedException(e);
-            }
+            int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, handler);
+            credits.acquireCredits(creditsSent);
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index fb5b687..289181e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
+import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageInterruptedException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
@@ -336,7 +337,19 @@ public class LargeMessageControllerImpl implements LargeMessageController {
       // once the exception is set, the controller is pretty much useless
       if (handledException != null) {
          if (handledException instanceof ActiveMQException) {
-            throw (ActiveMQException) handledException;
+            ActiveMQException nestedException;
+
+            // This is just to be user friendly and give the user a proper exception trace,
+            // instead to just where it was canceled.
+            if (handledException instanceof ActiveMQLargeMessageInterruptedException) {
+               nestedException = new ActiveMQLargeMessageInterruptedException(handledException.getMessage());
+            }
+            else {
+               nestedException = new ActiveMQException(((ActiveMQException) handledException).getType(), handledException.getMessage());
+            }
+            nestedException.initCause(handledException);
+
+            throw nestedException;
          }
          else {
             throw new ActiveMQException(ActiveMQExceptionType.LARGE_MESSAGE_ERROR_BODY, "Error on saving LargeMessageBufferImpl", handledException);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
index fba1a1c..4c59174 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
@@ -40,6 +40,13 @@ public interface Channel {
    long getID();
 
    /**
+    * This number increases every time the channel reconnects succesfully.
+    * This is used to guarantee the integrity of the channel on sequential commands such as large messages.
+    * @return
+    */
+   int getReconnectID();
+
+   /**
     * For protocol check
     */
    boolean supports(byte packetID);
@@ -54,6 +61,15 @@ public interface Channel {
    boolean send(Packet packet);
 
    /**
+    * Sends a packet on this channel.
+    *
+    * @param packet the packet to send
+    * @return false if the packet was rejected by an outgoing interceptor; true if the send was
+    * successful
+    */
+   boolean send(Packet packet, final int reconnectID);
+
+   /**
     * Sends a packet on this channel using batching algorithm if appropriate
     *
     * @param packet the packet to send
@@ -83,6 +99,17 @@ public interface Channel {
    Packet sendBlocking(Packet packet, byte expectedPacket) throws ActiveMQException;
 
    /**
+    * Sends a packet on this channel and then blocks until a response is received or a timeout
+    * occurs.
+    *
+    * @param packet         the packet to send
+    * @param expectedPacket the packet being expected.
+    * @return the response
+    * @throws ActiveMQException if an error occurs during the send
+    */
+   Packet sendBlocking(Packet packet, int reconnectID, byte expectedPacket) throws ActiveMQException;
+
+   /**
     * Sets the {@link org.apache.activemq.artemis.core.protocol.core.ChannelHandler} that this channel should
     * forward received packets to.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index f723802..ff61c21 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -154,6 +154,10 @@ public class ActiveMQSessionContext extends SessionContext {
       }
    }
 
+   public int getReconnectID() {
+      return sessionChannel.getReconnectID();
+   }
+
    private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() {
       @Override
       public void commandConfirmed(final Packet packet) {
@@ -413,16 +417,17 @@ public class ActiveMQSessionContext extends SessionContext {
                                     boolean sendBlocking,
                                     boolean lastChunk,
                                     byte[] chunk,
+                                    int reconnectID,
                                     SendAcknowledgementHandler messageHandler) throws ActiveMQException {
       final boolean requiresResponse = lastChunk && sendBlocking;
       final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
 
       if (requiresResponse) {
          // When sending it blocking, only the last chunk will be blocking.
-         sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
+         sessionChannel.sendBlocking(chunkPacket, reconnectID, PacketImpl.NULL_RESPONSE);
       }
       else {
-         sessionChannel.send(chunkPacket);
+         sessionChannel.send(chunkPacket, reconnectID);
       }
 
       return chunkPacket.getPacketSize();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 4ef6104..a7cb659 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -83,6 +83,9 @@ public final class ChannelImpl implements Channel {
 
    private volatile long id;
 
+   /** This is used in */
+   private final AtomicInteger reconnectID = new AtomicInteger(0);
+
    private ChannelHandler handler;
 
    private Packet response;
@@ -139,6 +142,10 @@ public final class ChannelImpl implements Channel {
       this.interceptors = interceptors;
    }
 
+   public int getReconnectID() {
+      return reconnectID.get();
+   }
+
    @Override
    public boolean supports(final byte packetType) {
       int version = connection.getClientVersion();
@@ -202,17 +209,21 @@ public final class ChannelImpl implements Channel {
 
    @Override
    public boolean sendAndFlush(final Packet packet) {
-      return send(packet, true, false);
+      return send(packet, -1, true, false);
    }
 
    @Override
    public boolean send(final Packet packet) {
-      return send(packet, false, false);
+      return send(packet, -1, false, false);
+   }
+
+   public boolean send(Packet packet, final int reconnectID) {
+      return send(packet, reconnectID, false, false);
    }
 
    @Override
    public boolean sendBatched(final Packet packet) {
-      return send(packet, false, true);
+      return send(packet, -1, false, true);
    }
 
    @Override
@@ -221,7 +232,7 @@ public final class ChannelImpl implements Channel {
    }
 
    // This must never called by more than one thread concurrently
-   public boolean send(final Packet packet, final boolean flush, final boolean batch) {
+   private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
       if (invokeInterceptors(packet, interceptors, connection) != null) {
          return false;
       }
@@ -271,6 +282,8 @@ public final class ChannelImpl implements Channel {
             ActiveMQClientLogger.LOGGER.trace("Writing buffer for channelID=" + id);
          }
 
+         checkReconnectID(reconnectID);
+
          // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
          // buffer is full, preventing any incoming buffers being handled and blocking failover
          connection.getTransportConnection().write(buffer, flush, batch);
@@ -279,13 +292,24 @@ public final class ChannelImpl implements Channel {
       }
    }
 
+   private void checkReconnectID(int reconnectID) {
+      if (reconnectID >= 0 && reconnectID != this.reconnectID.get()) {
+         throw ActiveMQClientMessageBundle.BUNDLE.packetTransmissionInterrupted();
+      }
+   }
+
+   @Override
+   public Packet sendBlocking(final Packet packet, byte expectedPacket) throws ActiveMQException {
+      return sendBlocking(packet, -1, expectedPacket);
+   }
+
    /**
     * Due to networking issues or server issues the server may take longer to answer than expected.. the client may timeout the call throwing an exception
     * and the client could eventually retry another call, but the server could then answer a previous command issuing a class-cast-exception.
     * The expectedPacket will be used to filter out undesirable packets that would belong to previous calls.
     */
    @Override
-   public Packet sendBlocking(final Packet packet, byte expectedPacket) throws ActiveMQException {
+   public Packet sendBlocking(final Packet packet, final int reconnectID, byte expectedPacket) throws ActiveMQException {
       String interceptionResult = invokeInterceptors(packet, interceptors, connection);
 
       if (interceptionResult != null) {
@@ -335,6 +359,8 @@ public final class ChannelImpl implements Channel {
                addResendPacket(packet);
             }
 
+            checkReconnectID(reconnectID);
+
             connection.getTransportConnection().write(buffer, false, false);
 
             long toWait = connection.getBlockingCallTimeout();
@@ -492,6 +518,8 @@ public final class ChannelImpl implements Channel {
    public void lock() {
       lock.lock();
 
+      reconnectID.incrementAndGet();
+
       failingOver = true;
 
       lock.unlock();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index f766e48..774dbfe 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -61,6 +61,8 @@ public abstract class SessionContext {
 
    public abstract void resetName(String name);
 
+   public abstract int getReconnectID();
+
    /**
     * it will eather reattach or reconnect, preferably reattaching it.
     *
@@ -145,6 +147,7 @@ public abstract class SessionContext {
                                              boolean sendBlocking,
                                              boolean lastChunk,
                                              byte[] chunk,
+                                             int reconnectID,
                                              SendAcknowledgementHandler messageHandler) throws ActiveMQException;
 
    public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
index 19e5b67..e8122d0 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
@@ -625,6 +625,10 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
       }
    }
 
+   public ClientSessionFactory getSessionFactory() {
+      return sessionFactory;
+   }
+
    // Private --------------------------------------------------------------------------------------
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index 9c6c497..0f33c04 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -35,6 +35,7 @@ import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -500,6 +501,11 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
             clientProducer.send(address, coreMessage);
          }
       }
+      catch (ActiveMQInterruptedException e) {
+         JMSException jmsException = new JMSException(e.getMessage());
+         jmsException.initCause(e);
+         throw jmsException;
+      }
       catch (ActiveMQException e) {
          throw JMSExceptionHelper.convertFromActiveMQException(e);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOverReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOverReplicationTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOverReplicationTest.java
new file mode 100644
index 0000000..5f4d2e4
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOverReplicationTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
+import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.qpid.transport.util.Logger;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class LargeMessageOverReplicationTest extends ActiveMQTestBase {
+
+   public static int messageChunkCount = 0;
+
+   private static final ReusableLatch ruleFired = new ReusableLatch(1);
+   private static ActiveMQServer backupServer;
+   private static ActiveMQServer liveServer;
+
+
+   ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=10000&HA=true&retryInterval=100&reconnectAttempts=-1&producerWindowSize=10000");
+   ActiveMQConnection connection;
+   Session session;
+   Queue queue;
+   MessageProducer producer;
+
+
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      ruleFired.setCount(1);
+      messageChunkCount = 0;
+
+      TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
+      TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
+      TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
+      TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);
+
+      Configuration backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true));
+
+      Configuration liveConfig = createDefaultInVMConfig();
+
+      ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor);
+
+      liveServer = createServer(liveConfig);
+      liveServer.getConfiguration().addQueueConfiguration(new CoreQueueConfiguration().setName("jms.queue.Queue").setAddress("jms.queue.Queue"));
+      liveServer.start();
+
+      waitForServerToStart(liveServer);
+
+      backupServer = createServer(backupConfig);
+      backupServer.start();
+
+      waitForServerToStart(backupServer);
+
+
+      // Just to make sure the expression worked
+      Assert.assertEquals(10000, factory.getMinLargeMessageSize());
+      Assert.assertEquals(10000, factory.getProducerWindowSize());
+      Assert.assertEquals(100, factory.getRetryInterval());
+      Assert.assertEquals(-1, factory.getReconnectAttempts());
+      Assert.assertTrue(factory.isHA());
+
+      connection = (ActiveMQConnection) factory.createConnection();
+
+      waitForRemoteBackup(connection.getSessionFactory(), 30);
+
+      session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      queue = session.createQueue("jms.queue.Queue");
+      producer = session.createProducer(queue);
+
+   }
+
+   @After
+   public void stopServers() throws Exception {
+      if (connection != null) {
+         try {
+            connection.close();
+         }
+         catch (Exception e) {
+         }
+      }
+      if (backupServer != null) {
+         backupServer.stop();
+         backupServer = null;
+      }
+
+      if (liveServer != null) {
+         liveServer.stop();
+         liveServer = null;
+      }
+
+      backupServer = liveServer = null;
+   }
+
+    /*
+   * simple test to induce a potential race condition where the server's acceptors are active, but the server's
+   * state != STARTED
+   */
+   @Test
+   @BMRules(
+      rules = {@BMRule(
+         name = "InterruptSending",
+         targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext",
+         targetMethod = "sendLargeMessageChunk",
+         targetLocation = "ENTRY",
+         action = "org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOverReplicationTest.messageChunkSent();")})
+   public void testSendLargeMessage() throws Exception {
+
+      MapMessage message = createLargeMessage();
+
+      try {
+         producer.send(message);
+         Assert.fail("expected an exception");
+         //      session.commit();
+      }
+      catch (JMSException expected) {
+      }
+
+      session.rollback();
+
+      producer.send(message);
+      session.commit();
+
+      MessageConsumer consumer = session.createConsumer(queue);
+      connection.start();
+
+      MapMessage messageRec = (MapMessage) consumer.receive(5000);
+      Assert.assertNotNull(messageRec);
+
+      for (int i = 0; i < 10; i++) {
+         Assert.assertEquals(1024 * 1024, message.getBytes("test" + i).length);
+      }
+   }
+
+   @Test
+   @BMRules(
+      rules = {@BMRule(
+         name = "InterruptReceive",
+         targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.CoreSessionCallback",
+         targetMethod = "sendLargeMessageContinuation",
+         targetLocation = "ENTRY",
+         action = "org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOverReplicationTest.messageChunkReceived();")})
+   public void testReceiveLargeMessage() throws Exception {
+
+      MapMessage message = createLargeMessage();
+
+      producer.send(message);
+      session.commit();
+
+      MessageConsumer consumer = session.createConsumer(queue);
+      connection.start();
+
+      MapMessage messageRec = null;
+
+      try {
+         consumer.receive(5000);
+         Assert.fail("Expected a failure here");
+      }
+      catch (JMSException expected) {
+      }
+
+      session.rollback();
+
+      messageRec = (MapMessage) consumer.receive(5000);
+      Assert.assertNotNull(messageRec);
+      session.commit();
+
+      for (int i = 0; i < 10; i++) {
+         Assert.assertEquals(1024 * 1024, message.getBytes("test" + i).length);
+      }
+   }
+
+   public static void messageChunkReceived() {
+      messageChunkCount++;
+
+      if (messageChunkCount == 1000) {
+         final CountDownLatch latch = new CountDownLatch(1);
+         new Thread() {
+            public void run() {
+               try {
+                  latch.countDown();
+                  liveServer.stop();
+               }
+               catch (Exception e) {
+                  e.printStackTrace();
+               }
+            }
+         }.start();
+         try {
+            // just to make sure it's about to be stopped
+            // avoiding bootstrapping the thread as a delay
+            latch.await(1, TimeUnit.MINUTES);
+         }
+         catch (Throwable ignored ) {
+         }
+      }
+   }
+
+   public static void messageChunkSent() {
+      messageChunkCount++;
+
+      try {
+         if (messageChunkCount == 10) {
+            liveServer.stop(true);
+
+            System.err.println("activating");
+            if (!backupServer.waitForActivation(1, TimeUnit.MINUTES)) {
+               Logger.get(LargeMessageOverReplicationTest.class).warn("Can't failover server");
+            }
+         }
+      }
+      catch (Exception e) {
+         e.printStackTrace();
+      }
+   }
+
+   private MapMessage createLargeMessage() throws JMSException {
+      MapMessage message = session.createMapMessage();
+
+      for (int i = 0; i < 10; i++) {
+         message.setBytes("test" + i, new byte[1024 * 1024]);
+      }
+      return message;
+   }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
index 7e04b91..c379fcd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
@@ -251,6 +251,21 @@ public class BackupSyncDelay implements Interceptor {
       }
 
       @Override
+      public int getReconnectID() {
+         return 0;
+      }
+
+      @Override
+      public boolean send(Packet packet, int reconnectID) {
+         return false;
+      }
+
+      @Override
+      public Packet sendBlocking(Packet packet, int reconnectID, byte expectedPacket) throws ActiveMQException {
+         return null;
+      }
+
+      @Override
       public void replayCommands(int lastConfirmedCommandID) {
          throw new UnsupportedOperationException();
       }


[2/2] activemq-artemis git commit: This closes #419

Posted by jb...@apache.org.
This closes #419


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4ba11c8b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4ba11c8b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4ba11c8b

Branch: refs/heads/master
Commit: 4ba11c8bbc13cfb8e8da58b27326b0d264600caf
Parents: 212c168 26fe21b
Author: jbertram <jb...@apache.org>
Authored: Fri Mar 11 13:25:50 2016 -0600
Committer: jbertram <jb...@apache.org>
Committed: Fri Mar 11 13:25:50 2016 -0600

----------------------------------------------------------------------
 .../api/core/ActiveMQInterruptedException.java  |   4 +
 .../client/ActiveMQClientMessageBundle.java     |   4 +
 .../impl/ClientProducerCreditManagerImpl.java   |   2 +-
 .../core/client/impl/ClientProducerCredits.java |   2 +-
 .../client/impl/ClientProducerCreditsImpl.java  |   7 +-
 .../core/client/impl/ClientProducerImpl.java    |  59 ++---
 .../client/impl/LargeMessageControllerImpl.java |  15 +-
 .../artemis/core/protocol/core/Channel.java     |  27 ++
 .../core/impl/ActiveMQSessionContext.java       |   9 +-
 .../core/protocol/core/impl/ChannelImpl.java    |  38 ++-
 .../spi/core/remoting/SessionContext.java       |   3 +
 .../artemis/jms/client/ActiveMQConnection.java  |   4 +
 .../jms/client/ActiveMQMessageProducer.java     |   6 +
 .../LargeMessageOverReplicationTest.java        | 264 +++++++++++++++++++
 .../cluster/util/BackupSyncDelay.java           |  15 ++
 15 files changed, 406 insertions(+), 53 deletions(-)
----------------------------------------------------------------------