You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/03/11 20:26:13 UTC
[1/2] activemq-artemis git commit: ARTEMIS-437 Large Message send
should be interrupted during failover
Repository: activemq-artemis
Updated Branches:
refs/heads/master 212c16867 -> 4ba11c8bb
ARTEMIS-437 Large Message send should be interrupted during failover
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/26fe21ba
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/26fe21ba
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/26fe21ba
Branch: refs/heads/master
Commit: 26fe21baa4fed54d369b7090732081d1546b9638
Parents: 212c168
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Mar 11 13:09:07 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 11 13:13:32 2016 -0500
----------------------------------------------------------------------
.../api/core/ActiveMQInterruptedException.java | 4 +
.../client/ActiveMQClientMessageBundle.java | 4 +
.../impl/ClientProducerCreditManagerImpl.java | 2 +-
.../core/client/impl/ClientProducerCredits.java | 2 +-
.../client/impl/ClientProducerCreditsImpl.java | 7 +-
.../core/client/impl/ClientProducerImpl.java | 59 ++---
.../client/impl/LargeMessageControllerImpl.java | 15 +-
.../artemis/core/protocol/core/Channel.java | 27 ++
.../core/impl/ActiveMQSessionContext.java | 9 +-
.../core/protocol/core/impl/ChannelImpl.java | 38 ++-
.../spi/core/remoting/SessionContext.java | 3 +
.../artemis/jms/client/ActiveMQConnection.java | 4 +
.../jms/client/ActiveMQMessageProducer.java | 6 +
.../LargeMessageOverReplicationTest.java | 264 +++++++++++++++++++
.../cluster/util/BackupSyncDelay.java | 15 ++
15 files changed, 406 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInterruptedException.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInterruptedException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInterruptedException.java
index 9a3ff7f..c8fd80b 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInterruptedException.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInterruptedException.java
@@ -26,4 +26,8 @@ public final class ActiveMQInterruptedException extends RuntimeException {
public ActiveMQInterruptedException(Throwable cause) {
super(cause);
}
+
+ public ActiveMQInterruptedException(String message) {
+ super(message);
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
index c23e9bb..08f51ae 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQInterceptorRejectedPacketException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageException;
import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageInterruptedException;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
@@ -230,4 +231,7 @@ public interface ActiveMQClientMessageBundle {
@Message(id = 119061, value = "Cannot send a packet while channel is failing over.")
IllegalStateException cannotSendPacketDuringFailover();
+
+ @Message(id = 119062, value = "Multi-packet transmission (e.g. Large Messages) interrupted because of a reconnection.")
+ ActiveMQInterruptedException packetTransmissionInterrupted();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
index 982ce29..32ada4f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
@@ -171,7 +171,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();
@Override
- public void acquireCredits(int credits) throws InterruptedException {
+ public void acquireCredits(int credits) {
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
index 443d7e5..a97df92 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
@@ -21,7 +21,7 @@ import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
public interface ClientProducerCredits {
- void acquireCredits(int credits) throws InterruptedException, ActiveMQException;
+ void acquireCredits(int credits) throws ActiveMQException;
void receiveCredits(int credits);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
index f7cf98f..70fda67 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.client.impl;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
@@ -75,7 +76,7 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits {
}
@Override
- public void acquireCredits(final int credits) throws InterruptedException, ActiveMQException {
+ public void acquireCredits(final int credits) throws ActiveMQException {
checkCredits(credits);
boolean tryAcquire;
@@ -94,6 +95,10 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits {
ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);
}
}
+ catch (InterruptedException interrupted) {
+ Thread.currentThread().interrupt();
+ throw new ActiveMQInterruptedException(interrupted);
+ }
finally {
this.blocked = false;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index b963aac..b4aa196 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -23,14 +23,12 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.message.BodyEncoder;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.utils.DeflaterReader;
import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
@@ -286,20 +284,15 @@ public class ClientProducerImpl implements ClientProducerInternal {
final boolean sendBlocking,
final ClientProducerCredits theCredits,
final SendAcknowledgementHandler handler) throws ActiveMQException {
- try {
- // This will block if credits are not available
+ // This will block if credits are not available
- // Note, that for a large message, the encode size only includes the properties + headers
- // Not the continuations, but this is ok since we are only interested in limiting the amount of
- // data in *memory* and continuations go straight to the disk
+ // Note, that for a large message, the encode size only includes the properties + headers
+ // Not the continuations, but this is ok since we are only interested in limiting the amount of
+ // data in *memory* and continuations go straight to the disk
- int creditSize = sessionContext.getCreditsOnSendingFull(msgI);
+ int creditSize = sessionContext.getCreditsOnSendingFull(msgI);
- theCredits.acquireCredits(creditSize);
- }
- catch (InterruptedException e) {
- throw new ActiveMQInterruptedException(e);
- }
+ theCredits.acquireCredits(creditSize);
sessionContext.sendFullMessage(msgI, sendBlocking, handler, address);
}
@@ -352,12 +345,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
// On the case of large messages we tried to send credits before but we would starve otherwise
// we may find a way to improve the logic and always acquire the credits before
// but that's the way it's been tested and been working ATM
- try {
- credits.acquireCredits(creditsUsed);
- }
- catch (InterruptedException e) {
- throw new ActiveMQInterruptedException(e);
- }
+ credits.acquireCredits(creditsUsed);
}
/**
@@ -379,6 +367,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
final long bodySize = context.getLargeBodySize();
+ final int reconnectID = sessionContext.getReconnectID();
+
context.open();
try {
@@ -396,14 +386,9 @@ public class ClientProducerImpl implements ClientProducerInternal {
lastChunk = pos >= bodySize;
SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
- int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
+ int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), reconnectID, messageHandler);
- try {
- credits.acquireCredits(creditsUsed);
- }
- catch (InterruptedException e) {
- throw new ActiveMQInterruptedException(e);
- }
+ credits.acquireCredits(creditsUsed);
}
}
finally {
@@ -457,6 +442,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
boolean headerSent = false;
+
+ int reconnectID = sessionContext.getReconnectID();
while (!lastPacket) {
byte[] buff = new byte[minLargeMessageSize];
@@ -485,8 +472,6 @@ public class ClientProducerImpl implements ClientProducerInternal {
totalSize += pos;
- final SessionSendContinuationMessage chunk;
-
if (lastPacket) {
if (!session.isCompressLargeMessages()) {
messageSize.set(totalSize);
@@ -514,13 +499,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
headerSent = true;
sendInitialLargeMessageHeader(msgI, credits);
}
- int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, handler);
- try {
- credits.acquireCredits(creditsSent);
- }
- catch (InterruptedException e) {
- throw new ActiveMQInterruptedException(e);
- }
+ int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler);
+ credits.acquireCredits(creditsSent);
}
}
else {
@@ -529,13 +509,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
sendInitialLargeMessageHeader(msgI, credits);
}
- int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, handler);
- try {
- credits.acquireCredits(creditsSent);
- }
- catch (InterruptedException e) {
- throw new ActiveMQInterruptedException(e);
- }
+ int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, handler);
+ credits.acquireCredits(creditsSent);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index fb5b687..289181e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
+import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageInterruptedException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
@@ -336,7 +337,19 @@ public class LargeMessageControllerImpl implements LargeMessageController {
// once the exception is set, the controller is pretty much useless
if (handledException != null) {
if (handledException instanceof ActiveMQException) {
- throw (ActiveMQException) handledException;
+ ActiveMQException nestedException;
+
+ // This is just to be user friendly and give the user a proper exception trace,
+ // instead of just the point where it was canceled.
+ if (handledException instanceof ActiveMQLargeMessageInterruptedException) {
+ nestedException = new ActiveMQLargeMessageInterruptedException(handledException.getMessage());
+ }
+ else {
+ nestedException = new ActiveMQException(((ActiveMQException) handledException).getType(), handledException.getMessage());
+ }
+ nestedException.initCause(handledException);
+
+ throw nestedException;
}
else {
throw new ActiveMQException(ActiveMQExceptionType.LARGE_MESSAGE_ERROR_BODY, "Error on saving LargeMessageBufferImpl", handledException);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
index fba1a1c..4c59174 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
@@ -40,6 +40,13 @@ public interface Channel {
long getID();
/**
+ * This number increases every time the channel reconnects successfully.
+ * This is used to guarantee the integrity of the channel on sequential commands such as large messages.
+ * @return the current reconnect ID of this channel
+ */
+ int getReconnectID();
+
+ /**
* For protocol check
*/
boolean supports(byte packetID);
@@ -54,6 +61,15 @@ public interface Channel {
boolean send(Packet packet);
/**
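+ * Sends a packet on this channel; the send is aborted if {@code reconnectID} is non-negative and no longer matches {@link #getReconnectID()}.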
+ *
+ * @param packet the packet to send
+ * @return false if the packet was rejected by an outgoing interceptor; true if the send was
+ * successful
+ */
+ boolean send(Packet packet, final int reconnectID);
+
+ /**
* Sends a packet on this channel using batching algorithm if appropriate
*
* @param packet the packet to send
@@ -83,6 +99,17 @@ public interface Channel {
Packet sendBlocking(Packet packet, byte expectedPacket) throws ActiveMQException;
/**
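+ * Sends a packet on this channel and then blocks until a response is received or a timeout
+ * occurs; the send is aborted if a non-negative {@code reconnectID} no longer matches {@link #getReconnectID()}.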
+ *
+ * @param packet the packet to send
+ * @param expectedPacket the packet being expected.
+ * @return the response
+ * @throws ActiveMQException if an error occurs during the send
+ */
+ Packet sendBlocking(Packet packet, int reconnectID, byte expectedPacket) throws ActiveMQException;
+
+ /**
* Sets the {@link org.apache.activemq.artemis.core.protocol.core.ChannelHandler} that this channel should
* forward received packets to.
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index f723802..ff61c21 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -154,6 +154,10 @@ public class ActiveMQSessionContext extends SessionContext {
}
}
+ public int getReconnectID() {
+ return sessionChannel.getReconnectID();
+ }
+
private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() {
@Override
public void commandConfirmed(final Packet packet) {
@@ -413,16 +417,17 @@ public class ActiveMQSessionContext extends SessionContext {
boolean sendBlocking,
boolean lastChunk,
byte[] chunk,
+ int reconnectID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
final boolean requiresResponse = lastChunk && sendBlocking;
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
if (requiresResponse) {
// When sending it blocking, only the last chunk will be blocking.
- sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
+ sessionChannel.sendBlocking(chunkPacket, reconnectID, PacketImpl.NULL_RESPONSE);
}
else {
- sessionChannel.send(chunkPacket);
+ sessionChannel.send(chunkPacket, reconnectID);
}
return chunkPacket.getPacketSize();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 4ef6104..a7cb659 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -83,6 +83,9 @@ public final class ChannelImpl implements Channel {
private volatile long id;
+ /** This is used in {@link #checkReconnectID(int)} to reject sends whose reconnect ID no longer matches the channel's. */
+ private final AtomicInteger reconnectID = new AtomicInteger(0);
+
private ChannelHandler handler;
private Packet response;
@@ -139,6 +142,10 @@ public final class ChannelImpl implements Channel {
this.interceptors = interceptors;
}
+ public int getReconnectID() {
+ return reconnectID.get();
+ }
+
@Override
public boolean supports(final byte packetType) {
int version = connection.getClientVersion();
@@ -202,17 +209,21 @@ public final class ChannelImpl implements Channel {
@Override
public boolean sendAndFlush(final Packet packet) {
- return send(packet, true, false);
+ return send(packet, -1, true, false);
}
@Override
public boolean send(final Packet packet) {
- return send(packet, false, false);
+ return send(packet, -1, false, false);
+ }
+
+ public boolean send(Packet packet, final int reconnectID) {
+ return send(packet, reconnectID, false, false);
}
@Override
public boolean sendBatched(final Packet packet) {
- return send(packet, false, true);
+ return send(packet, -1, false, true);
}
@Override
@@ -221,7 +232,7 @@ public final class ChannelImpl implements Channel {
}
// This must never called by more than one thread concurrently
- public boolean send(final Packet packet, final boolean flush, final boolean batch) {
+ private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
return false;
}
@@ -271,6 +282,8 @@ public final class ChannelImpl implements Channel {
ActiveMQClientLogger.LOGGER.trace("Writing buffer for channelID=" + id);
}
+ checkReconnectID(reconnectID);
+
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover
connection.getTransportConnection().write(buffer, flush, batch);
@@ -279,13 +292,24 @@ public final class ChannelImpl implements Channel {
}
}
+ private void checkReconnectID(int reconnectID) {
+ if (reconnectID >= 0 && reconnectID != this.reconnectID.get()) {
+ throw ActiveMQClientMessageBundle.BUNDLE.packetTransmissionInterrupted();
+ }
+ }
+
+ @Override
+ public Packet sendBlocking(final Packet packet, byte expectedPacket) throws ActiveMQException {
+ return sendBlocking(packet, -1, expectedPacket);
+ }
+
/**
* Due to networking issues or server issues the server may take longer to answer than expected.. the client may timeout the call throwing an exception
* and the client could eventually retry another call, but the server could then answer a previous command issuing a class-cast-exception.
* The expectedPacket will be used to filter out undesirable packets that would belong to previous calls.
*/
@Override
- public Packet sendBlocking(final Packet packet, byte expectedPacket) throws ActiveMQException {
+ public Packet sendBlocking(final Packet packet, final int reconnectID, byte expectedPacket) throws ActiveMQException {
String interceptionResult = invokeInterceptors(packet, interceptors, connection);
if (interceptionResult != null) {
@@ -335,6 +359,8 @@ public final class ChannelImpl implements Channel {
addResendPacket(packet);
}
+ checkReconnectID(reconnectID);
+
connection.getTransportConnection().write(buffer, false, false);
long toWait = connection.getBlockingCallTimeout();
@@ -492,6 +518,8 @@ public final class ChannelImpl implements Channel {
public void lock() {
lock.lock();
+ reconnectID.incrementAndGet();
+
failingOver = true;
lock.unlock();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index f766e48..774dbfe 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -61,6 +61,8 @@ public abstract class SessionContext {
public abstract void resetName(String name);
+ public abstract int getReconnectID();
+
/**
* it will eather reattach or reconnect, preferably reattaching it.
*
@@ -145,6 +147,7 @@ public abstract class SessionContext {
boolean sendBlocking,
boolean lastChunk,
byte[] chunk,
+ int reconnectID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException;
public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
index 19e5b67..e8122d0 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
@@ -625,6 +625,10 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
}
}
+ public ClientSessionFactory getSessionFactory() {
+ return sessionFactory;
+ }
+
// Private --------------------------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index 9c6c497..0f33c04 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -35,6 +35,7 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -500,6 +501,11 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
clientProducer.send(address, coreMessage);
}
}
+ catch (ActiveMQInterruptedException e) {
+ JMSException jmsException = new JMSException(e.getMessage());
+ jmsException.initCause(e);
+ throw jmsException;
+ }
catch (ActiveMQException e) {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOverReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOverReplicationTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOverReplicationTest.java
new file mode 100644
index 0000000..5f4d2e4
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOverReplicationTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
+import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.qpid.transport.util.Logger;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * Verifies that sending or receiving a large message is interrupted with a {@link JMSException}
+ * when the live server of a replicated live/backup pair is stopped in the middle of the transfer,
+ * and that the same operation succeeds when it is retried after a rollback.
+ * <p>
+ * The server stop is triggered by Byteman rules that call back into the static
+ * {@link #messageChunkSent()} and {@link #messageChunkReceived()} hooks of this class.
+ */
+@RunWith(BMUnitRunner.class)
+public class LargeMessageOverReplicationTest extends ActiveMQTestBase {
+
+   /** Number of chunks seen so far by the Byteman hooks; reset to zero before each test. */
+   public static int messageChunkCount = 0;
+
+   /** Number of byte-array entries stored in the test message. */
+   private static final int MAP_ENTRIES = 10;
+
+   /** Size in bytes of every entry stored in the test message. */
+   private static final int ENTRY_SIZE = 1024 * 1024;
+
+   /** Chunk count at which {@link #messageChunkSent()} stops the live server. */
+   private static final int SENT_CHUNKS_BEFORE_STOP = 10;
+
+   /** Chunk count at which {@link #messageChunkReceived()} stops the live server. */
+   private static final int RECEIVED_CHUNKS_BEFORE_STOP = 1000;
+
+   private static final ReusableLatch ruleFired = new ReusableLatch(1);
+   private static ActiveMQServer backupServer;
+   private static ActiveMQServer liveServer;
+
+   ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=10000&HA=true&retryInterval=100&reconnectAttempts=-1&producerWindowSize=10000");
+   ActiveMQConnection connection;
+   Session session;
+   Queue queue;
+   MessageProducer producer;
+
+   /**
+    * Starts a replicated live/backup server pair, then opens a transacted JMS session with a
+    * producer on {@code jms.queue.Queue}.
+    *
+    * @throws Exception if a server cannot be started or the JMS objects cannot be created
+    */
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+      ruleFired.setCount(1);
+      messageChunkCount = 0;
+
+      TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
+      TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
+      TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
+      TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);
+
+      Configuration backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true));
+
+      Configuration liveConfig = createDefaultInVMConfig();
+
+      ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor);
+
+      liveServer = createServer(liveConfig);
+      liveServer.getConfiguration().addQueueConfiguration(new CoreQueueConfiguration().setName("jms.queue.Queue").setAddress("jms.queue.Queue"));
+      liveServer.start();
+
+      waitForServerToStart(liveServer);
+
+      backupServer = createServer(backupConfig);
+      backupServer.start();
+
+      waitForServerToStart(backupServer);
+
+      // Just to make sure the URL parameters were parsed as expected
+      Assert.assertEquals(10000, factory.getMinLargeMessageSize());
+      Assert.assertEquals(10000, factory.getProducerWindowSize());
+      Assert.assertEquals(100, factory.getRetryInterval());
+      Assert.assertEquals(-1, factory.getReconnectAttempts());
+      Assert.assertTrue(factory.isHA());
+
+      connection = (ActiveMQConnection) factory.createConnection();
+
+      waitForRemoteBackup(connection.getSessionFactory(), 30);
+
+      session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      queue = session.createQueue("jms.queue.Queue");
+      producer = session.createProducer(queue);
+   }
+
+   /**
+    * Closes the connection and stops both servers. The live server is stopped even when stopping
+    * the backup server fails, and both static references are always cleared.
+    *
+    * @throws Exception if stopping one of the servers fails
+    */
+   @After
+   public void stopServers() throws Exception {
+      if (connection != null) {
+         try {
+            connection.close();
+         }
+         catch (Exception ignored) {
+            // best effort: the tests deliberately stop the live server, so closing may fail
+         }
+      }
+
+      try {
+         if (backupServer != null) {
+            backupServer.stop();
+         }
+      }
+      finally {
+         backupServer = null;
+         try {
+            if (liveServer != null) {
+               liveServer.stop();
+            }
+         }
+         finally {
+            liveServer = null;
+         }
+      }
+   }
+
+   /**
+    * Sends a large message while a Byteman rule on
+    * {@code ActiveMQSessionContext.sendLargeMessageChunk} calls {@link #messageChunkSent()}, which
+    * stops the live server part-way through. The first send must fail with a
+    * {@link JMSException}; after a rollback the message is sent again, committed and consumed.
+    */
+   @Test
+   @BMRules(
+      rules = {@BMRule(
+         name = "InterruptSending",
+         targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext",
+         targetMethod = "sendLargeMessageChunk",
+         targetLocation = "ENTRY",
+         action = "org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOverReplicationTest.messageChunkSent();")})
+   public void testSendLargeMessage() throws Exception {
+
+      MapMessage message = createLargeMessage();
+
+      try {
+         producer.send(message);
+         Assert.fail("expected an exception");
+      }
+      catch (JMSException expected) {
+      }
+
+      session.rollback();
+
+      producer.send(message);
+      session.commit();
+
+      MessageConsumer consumer = session.createConsumer(queue);
+      connection.start();
+
+      MapMessage messageRec = (MapMessage) consumer.receive(5000);
+      Assert.assertNotNull(messageRec);
+
+      // verify what was actually delivered, not the locally created message
+      assertLargeMessageBody(messageRec);
+   }
+
+   /**
+    * Receives a large message while a Byteman rule on
+    * {@code CoreSessionCallback.sendLargeMessageContinuation} calls
+    * {@link #messageChunkReceived()}, which stops the live server part-way through. The first
+    * receive must fail with a {@link JMSException}; after a rollback the message is received
+    * again and its body is verified.
+    */
+   @Test
+   @BMRules(
+      rules = {@BMRule(
+         name = "InterruptReceive",
+         targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.CoreSessionCallback",
+         targetMethod = "sendLargeMessageContinuation",
+         targetLocation = "ENTRY",
+         action = "org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOverReplicationTest.messageChunkReceived();")})
+   public void testReceiveLargeMessage() throws Exception {
+
+      MapMessage message = createLargeMessage();
+
+      producer.send(message);
+      session.commit();
+
+      MessageConsumer consumer = session.createConsumer(queue);
+      connection.start();
+
+      try {
+         consumer.receive(5000);
+         Assert.fail("Expected a failure here");
+      }
+      catch (JMSException expected) {
+      }
+
+      session.rollback();
+
+      MapMessage messageRec = (MapMessage) consumer.receive(5000);
+      Assert.assertNotNull(messageRec);
+
+      // verify what was actually delivered, not the locally created message
+      assertLargeMessageBody(messageRec);
+
+      session.commit();
+   }
+
+   /**
+    * Byteman hook for {@link #testReceiveLargeMessage()}: counts chunks and, on chunk number
+    * {@value #RECEIVED_CHUNKS_BEFORE_STOP}, stops the live server from a separate thread.
+    */
+   public static void messageChunkReceived() {
+      messageChunkCount++;
+
+      if (messageChunkCount == RECEIVED_CHUNKS_BEFORE_STOP) {
+         final CountDownLatch stopperStarted = new CountDownLatch(1);
+         new Thread() {
+            @Override
+            public void run() {
+               try {
+                  stopperStarted.countDown();
+                  liveServer.stop();
+               }
+               catch (Exception e) {
+                  e.printStackTrace();
+               }
+            }
+         }.start();
+         try {
+            // just to make sure the server is about to be stopped,
+            // so that bootstrapping the thread does not act as a delay
+            stopperStarted.await(1, TimeUnit.MINUTES);
+         }
+         catch (InterruptedException e) {
+            // keep the interrupt visible to the caller instead of silently dropping it
+            Thread.currentThread().interrupt();
+         }
+      }
+   }
+
+   /**
+    * Byteman hook for {@link #testSendLargeMessage()}: counts chunks and, on chunk number
+    * {@value #SENT_CHUNKS_BEFORE_STOP}, stops the live server and waits up to one minute for the
+    * backup server to activate, logging a warning if it does not.
+    */
+   public static void messageChunkSent() {
+      messageChunkCount++;
+
+      try {
+         if (messageChunkCount == SENT_CHUNKS_BEFORE_STOP) {
+            // NOTE(review): the 'true' argument presumably requests failover on shutdown - confirm
+            liveServer.stop(true);
+
+            System.err.println("activating");
+            if (!backupServer.waitForActivation(1, TimeUnit.MINUTES)) {
+               Logger.get(LargeMessageOverReplicationTest.class).warn("Can't failover server");
+            }
+         }
+      }
+      catch (Exception e) {
+         e.printStackTrace();
+      }
+   }
+
+   /**
+    * Creates a map message holding {@value #MAP_ENTRIES} byte arrays named {@code test0},
+    * {@code test1}, ... each of {@link #ENTRY_SIZE} bytes.
+    *
+    * @return the populated message
+    * @throws JMSException if the message cannot be created or populated
+    */
+   private MapMessage createLargeMessage() throws JMSException {
+      MapMessage message = session.createMapMessage();
+
+      for (int i = 0; i < MAP_ENTRIES; i++) {
+         message.setBytes("test" + i, new byte[ENTRY_SIZE]);
+      }
+      return message;
+   }
+
+   /**
+    * Asserts that the given message carries every entry written by {@link #createLargeMessage()}
+    * with the expected length.
+    *
+    * @param received the message to check
+    * @throws JMSException if an entry cannot be read
+    */
+   private static void assertLargeMessageBody(MapMessage received) throws JMSException {
+      for (int i = 0; i < MAP_ENTRIES; i++) {
+         Assert.assertEquals(ENTRY_SIZE, received.getBytes("test" + i).length);
+      }
+   }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26fe21ba/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
index 7e04b91..c379fcd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
@@ -251,6 +251,21 @@ public class BackupSyncDelay implements Interceptor {
}
@Override
+ public int getReconnectID() {
+ // This implementation always reports the constant 0 as its reconnect ID.
+ return 0;
+ }
+
+ @Override
+ public boolean send(Packet packet, int reconnectID) {
+ // No-op: both arguments are ignored and false is always returned.
+ return false;
+ }
+
+ @Override
+ public Packet sendBlocking(Packet packet, int reconnectID, byte expectedPacket) throws ActiveMQException {
+ // No-op: all arguments are ignored and null is always returned, so callers get no response packet.
+ return null;
+ }
+
+ @Override
public void replayCommands(int lastConfirmedCommandID) {
// Not supported by this implementation: every call throws.
throw new UnsupportedOperationException();
}
[2/2] activemq-artemis git commit: This closes #419
Posted by jb...@apache.org.
This closes #419
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4ba11c8b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4ba11c8b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4ba11c8b
Branch: refs/heads/master
Commit: 4ba11c8bbc13cfb8e8da58b27326b0d264600caf
Parents: 212c168 26fe21b
Author: jbertram <jb...@apache.org>
Authored: Fri Mar 11 13:25:50 2016 -0600
Committer: jbertram <jb...@apache.org>
Committed: Fri Mar 11 13:25:50 2016 -0600
----------------------------------------------------------------------
.../api/core/ActiveMQInterruptedException.java | 4 +
.../client/ActiveMQClientMessageBundle.java | 4 +
.../impl/ClientProducerCreditManagerImpl.java | 2 +-
.../core/client/impl/ClientProducerCredits.java | 2 +-
.../client/impl/ClientProducerCreditsImpl.java | 7 +-
.../core/client/impl/ClientProducerImpl.java | 59 ++---
.../client/impl/LargeMessageControllerImpl.java | 15 +-
.../artemis/core/protocol/core/Channel.java | 27 ++
.../core/impl/ActiveMQSessionContext.java | 9 +-
.../core/protocol/core/impl/ChannelImpl.java | 38 ++-
.../spi/core/remoting/SessionContext.java | 3 +
.../artemis/jms/client/ActiveMQConnection.java | 4 +
.../jms/client/ActiveMQMessageProducer.java | 6 +
.../LargeMessageOverReplicationTest.java | 264 +++++++++++++++++++
.../cluster/util/BackupSyncDelay.java | 15 ++
15 files changed, 406 insertions(+), 53 deletions(-)
----------------------------------------------------------------------