You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/06/06 21:41:29 UTC
[1/2] activemq-artemis git commit: ARTEMIS-552 Replication target
being finished can lead to instability on live
Repository: activemq-artemis
Updated Branches:
refs/heads/master ee6176c67 -> 50d83fb63
ARTEMIS-552 Replication target being finished can lead to instability on live
https://issues.apache.org/jira/browse/ARTEMIS-552
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2e658654
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2e658654
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2e658654
Branch: refs/heads/master
Commit: 2e6586548b3a0d69f1ce2079a2da243af16a28c6
Parents: ee6176c
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Jun 1 15:34:57 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jun 6 16:28:51 2016 -0400
----------------------------------------------------------------------
.../core/persistence/OperationContext.java | 12 +++
.../core/persistence/StorageManager.java | 2 +
.../journal/AbstractJournalStorageManager.java | 9 +++
.../impl/journal/DummyOperationContext.java | 7 ++
.../impl/journal/JournalStorageManager.java | 4 +
.../impl/journal/OperationContextImpl.java | 46 ++++++++++--
.../impl/nullpm/NullStorageManager.java | 10 +++
.../postoffice/impl/DuplicateIDCacheImpl.java | 2 +-
.../server/impl/RemotingServiceImpl.java | 12 ++-
.../core/replication/ReplicationManager.java | 72 +++++++++++++-----
.../core/server/ActiveMQServerLogger.java | 5 ++
.../artemis/core/transaction/Transaction.java | 5 ++
.../core/transaction/impl/TransactionImpl.java | 78 ++++++++++++++++++--
.../transaction/impl/TransactionImplTest.java | 5 ++
.../extras/byteman/OrphanedConsumerTest.java | 4 -
.../tests/integration/client/PagingTest.java | 5 ++
.../persistence/DuplicateCacheTest.java | 2 +-
.../core/postoffice/impl/BindingsImplTest.java | 5 ++
18 files changed, 246 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
index 6d64eb8..e893a10 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
@@ -29,9 +29,21 @@ public interface OperationContext extends IOCompletion {
/**
* Execute the task when all IO operations are complete,
* Or execute it immediately if nothing is pending.
+ * @param runnable the task to be executed.
+ * @param storeOnly There are tasks that won't need to wait on replication or paging and will need to
+ * be completed as soon as the response from the journal is received. An example would be the
+ * DuplicateCache
+ */
+ void executeOnCompletion(IOCallback runnable, boolean storeOnly);
+
+ /**
+ * Execute the task when all IO operations are complete,
+ * Or execute it immediately if nothing is pending.
+ * @param runnable the task to be executed.
*/
void executeOnCompletion(IOCallback runnable);
+
void replicationLineUp();
void replicationDone();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index a0a5200..f92d0d8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -100,6 +100,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
void afterCompleteOperations(IOCallback run);
+ /** This is similar to afterComplete, however this only cares about the journal part. */
+ void afterStoreOperations(IOCallback run);
/**
* Block until the operations are done.
* Warning: Don't use it inside an ordered executor, otherwise the system may lock up
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index ff21fe2..ed2e1f4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -292,6 +292,10 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
getContext().executeOnCompletion(run);
}
+ public void afterStoreOperations(IOCallback run) {
+ getContext().executeOnCompletion(run, true);
+ }
+
@Override
public long generateID() {
return idGenerator.generateID();
@@ -1789,6 +1793,11 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
}
@Override
+ public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+ executeOnCompletion(runnable);
+ }
+
+ @Override
public void replicationDone() {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
index 1ae7524..6fd95ff 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
@@ -35,6 +35,13 @@ final class DummyOperationContext implements OperationContext {
}
@Override
+ public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+ // There are no executeOnCompletion calls while using the DummyOperationContext
+ // However we keep the code here for correctness
+ runnable.done();
+ }
+
+ @Override
public void replicationDone() {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 1379308..157306e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -61,8 +61,10 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.jboss.logging.Logger;
public class JournalStorageManager extends AbstractJournalStorageManager {
+ private static final Logger logger = Logger.getLogger(JournalStorageManager.class);
private SequentialFileFactory journalFF;
@@ -569,6 +571,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
}
catch (Exception e) {
+ logger.warn(e.getMessage(), e);
stopReplication();
throw e;
}
@@ -681,6 +684,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
*/
@Override
public void stopReplication() {
+ logger.trace("stopReplication()");
storageManagerLock.writeLock().lock();
try {
if (replicator == null)
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
index acd75b1..06e07f7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
@@ -72,6 +72,7 @@ public class OperationContextImpl implements OperationContext {
}
private List<TaskHolder> tasks;
+ private List<TaskHolder> storeOnlyTasks;
private long minimalStore = Long.MAX_VALUE;
private long minimalReplicated = Long.MAX_VALUE;
@@ -126,7 +127,12 @@ public class OperationContextImpl implements OperationContext {
}
@Override
- public void executeOnCompletion(final IOCallback completion) {
+ public void executeOnCompletion(IOCallback runnable) {
+ executeOnCompletion(runnable, false);
+ }
+
+ @Override
+ public void executeOnCompletion(final IOCallback completion, final boolean storeOnly) {
if (errorCode != -1) {
completion.onError(errorCode, errorMessage);
return;
@@ -135,11 +141,18 @@ public class OperationContextImpl implements OperationContext {
boolean executeNow = false;
synchronized (this) {
- if (tasks == null) {
- tasks = new LinkedList<>();
- minimalReplicated = replicationLineUp.intValue();
- minimalStore = storeLineUp.intValue();
- minimalPage = pageLineUp.intValue();
+ if (storeOnly) {
+ if (storeOnlyTasks == null) {
+ storeOnlyTasks = new LinkedList<>();
+ }
+ }
+ else {
+ if (tasks == null) {
+ tasks = new LinkedList<>();
+ minimalReplicated = replicationLineUp.intValue();
+ minimalStore = storeLineUp.intValue();
+ minimalPage = pageLineUp.intValue();
+ }
}
// On this case, we can just execute the context directly
@@ -159,7 +172,12 @@ public class OperationContextImpl implements OperationContext {
}
}
else {
- tasks.add(new TaskHolder(completion));
+ if (storeOnly) {
+ storeOnlyTasks.add(new TaskHolder(completion));
+ }
+ else {
+ tasks.add(new TaskHolder(completion));
+ }
}
}
@@ -177,6 +195,20 @@ public class OperationContextImpl implements OperationContext {
}
private void checkTasks() {
+
+ if (storeOnlyTasks != null) {
+ Iterator<TaskHolder> iter = storeOnlyTasks.iterator();
+ while (iter.hasNext()) {
+ TaskHolder holder = iter.next();
+ if (stored >= holder.storeLined) {
+ // If set, we use an executor to avoid the server being single threaded
+ execute(holder.task);
+
+ iter.remove();
+ }
+ }
+ }
+
if (stored >= minimalStore && replicated >= minimalReplicated && paged >= minimalPage) {
Iterator<TaskHolder> iter = tasks.iterator();
while (iter.hasNext()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 39c5de5..21a9fd9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -94,6 +94,11 @@ public class NullStorageManager implements StorageManager {
}
@Override
+ public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+ runnable.done();
+ }
+
+ @Override
public void storeLineUp() {
}
@@ -339,6 +344,11 @@ public class NullStorageManager implements StorageManager {
}
@Override
+ public void afterStoreOperations(IOCallback run) {
+ run.done();
+ }
+
+ @Override
public void waitOnOperations() throws Exception {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
index 7f35638..28896c3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
@@ -226,7 +226,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
}
// For a tx, it's important that the entry is not added to the cache until commit
// since if the client fails then resends them tx we don't want it to get rejected
- tx.addOperation(new AddDuplicateIDOperation(duplID, recordID));
+ tx.afterStore(new AddDuplicateIDOperation(duplID, recordID));
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 3672fe2..3a073e9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -690,9 +690,17 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
}
for (Object id : idsToRemove) {
- RemotingConnection conn = getConnection(id);
+ final RemotingConnection conn = getConnection(id);
if (conn != null) {
- conn.fail(ActiveMQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress()));
+ // In certain cases (replicationManager for instance) calling fail could take some time
+ // We can't pause the FailureCheckAndFlushThread as that would lead other clients to fail for
+ // missing pings
+ flushExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ conn.fail(ActiveMQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress()));
+ }
+ });
removeConnection(id);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 1abd9c6..58102d4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -265,13 +265,14 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
}
@Override
- public synchronized void stop() throws Exception {
- if (!started) {
- return;
+ public void stop() throws Exception {
+ synchronized (this) {
+ if (!started) {
+ logger.trace("Stopping being ignored as it hasn't been started");
+ return;
+ }
}
- enabled = false;
-
// This is to avoid the write holding a lock while we are trying to close it
if (replicatingChannel != null) {
replicatingChannel.close();
@@ -279,6 +280,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
}
synchronized (replicationLock) {
+ enabled = false;
writable.set(true);
replicationLock.notifyAll();
clearReplicationTokens();
@@ -299,9 +301,12 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
* backup crashing).
*/
public void clearReplicationTokens() {
+ logger.trace("clearReplicationTokens initiating");
synchronized (replicationLock) {
+ logger.trace("clearReplicationTokens entered the lock");
while (!pendingTokens.isEmpty()) {
OperationContext ctx = pendingTokens.poll();
+ logger.trace("Calling ctx.replicationDone()");
try {
ctx.replicationDone();
}
@@ -310,6 +315,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
}
}
}
+ logger.trace("clearReplicationTokens finished");
}
/**
@@ -347,20 +353,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
synchronized (replicationLock) {
if (enabled) {
pendingTokens.add(repliToken);
- if (!replicatingChannel.getConnection().isWritable(this)) {
- try {
- writable.set(false);
- //don't wait for ever as this may hang tests etc, we've probably been closed anyway
- long now = System.currentTimeMillis();
- long deadline = now + 5000;
- while (!writable.get() && now < deadline) {
- replicationLock.wait(deadline - now);
- now = System.currentTimeMillis();
- }
- }
- catch (InterruptedException e) {
- throw new ActiveMQInterruptedException(e);
- }
+ if (!flowControl()) {
+ return repliToken;
}
replicatingChannel.send(packet);
}
@@ -379,6 +373,43 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
return repliToken;
}
+ /** This was written as a refactoring of sendReplicatePacket.
+ * In case you refactor this in any way, the caller must still hold the lock on replicationLock. */
+ private boolean flowControl() {
+ // synchronized (replicationLock) { -- I'm not adding this because the caller already has it
+ // Future maintainers of this code: please be aware that the intention here is to hold the lock on replicationLock
+ if (!replicatingChannel.getConnection().isWritable(this)) {
+ try {
+ logger.trace("flowControl waiting on writable");
+ writable.set(false);
+ //don't wait for ever as this may hang tests etc, we've probably been closed anyway
+ long now = System.currentTimeMillis();
+ long deadline = now + 5000;
+ while (!writable.get() && now < deadline) {
+ replicationLock.wait(deadline - now);
+ now = System.currentTimeMillis();
+ }
+ logger.trace("flow control done");
+
+ if (!writable.get()) {
+ ActiveMQServerLogger.LOGGER.slowReplicationResponse();
+ logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now);
+ try {
+ stop();
+ }
+ catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ return false;
+ }
+ }
+ catch (InterruptedException e) {
+ throw new ActiveMQInterruptedException(e);
+ }
+ }
+ return true;
+ }
+
@Override
public void readyForWriting() {
synchronized (replicationLock) {
@@ -591,6 +622,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
try {
if (!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout)) {
+ logger.trace("sendSynchronizationDone wasn't finished in time");
throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
}
}
@@ -598,6 +630,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
logger.debug(e);
}
inSync = false;
+
+ logger.trace("sendSynchronizationDone finished");
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 6679008..3428a2f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1209,6 +1209,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 222206, value = "Connection limit of {0} reached. Refusing connection from {1}.", format = Message.Format.MESSAGE_FORMAT)
void connectionLimitReached(long connectionsAllowed, String address);
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222207, value = "The backup server is not responding promptly introducing latency beyond the limit. Replication server being disconnected now.",
+ format = Message.Format.MESSAGE_FORMAT)
+ void slowReplicationResponse();
+
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
index da87cbf..33c1eea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
@@ -67,6 +67,11 @@ public interface Transaction {
void addOperation(TransactionOperation sync);
+ /** This is an operation that will be called right after the storage is completed.
+ * Operations added through addOperation only run after paging and replication, while these operations
+ * only wait on the storage. */
+ void afterStore(TransactionOperation sync);
+
List<TransactionOperation> getAllOperations();
boolean hasTimedOut(long currentTime, int defaultTimeout);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 0a91562..185bfb2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.transaction.impl;
import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.Date;
+import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -38,6 +39,8 @@ public class TransactionImpl implements Transaction {
private List<TransactionOperation> operations;
+ private List<TransactionOperation> storeOperations;
+
private static final int INITIAL_NUM_PROPERTIES = 10;
private Object[] properties = new Object[TransactionImpl.INITIAL_NUM_PROPERTIES];
@@ -301,6 +304,24 @@ public class TransactionImpl implements Transaction {
}
});
+ final List<TransactionOperation> storeOperationsToComplete = this.storeOperations;
+ this.storeOperations = null;
+
+ if (storeOperationsToComplete != null) {
+ storageManager.afterStoreOperations(new IOCallback() {
+
+ @Override
+ public void onError(final int errorCode, final String errorMessage) {
+ ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
+ }
+
+ @Override
+ public void done() {
+ afterCommit(storeOperationsToComplete);
+ }
+ });
+ }
+
}
}
@@ -365,6 +386,9 @@ public class TransactionImpl implements Transaction {
final List<TransactionOperation> operationsToComplete = this.operations;
this.operations = null;
+ final List<TransactionOperation> storeOperationsToComplete = this.storeOperations;
+ this.storeOperations = null;
+
// We use the Callback even for non persistence
// If we are using non-persistence with replication, the replication manager will have
// to execute this runnable in the correct order
@@ -380,6 +404,21 @@ public class TransactionImpl implements Transaction {
afterRollback(operationsToComplete);
}
});
+
+ if (storeOperationsToComplete != null) {
+ storageManager.afterStoreOperations(new IOCallback() {
+
+ @Override
+ public void onError(final int errorCode, final String errorMessage) {
+ ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
+ }
+
+ @Override
+ public void done() {
+ afterRollback(storeOperationsToComplete);
+ }
+ });
+ }
}
@Override
@@ -445,6 +484,15 @@ public class TransactionImpl implements Transaction {
operations.add(operation);
}
+
+ @Override
+ public synchronized void afterStore(TransactionOperation sync) {
+ if (storeOperations == null) {
+ storeOperations = new LinkedList<>();
+ }
+ storeOperations.add(sync);
+ }
+
private int getOperationsCount() {
checkCreateOperations();
@@ -491,7 +539,7 @@ public class TransactionImpl implements Transaction {
private void checkCreateOperations() {
if (operations == null) {
- operations = new ArrayList<>();
+ operations = new LinkedList<>();
}
}
@@ -505,13 +553,13 @@ public class TransactionImpl implements Transaction {
}
}
- private synchronized void afterRollback(List<TransactionOperation> oeprationsToComplete) {
- if (oeprationsToComplete != null) {
- for (TransactionOperation operation : oeprationsToComplete) {
+ private synchronized void afterRollback(List<TransactionOperation> operationsToComplete) {
+ if (operationsToComplete != null) {
+ for (TransactionOperation operation : operationsToComplete) {
operation.afterRollback(this);
}
// Help out GC here
- oeprationsToComplete.clear();
+ operationsToComplete.clear();
}
}
@@ -521,6 +569,11 @@ public class TransactionImpl implements Transaction {
operation.beforeCommit(this);
}
}
+ if (storeOperations != null) {
+ for (TransactionOperation operation : storeOperations) {
+ operation.beforeCommit(this);
+ }
+ }
}
private synchronized void beforePrepare() throws Exception {
@@ -529,6 +582,11 @@ public class TransactionImpl implements Transaction {
operation.beforePrepare(this);
}
}
+ if (storeOperations != null) {
+ for (TransactionOperation operation : storeOperations) {
+ operation.beforePrepare(this);
+ }
+ }
}
private synchronized void beforeRollback() throws Exception {
@@ -537,6 +595,11 @@ public class TransactionImpl implements Transaction {
operation.beforeRollback(this);
}
}
+ if (storeOperations != null) {
+ for (TransactionOperation operation : storeOperations) {
+ operation.beforeRollback(this);
+ }
+ }
}
private synchronized void afterPrepare() {
@@ -545,6 +608,11 @@ public class TransactionImpl implements Transaction {
operation.afterPrepare(this);
}
}
+ if (storeOperations != null) {
+ for (TransactionOperation operation : storeOperations) {
+ operation.afterPrepare(this);
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 6c5cfe5..9a66610 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -253,6 +253,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
+ public void afterStoreOperations(IOCallback run) {
+ run.done();
+ }
+
+ @Override
public boolean waitOnOperations(long timeout) throws Exception {
return false;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/OrphanedConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/OrphanedConsumerTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/OrphanedConsumerTest.java
index 211aee5..a95cbaa 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/OrphanedConsumerTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/OrphanedConsumerTest.java
@@ -67,10 +67,6 @@ public class OrphanedConsumerTest extends ActiveMQTestBase {
* This static method is an entry point for the byteman rules on {@link #testOrphanedConsumers()}
*/
public static void leavingCloseOnTestCountersWhileClosing() {
- if (staticServer.getConnectionCount() == 0) {
- verification = new AssertionError("The connection was closed before the consumers and sessions, this may cause issues on management leaving Orphaned Consumers!");
- }
-
if (staticServer.getSessions().size() == 0) {
verification = new AssertionError("The session was closed before the consumers, this may cause issues on management leaving Orphaned Consumers!");
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
index f658fae..41467b5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
@@ -5730,7 +5730,12 @@ public class PagingTest extends ActiveMQTestBase {
@Override
public void executeOnCompletion(IOCallback runnable) {
+ runnable.done();
+ }
+ @Override
+ public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+ runnable.done();
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java
index 38be202..299022c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java
@@ -90,7 +90,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase {
public void onError(int errorCode, String errorMessage) {
}
- });
+ }, true);
Assert.assertTrue(latch.await(1, TimeUnit.MINUTES));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index 805a6f5..44b5d82 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -115,6 +115,11 @@ public class BindingsImplTest extends ActiveMQTestBase {
}
@Override
+ public void afterStore(TransactionOperation sync) {
+
+ }
+
+ @Override
public void addOperation(final TransactionOperation sync) {
}
[2/2] activemq-artemis git commit: This closes #563
Posted by jb...@apache.org.
This closes #563
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/50d83fb6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/50d83fb6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/50d83fb6
Branch: refs/heads/master
Commit: 50d83fb63d795b78af08e43c336f7638a756e2ae
Parents: ee6176c 2e65865
Author: jbertram <jb...@apache.org>
Authored: Mon Jun 6 16:41:13 2016 -0500
Committer: jbertram <jb...@apache.org>
Committed: Mon Jun 6 16:41:13 2016 -0500
----------------------------------------------------------------------
.../core/persistence/OperationContext.java | 12 +++
.../core/persistence/StorageManager.java | 2 +
.../journal/AbstractJournalStorageManager.java | 9 +++
.../impl/journal/DummyOperationContext.java | 7 ++
.../impl/journal/JournalStorageManager.java | 4 +
.../impl/journal/OperationContextImpl.java | 46 ++++++++++--
.../impl/nullpm/NullStorageManager.java | 10 +++
.../postoffice/impl/DuplicateIDCacheImpl.java | 2 +-
.../server/impl/RemotingServiceImpl.java | 12 ++-
.../core/replication/ReplicationManager.java | 72 +++++++++++++-----
.../core/server/ActiveMQServerLogger.java | 5 ++
.../artemis/core/transaction/Transaction.java | 5 ++
.../core/transaction/impl/TransactionImpl.java | 78 ++++++++++++++++++--
.../transaction/impl/TransactionImplTest.java | 5 ++
.../extras/byteman/OrphanedConsumerTest.java | 4 -
.../tests/integration/client/PagingTest.java | 5 ++
.../persistence/DuplicateCacheTest.java | 2 +-
.../core/postoffice/impl/BindingsImplTest.java | 5 ++
18 files changed, 246 insertions(+), 39 deletions(-)
----------------------------------------------------------------------