You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/01/05 15:15:10 UTC
[2/4] activemq-artemis git commit: ARTEMIS-332 - Duplicate delivery
over Bridges under OME scenarios, paging and other failures
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index b0a5a7e..ae93a97 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -415,7 +415,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
backupActivationThread.start();
}
else {
- ActiveMQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), configuration.getName(), nodeManager.getNodeId(), identity != null ? identity : "" );
+ ActiveMQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), configuration.getName(), nodeManager.getNodeId(), identity != null ? identity : "");
}
// start connector service
connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool, postOffice, serviceRegistry);
@@ -508,18 +508,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
* Stops the server in a different thread.
*/
public final void stopTheServer(final boolean criticalIOError) {
- ExecutorService executor = Executors.newSingleThreadExecutor();
- executor.submit(new Runnable() {
+ Thread thread = new Thread() {
@Override
public void run() {
try {
- stop(false, criticalIOError, false);
+ ActiveMQServerImpl.this.stop(false, criticalIOError, false);
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorStoppingServer(e);
}
}
- });
+ };
+
+ thread.start();
}
@Override
@@ -722,7 +723,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
-
pagingManager = null;
securityStore = null;
resourceManager = null;
@@ -1016,7 +1016,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (securityStore != null) {
X509Certificate[] certificates = null;
if (connection.getTransportConnection() instanceof NettyConnection) {
- certificates = CertificateUtil.getCertsFromChannel(((NettyConnection)connection.getTransportConnection()).getChannel());
+ certificates = CertificateUtil.getCertsFromChannel(((NettyConnection) connection.getTransportConnection()).getChannel());
}
securityStore.authenticate(username, password, certificates);
}
@@ -1428,7 +1428,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw ActiveMQMessageBundle.BUNDLE.bindingNotDivert(name);
}
- postOffice.removeBinding(name, null);
+ postOffice.removeBinding(name, null, true);
}
@Override
@@ -1954,11 +1954,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
boolean failedAlready = false;
@Override
- public synchronized void onIOException(Exception cause, String message, SequentialFile file) {
+ public synchronized void onIOException(Throwable cause, String message, SequentialFile file) {
if (!failedAlready) {
failedAlready = true;
- ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
+ if (file == null) {
+ ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause);
+ }
+ else {
+ ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
+ }
stopTheServer(true);
}
@@ -2021,10 +2026,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
* move any older data away and log a warning about it.
*/
void moveServerData() {
- File[] dataDirs = new File[]{configuration.getBindingsLocation(),
- configuration.getJournalLocation(),
- configuration.getPagingLocation(),
- configuration.getLargeMessagesLocation()};
+ File[] dataDirs = new File[]{configuration.getBindingsLocation(), configuration.getJournalLocation(), configuration.getPagingLocation(), configuration.getLargeMessagesLocation()};
boolean allEmpty = true;
int lowestSuffixForMovedData = 1;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 5420688..c6d5aee 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
@@ -66,7 +67,15 @@ public class LastValueQueue extends QueueImpl {
@Override
public synchronized void addTail(final MessageReference ref, final boolean direct) {
- SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+ SimpleString prop;
+
+ try {
+ prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+ }
+ catch (ActiveMQException e) {
+ criticalError(e);
+ throw new IllegalStateException(e);
+ }
if (prop != null) {
HolderReference hr = map.get(prop);
@@ -103,45 +112,59 @@ public class LastValueQueue extends QueueImpl {
@Override
public synchronized void addHead(final MessageReference ref) {
- SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+ try {
+ SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
- if (prop != null) {
- HolderReference hr = map.get(prop);
+ if (prop != null) {
+ HolderReference hr = map.get(prop);
- if (hr != null) {
- // We keep the current ref and ack the one we are returning
+ if (hr != null) {
+ // We keep the current ref and ack the one we are returning
- super.referenceHandled();
+ super.referenceHandled();
- try {
- super.acknowledge(ref);
+ try {
+ super.acknowledge(ref);
+ }
+ catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
+ }
}
- catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
+ else {
+ map.put(prop, (HolderReference) ref);
+
+ super.addHead(ref);
}
}
else {
- map.put(prop, (HolderReference) ref);
-
super.addHead(ref);
}
}
- else {
- super.addHead(ref);
+ catch (ActiveMQException e) {
+ criticalError(e);
+ throw new IllegalStateException(e);
}
}
@Override
protected void refRemoved(MessageReference ref) {
- synchronized (this) {
- SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+ try {
- if (prop != null) {
- map.remove(prop);
+ synchronized (this) {
+ SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+
+ if (prop != null) {
+ map.remove(prop);
+ }
}
+
+ super.refRemoved(ref);
+ }
+ catch (ActiveMQException e) {
+ criticalError(e);
+ throw new IllegalStateException(e);
}
- super.refRemoved(ref);
}
private class HolderReference implements MessageReference {
@@ -200,7 +223,13 @@ public class LastValueQueue extends QueueImpl {
@Override
public ServerMessage getMessage() {
- return ref.getMessage();
+ try {
+ return ref.getMessage();
+ }
+ catch (ActiveMQException e) {
+ criticalError(e);
+ throw new IllegalStateException(e);
+ }
}
@Override
@@ -256,7 +285,13 @@ public class LastValueQueue extends QueueImpl {
*/
@Override
public int getMessageMemoryEstimate() {
- return ref.getMessage().getMemoryEstimate();
+ try {
+ return ref.getMessage().getMemoryEstimate();
+ }
+ catch (ActiveMQException e) {
+ criticalError(e);
+ throw new IllegalStateException(e);
+ }
}
/* (non-Javadoc)
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 96413f7..fd04b6d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -208,7 +208,7 @@ public class MessageReferenceImpl implements MessageReference {
}
if (other instanceof MessageReferenceImpl) {
- MessageReference reference = (MessageReferenceImpl) other;
+ MessageReferenceImpl reference = (MessageReferenceImpl) other;
if (this.getMessage().equals(reference.getMessage()))
return true;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 18eb0b8..c963e4d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -893,7 +894,7 @@ public class QueueImpl implements Queue {
}
@Override
- public synchronized MessageReference getReference(final long id1) {
+ public synchronized MessageReference getReference(final long id1) throws ActiveMQException {
LinkedListIterator<MessageReference> iterator = iterator();
try {
@@ -1053,7 +1054,13 @@ public class QueueImpl implements Queue {
@Override
public void cancel(final Transaction tx, final MessageReference reference, boolean ignoreRedeliveryCheck) {
- getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference);
+ try {
+ getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference);
+ }
+ catch (ActiveMQException e) {
+ criticalError(e);
+ getPageSubscription().getPagingStore().criticalException(e);
+ }
}
@Override
@@ -1102,7 +1109,7 @@ public class QueueImpl implements Queue {
}
@Override
- public void deliverScheduledMessages() {
+ public void deliverScheduledMessages() throws ActiveMQException {
List<MessageReference> scheduledMessages = scheduledDeliveryHandler.cancel(null);
if (scheduledMessages != null && scheduledMessages.size() > 0) {
for (MessageReference ref : scheduledMessages) {
@@ -1311,7 +1318,7 @@ public class QueueImpl implements Queue {
Transaction tx = new BindingsTransactionImpl(storageManager);
try {
- postOffice.removeBinding(name, tx);
+ postOffice.removeBinding(name, tx, true);
deleteAllReferences();
@@ -1770,7 +1777,12 @@ public class QueueImpl implements Queue {
private synchronized void internalAddTail(final MessageReference ref) {
refAdded(ref);
- messageReferences.addTail(ref, ref.getMessage().getPriority());
+ try {
+ messageReferences.addTail(ref, ref.getMessage().getPriority());
+ }
+ catch (ActiveMQException e) {
+ criticalError(e);
+ }
}
/**
@@ -1781,9 +1793,18 @@ public class QueueImpl implements Queue {
* @param ref
*/
private void internalAddHead(final MessageReference ref) {
- queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
- refAdded(ref);
- messageReferences.addHead(ref, ref.getMessage().getPriority());
+ try {
+ queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
+ refAdded(ref);
+ messageReferences.addHead(ref, ref.getMessage().getPriority());
+ }
+ catch (ActiveMQException e) {
+ criticalError(e);
+ }
+ }
+
+ void criticalError(ActiveMQException e) {
+ storageManager.criticalError(e);
}
private synchronized void doInternalPoll() {
@@ -2011,14 +2032,17 @@ public class QueueImpl implements Queue {
return null;
}
else {
- // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever
- return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+ try {
+ // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever
+ return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+ }
+ catch (ActiveMQException e) {
+ criticalError(e);
+ throw new IllegalStateException(e);
+ }
}
}
- /**
- * @param ref
- */
protected void refRemoved(MessageReference ref) {
queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
if (ref.isPaged()) {
@@ -2026,9 +2050,6 @@ public class QueueImpl implements Queue {
}
}
- /**
- * @param ref
- */
protected void refAdded(final MessageReference ref) {
if (ref.isPaged()) {
pagedReferences.incrementAndGet();
@@ -2502,23 +2523,29 @@ public class QueueImpl implements Queue {
}
private boolean checkExpired(final MessageReference reference) {
- if (reference.getMessage().isExpired()) {
- if (isTrace) {
- ActiveMQServerLogger.LOGGER.trace("Reference " + reference + " is expired");
- }
- reference.handled();
+ try {
+ if (reference.getMessage().isExpired()) {
+ if (isTrace) {
+ ActiveMQServerLogger.LOGGER.trace("Reference " + reference + " is expired");
+ }
+ reference.handled();
- try {
- expire(reference);
+ try {
+ expire(reference);
+ }
+ catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.errorExpiringRef(e);
+ }
+
+ return true;
}
- catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorExpiringRef(e);
+ else {
+ return false;
}
-
- return true;
}
- else {
- return false;
+ catch (ActiveMQException e) {
+ criticalError(e);
+ throw new IllegalStateException(e);
}
}
@@ -2557,7 +2584,7 @@ public class QueueImpl implements Queue {
}
@Override
- public void postAcknowledge(final MessageReference ref) {
+ public void postAcknowledge(final MessageReference ref) throws ActiveMQException {
QueueImpl queue = (QueueImpl) ref.getQueue();
queue.decDelivering();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index d117186..92d1a61 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -16,6 +16,13 @@
*/
package org.apache.activemq.artemis.core.server.impl;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -25,12 +32,6 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
public class RefsOperation extends TransactionOperationAbstract {
private final StorageManager storageManager;
@@ -55,7 +56,7 @@ public class RefsOperation extends TransactionOperationAbstract {
ignoreRedeliveryCheck = true;
}
- synchronized void addAck(final MessageReference ref) {
+ synchronized void addAck(final MessageReference ref) throws ActiveMQException {
refsToAck.add(ref);
if (ref.isPaged()) {
if (pagedMessagesToPostACK == null) {
@@ -147,7 +148,17 @@ public class RefsOperation extends TransactionOperationAbstract {
public void afterCommit(final Transaction tx) {
for (MessageReference ref : refsToAck) {
synchronized (ref.getQueue()) {
- queue.postAcknowledge(ref);
+ try {
+ queue.postAcknowledge(ref);
+ }
+ catch (ActiveMQException e) {
+ if (queue instanceof QueueImpl) {
+ ((QueueImpl) queue).criticalError(e);
+ }
+ else {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
index f9ee1ce..6b5e2e2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -97,7 +98,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
}
@Override
- public List<MessageReference> cancel(final Filter filter) {
+ public List<MessageReference> cancel(final Filter filter) throws ActiveMQException {
List<MessageReference> refs = new ArrayList<>();
synchronized (scheduledReferences) {
@@ -115,7 +116,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
}
@Override
- public MessageReference removeReferenceWithID(final long id) {
+ public MessageReference removeReferenceWithID(final long id) throws ActiveMQException {
synchronized (scheduledReferences) {
Iterator<RefScheduled> iter = scheduledReferences.iterator();
while (iter.hasNext()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 4a8b16a..7d54d31 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -286,7 +286,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// should go back into the
// queue for delivery later.
// TCP-flow control has to be done first than everything else otherwise we may lose notifications
- if (!callback.isWritable(this) || !started || transferring ) {
+ if (!callback.isWritable(this) || !started || transferring) {
return HandleStatus.BUSY;
}
@@ -733,25 +733,63 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
}
- @Override
- public void individualAcknowledge(final Transaction tx, final long messageID) throws Exception {
+ public void individualAcknowledge(Transaction tx,
+ final long messageID) throws Exception {
if (browseOnly) {
return;
}
- MessageReference ref = removeReferenceByID(messageID);
+ boolean startedTransaction = false;
- if (ref == null) {
- ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName());
- if (tx != null) {
- tx.markAsRollbackOnly(ils);
- }
- throw ils;
+ if (tx == null) {
+ startedTransaction = true;
+ tx = new TransactionImpl(storageManager);
}
- ackReference(tx, ref);
+ try {
+
+ MessageReference ref;
+ ref = removeReferenceByID(messageID);
+
+ if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
+ ActiveMQServerLogger.LOGGER.trace("ACKing ref " + ref + " on tx= " + tx + ", consumer=" + this);
+ }
+
+ if (ref == null) {
+ ActiveMQIllegalStateException ils = new ActiveMQIllegalStateException("Cannot find ref to ack " + messageID);
+ if (tx != null) {
+ tx.markAsRollbackOnly(ils);
+ }
+ throw ils;
+ }
+
+ ackReference(tx, ref);
+
+ if (startedTransaction) {
+ tx.commit();
+ }
+ }
+ catch (ActiveMQException e) {
+ if (startedTransaction) {
+ tx.rollback();
+ }
+ else {
+ tx.markAsRollbackOnly(e);
+ }
+ throw e;
+ }
+ catch (Throwable e) {
+ ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) e);
+ ActiveMQIllegalStateException hqex = new ActiveMQIllegalStateException(e.getMessage());
+ if (startedTransaction) {
+ tx.rollback();
+ }
+ else {
+ tx.markAsRollbackOnly(hqex);
+ }
+ throw hqex;
+ }
- acks++;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index e21102c..ebe2f8a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -316,6 +316,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
+ public void markTXFailed(Throwable e) {
+ Transaction currentTX = this.tx;
+ if (currentTX != null) {
+ if (e instanceof ActiveMQException) {
+ currentTX.markAsRollbackOnly((ActiveMQException) e);
+ }
+ else {
+ ActiveMQException exception = new ActiveMQException(e.getMessage());
+ exception.initCause(e);
+ currentTX.markAsRollbackOnly(exception);
+ }
+ }
+ }
+
+ @Override
public boolean removeConsumer(final long consumerID) throws Exception {
return consumers.remove(consumerID) != null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 49dcbe8..3f726f0 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -229,7 +229,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
}
- private void validateSequence(ScheduledDeliveryHandlerImpl handler) {
+ private void validateSequence(ScheduledDeliveryHandlerImpl handler) throws Exception {
long lastSequence = -1;
for (MessageReference ref : handler.getScheduledReferences()) {
assertEquals(lastSequence + 1, ref.getMessage().getMessageID());
@@ -256,7 +256,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
handler.checkAndSchedule(refImpl, tail);
}
- private void debugList(boolean fail, ScheduledDeliveryHandlerImpl handler, long numberOfExpectedMessages) {
+ private void debugList(boolean fail, ScheduledDeliveryHandlerImpl handler, long numberOfExpectedMessages) throws Exception {
List<MessageReference> refs = handler.getScheduledReferences();
HashSet<Long> messages = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 3909c3c..6c5cfe5 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -208,6 +208,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
+ public void criticalError(Throwable error) {
+ error.printStackTrace();
+ }
+
+ @Override
public OperationContext newContext(Executor executor) {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
index 05a48e9..5fe8953 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
@@ -16,10 +16,16 @@
*/
package org.apache.activemq.artemis.tests.integration;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -37,10 +43,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
public class DuplicateDetectionTest extends ActiveMQTestBase {
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
@@ -213,6 +215,75 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
Assert.assertEquals(0, ((PostOfficeImpl) server.getPostOffice()).getDuplicateIDCaches().size());
}
+ // It is important to test shrinking the duplicate-ID cache with this rule,
+ // because the same situation can arise after a crash: we could end up with
+ // more cached IDs than the configured size if there was no time to clear the previous ones.
+ @Test
+ public void testShrinkCache() throws Exception {
+ server.stop();
+ server.getConfiguration().setIDCacheSize(150);
+ server.start();
+
+ final int TEST_SIZE = 200;
+
+ ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+ locator.setBlockOnNonDurableSend(true);
+
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.start();
+
+ final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
+ session.createQueue(queueName, queueName, null, true);
+
+ ClientProducer producer = session.createProducer(queueName);
+
+ for (int i = 0; i < TEST_SIZE; i++) {
+ ClientMessage message = session.createMessage(true);
+ message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.toSimpleString("DUPL-" + i));
+ producer.send(message);
+ }
+ session.commit();
+
+ sf.close();
+ session.close();
+ locator.close();
+
+ server.stop();
+
+ server.getConfiguration().setIDCacheSize(100);
+
+ server.start();
+
+ locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+ locator.setBlockOnNonDurableSend(true);
+ sf = createSessionFactory(locator);
+ session = sf.createSession(false, false, false);
+ session.start();
+
+ producer = session.createProducer(queueName);
+
+ // will send the last 50 again
+ for (int i = TEST_SIZE - 50; i < TEST_SIZE; i++) {
+ ClientMessage message = session.createMessage(true);
+ message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.toSimpleString("DUPL-" + i));
+ producer.send(message);
+ }
+
+ try {
+ session.commit();
+ Assert.fail("Exception expected");
+ }
+ catch (ActiveMQException expected) {
+
+ }
+
+ }
+
@Test
public void testSimpleDuplicateDetectionWithString() throws Exception {
ClientSession session = sf.createSession(false, true, true);
@@ -1240,176 +1311,6 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
}
@Test
- public void testDuplicateCachePersistedRestartWithSmallerCache() throws Exception {
- server.stop();
-
- final int initialCacheSize = 10;
- final int subsequentCacheSize = 5;
-
- config = createDefaultInVMConfig().setIDCacheSize(initialCacheSize);
-
- server = createServer(config);
-
- server.start();
-
- sf = createSessionFactory(locator);
-
- ClientSession session = sf.createSession(false, true, true);
-
- session.start();
-
- final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
-
- session.createQueue(queueName, queueName, null, false);
-
- ClientProducer producer = session.createProducer(queueName);
-
- ClientConsumer consumer = session.createConsumer(queueName);
-
- for (int i = 0; i < initialCacheSize; i++) {
- ClientMessage message = createMessage(session, i);
- SimpleString dupID = new SimpleString("abcdefg" + i);
- message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
- producer.send(message);
- ClientMessage message2 = consumer.receive(1000);
- Assert.assertEquals(i, message2.getObjectProperty(propKey));
- }
-
- session.close();
-
- sf.close();
-
- server.stop();
-
- waitForServerToStop(server);
-
- config.setIDCacheSize(subsequentCacheSize);
-
- server = createServer(config);
-
- server.start();
-
- sf = createSessionFactory(locator);
-
- session = sf.createSession(false, true, true);
-
- session.start();
-
- session.createQueue(queueName, queueName, null, false);
-
- producer = session.createProducer(queueName);
-
- consumer = session.createConsumer(queueName);
-
- for (int i = 0; i < initialCacheSize; i++) {
- ClientMessage message = createMessage(session, i);
- SimpleString dupID = new SimpleString("abcdefg" + i);
- message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
- producer.send(message);
- if (i >= subsequentCacheSize) {
- // Message should get through
- ClientMessage message2 = consumer.receive(1000);
- Assert.assertEquals(i, message2.getObjectProperty(propKey));
- }
- else {
- ClientMessage message2 = consumer.receiveImmediate();
- Assert.assertNull(message2);
- }
- }
- }
-
- @Test
- public void testDuplicateCachePersistedRestartWithSmallerCacheEnsureDeleted() throws Exception {
- server.stop();
-
- final int initialCacheSize = 10;
- final int subsequentCacheSize = 5;
-
- config = createDefaultInVMConfig().setIDCacheSize(initialCacheSize);
-
- server = createServer(config);
-
- server.start();
-
- sf = createSessionFactory(locator);
-
- ClientSession session = sf.createSession(false, true, true);
-
- session.start();
-
- final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
-
- session.createQueue(queueName, queueName, null, false);
-
- ClientProducer producer = session.createProducer(queueName);
-
- ClientConsumer consumer = session.createConsumer(queueName);
-
- for (int i = 0; i < initialCacheSize; i++) {
- ClientMessage message = createMessage(session, i);
- SimpleString dupID = new SimpleString("abcdefg" + i);
- message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
- producer.send(message);
- ClientMessage message2 = consumer.receive(1000);
- Assert.assertEquals(i, message2.getObjectProperty(propKey));
- }
-
- session.close();
-
- sf.close();
-
- server.stop();
-
- waitForServerToStop(server);
-
- config.setIDCacheSize(subsequentCacheSize);
-
- server = createServer(config);
-
- server.start();
-
- // Now stop and set back to original cache size and restart
-
- server.stop();
-
- waitForServerToStop(server);
-
- config.setIDCacheSize(initialCacheSize);
-
- server = createServer(config);
-
- server.start();
-
- sf = createSessionFactory(locator);
-
- session = sf.createSession(false, true, true);
-
- session.start();
-
- session.createQueue(queueName, queueName, null, false);
-
- producer = session.createProducer(queueName);
-
- consumer = session.createConsumer(queueName);
-
- for (int i = 0; i < initialCacheSize; i++) {
- ClientMessage message = createMessage(session, i);
- SimpleString dupID = new SimpleString("abcdefg" + i);
- message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
- producer.send(message);
- if (i >= subsequentCacheSize) {
- // Message should get through
- ClientMessage message2 = consumer.receive(1000);
- Assert.assertEquals(i, message2.getObjectProperty(propKey));
- }
- else {
- ClientMessage message2 = consumer.receiveImmediate();
- Assert.assertNull(message2);
- }
- }
- }
-
- @Test
public void testNoPersist() throws Exception {
server.stop();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 6351357..6244330 100644
--- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -257,6 +257,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
}
@Override
+ public void criticalException(Throwable e) { // no-op: the exception passed in is ignored here
+ }
+
+ @Override
public int getNumberOfPages() {
return 0;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 59d2646..5f02cf9 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -782,6 +782,10 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
static final class FakeStoreFactory implements PagingStoreFactory {
+ @Override
+ public void criticalException(Throwable e) { // no-op: FakeStoreFactory ignores the exception passed in
+ }
+
final SequentialFileFactory factory;
public FakeStoreFactory() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
index 30e302e..c47041a 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
@@ -106,7 +106,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true);
for (int i = 0; i < 100; i++) {
- cacheID.addToCache(RandomUtil.randomBytes(), null);
+ cacheID.addToCache(RandomUtil.randomBytes());
}
journal.stop();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index a7be2fa..81015e4 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -93,41 +93,47 @@ public class FakeConsumer implements Consumer {
@Override
public synchronized HandleStatus handle(final MessageReference reference) {
- if (statusToReturn == HandleStatus.BUSY) {
- return HandleStatus.BUSY;
- }
-
- if (filter != null) {
- if (filter.match(reference.getMessage())) {
- references.addLast(reference);
- reference.getQueue().referenceHandled();
- notify();
-
- return HandleStatus.HANDLED;
+ try { // the whole handler is guarded: any Exception is rethrown below as IllegalStateException
+ if (statusToReturn == HandleStatus.BUSY) { // while BUSY, the reference is refused without being recorded
+ return HandleStatus.BUSY;
}
- else {
- return HandleStatus.NO_MATCH;
+
+ if (filter != null) { // with a filter set, the filter alone decides: HANDLED or NO_MATCH
+ if (filter.match(reference.getMessage())) {
+ references.addLast(reference);
+ reference.getQueue().referenceHandled();
+ notify(); // method is synchronized, so this wakes one thread waiting on this consumer, if any
+
+ return HandleStatus.HANDLED;
+ }
+ else {
+ return HandleStatus.NO_MATCH;
+ }
}
- }
- if (newStatus != null) {
- if (delayCountdown == 0) {
- statusToReturn = newStatus;
+ if (newStatus != null) { // a pending status change takes effect once delayCountdown has reached 0
+ if (delayCountdown == 0) {
+ statusToReturn = newStatus;
- newStatus = null;
+ newStatus = null; // the pending change is consumed once applied
+ }
+ else {
+ delayCountdown--; // not yet: one more call must pass before the change applies
+ }
}
- else {
- delayCountdown--;
+
+ if (statusToReturn == HandleStatus.HANDLED) { // record the reference only when it is accepted
+ reference.getQueue().referenceHandled();
+ references.addLast(reference);
+ notify();
}
- }
- if (statusToReturn == HandleStatus.HANDLED) {
- reference.getQueue().referenceHandled();
- references.addLast(reference);
- notify();
+ return statusToReturn;
+ }
+ catch (Exception e) {
+ e.printStackTrace(); // NOTE(review): printed here and also rethrown, so a failure may be reported twice
+ throw new IllegalStateException(e.getMessage(), e); // the original exception is kept as the cause
}
-
- return statusToReturn;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 27d9c33..4f8a007 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -113,8 +113,7 @@ public class FakePostOffice implements PostOffice {
}
@Override
- public Binding removeBinding(final SimpleString uniqueName, final Transaction tx) throws Exception {
-
+ public Binding removeBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws Exception { // fake: all arguments are ignored and null is returned
return null;
}