You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2018/08/21 13:29:06 UTC
[1/2] activemq-artemis git commit: ARTEMIS-2046 Fixing issues with
JournalStorageManager.stop in replication, JDBC and shared storage
Repository: activemq-artemis
Updated Branches:
refs/heads/master f1dfc7281 -> 281cff3d4
ARTEMIS-2046 Fixing issues with JournalStorageManager.stop in replication, JDBC and shared storage
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/63e6cd98
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/63e6cd98
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/63e6cd98
Branch: refs/heads/master
Commit: 63e6cd98f856ba8900782b7488c3ce4cf9e48257
Parents: f1dfc72
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Aug 17 16:45:07 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Aug 20 21:25:57 2018 -0400
----------------------------------------------------------------------
.../artemis/api/core/ActiveMQExceptionType.java | 6 +
.../api/core/ActiveMQShutdownException.java | 31 ++++
.../core/client/impl/ClientSessionImpl.java | 4 +-
.../jdbc/store/journal/JDBCJournalImpl.java | 12 +-
.../artemis/core/journal/impl/JournalImpl.java | 41 +++--
.../journal/AbstractJournalStorageManager.java | 16 +-
.../impl/journal/JournalStorageManager.java | 63 +++++---
.../core/replication/ReplicationEndpoint.java | 14 ++
.../core/replication/ReplicationManager.java | 9 +-
.../failover/NettyReplicationStopTest.java | 150 +++++++++++++++++++
.../tests/integration/xa/BasicXaTest.java | 100 +++++++++++++
11 files changed, 407 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/63e6cd98/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
index 9120d79..7cec2e4 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
@@ -249,6 +249,12 @@ public enum ActiveMQExceptionType {
public ActiveMQException createException(String msg) {
return new ActiveMQNullRefException(msg);
}
+ },
+ SHUTDOWN_ERROR(219) {
+ @Override
+ public ActiveMQException createException(String msg) {
+ return new ActiveMQShutdownException(msg);
+ }
};
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/63e6cd98/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQShutdownException.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQShutdownException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQShutdownException.java
new file mode 100644
index 0000000..03797a8
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQShutdownException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.api.core;
+
+/**
+ * An operation failed because the server, or a component it depends on such as the journal, is shutting down or has already been stopped.
+ */
+public final class ActiveMQShutdownException extends ActiveMQException {
+
+ public ActiveMQShutdownException() {
+ super(ActiveMQExceptionType.SHUTDOWN_ERROR);
+ }
+
+ public ActiveMQShutdownException(String msg) {
+ super(ActiveMQExceptionType.SHUTDOWN_ERROR, msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/63e6cd98/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index ab9888e..711d7ce 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -1500,9 +1500,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
XAException xaException = null;
if (onePhase) {
+ logger.debug("Throwing oneFase RMFAIL on xid=" + xid, t);
//we must return XA_RMFAIL
xaException = new XAException(XAException.XAER_RMFAIL);
} else {
+ logger.debug("Throwing twoFase Retry on xid=" + xid, t);
// Any error on commit -> RETRY
// We can't rollback a Prepared TX for definition
xaException = new XAException(XAException.XA_RETRY);
@@ -1753,7 +1755,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
} catch (XAException xae) {
throw xae;
} catch (ActiveMQException e) {
- if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
+ if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT || e.getType() == ActiveMQExceptionType.SHUTDOWN_ERROR) {
// Unblocked on failover
throw new XAException(XAException.XA_RETRY);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/63e6cd98/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index f997b3c..334bc46 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -31,7 +31,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncoderPersister;
@@ -334,19 +336,19 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
}
- private void checkStatus() {
+ private void checkStatus() throws Exception {
checkStatus(null);
}
- private void checkStatus(IOCompletion callback) {
+ private void checkStatus(IOCompletion callback) throws Exception {
if (!started) {
if (callback != null) callback.onError(-1, "JDBC Journal is not loaded");
- throw new IllegalStateException("JDBCJournal is not loaded");
+ throw new ActiveMQShutdownException("JDBCJournal is not loaded");
}
if (failed.get()) {
if (callback != null) callback.onError(-1, "JDBC Journal failed");
- throw new IllegalStateException("JDBCJournal Failed");
+ throw new ActiveMQException("JDBCJournal Failed");
}
}
@@ -388,7 +390,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
if (callback != null) callback.waitCompletion();
}
- private synchronized void addTxRecord(JDBCJournalRecord record) {
+ private synchronized void addTxRecord(JDBCJournalRecord record) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("addTxRecord " + record + ", started=" + started + ", failed=" + failed);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/63e6cd98/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 55b92c5..30ed6e3 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -48,6 +48,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
@@ -823,6 +824,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
usedFile);
}
result.set(true);
+ } catch (ActiveMQShutdownException e) {
+ result.fail(e);
+ logger.error("appendPrepareRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
setErrorCondition(callback, null, e);
@@ -882,7 +886,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
result.set(true);
- } catch (Exception e) {
+ } catch (ActiveMQShutdownException e) {
+ result.fail(e);
+ logger.error("appendUpdateRecord:" + e, e);
+ } catch (Throwable e) {
result.fail(e);
setErrorCondition(callback, null, e);
logger.error("appendUpdateRecord:" + e, e);
@@ -933,7 +940,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
record.delete(usedFile);
}
result.set(true);
- } catch (Exception e) {
+ } catch (ActiveMQShutdownException e) {
+ result.fail(e);
+ logger.error("appendDeleteRecord:" + e, e);
+ } catch (Throwable e) {
result.fail(e);
logger.error("appendDeleteRecord:" + e, e);
} finally {
@@ -993,7 +1003,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
- } catch (Exception e) {
+ } catch (Throwable e) {
logger.error("appendAddRecordTransactional:" + e, e);
setErrorCondition(null, tx, e);
} finally {
@@ -1031,9 +1041,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
- private void checkJournalIsLoaded() {
+ private void checkJournalIsLoaded() throws Exception {
if (state != JournalState.LOADED && state != JournalState.SYNCING) {
- throw new IllegalStateException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]");
+ throw new ActiveMQShutdownException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]");
}
}
@@ -1085,7 +1095,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
- } catch ( Exception e ) {
+ } catch (Throwable e ) {
logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e );
setErrorCondition(null, tx, e );
} finally {
@@ -1132,7 +1142,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
tx.addNegative(usedFile, id);
- } catch (Exception e) {
+ } catch (Throwable e) {
logger.error("appendDeleteRecordTransactional:" + e, e);
setErrorCondition(null, tx, e);
} finally {
@@ -1185,7 +1195,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
tx.prepare(usedFile);
- } catch (Exception e) {
+ } catch (ActiveMQShutdownException e) {
+ result.fail(e);
+ logger.error("appendPrepareRecord:" + e, e);
+ } catch (Throwable e) {
result.fail(e);
logger.error("appendPrepareRecord:" + e, e);
setErrorCondition(callback, tx, e);
@@ -1267,6 +1280,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.commit(usedFile);
+ } catch (ActiveMQShutdownException e) {
+ result.fail(e);
+ logger.error("appendCommitRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
logger.error("appendCommitRecord:" + e, e);
@@ -1317,6 +1333,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
tx.rollback(usedFile);
+ } catch (ActiveMQShutdownException e) {
+ result.fail(e);
+ logger.error("appendRollbackRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
logger.error("appendRollbackRecord:" + e, e);
@@ -2360,10 +2379,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
return;
}
- setJournalState(JournalState.STOPPED);
-
flush();
+ setJournalState(JournalState.STOPPED);
+
if (providedIOThreadPool == null) {
threadPool.shutdown();
@@ -2681,6 +2700,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final JournalTransaction tx,
final IOCallback parameterCallback) throws Exception {
+ checkJournalIsLoaded();
+
final IOCallback callback;
final int size = encoder.getEncodeSize();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/63e6cd98/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 7c821a9..d28eec8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -126,8 +126,10 @@ import org.jboss.logging.Logger;
*/
public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager {
- private static final int CRITICAL_PATHS = 1;
- private static final int CRITICAL_STORE = 0;
+ protected static final int CRITICAL_PATHS = 3;
+ protected static final int CRITICAL_STORE = 0;
+ protected static final int CRITICAL_STOP = 1;
+ protected static final int CRITICAL_STOP_2 = 2;
private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class);
@@ -405,6 +407,16 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
leaveCritical(CRITICAL_STORE);
}
+ /** for internal use and testsuite, don't use it outside of tests */
+ public void writeLock() {
+ storageManagerLock.writeLock().lock();
+ }
+
+ /** for internal use and testsuite, don't use it outside of tests */
+ public void writeUnlock() {
+ storageManagerLock.writeLock().unlock();
+ }
+
@Override
public void storeAcknowledge(final long queueID, final long messageID) throws Exception {
readLock();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/63e6cd98/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index dd8bb22..867f2d4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -229,9 +229,21 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
@Override
- public synchronized void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
+ public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
+ try {
+ enterCritical(CRITICAL_STOP);
+ synchronized (this) {
+ if (internalStop(ioCriticalError, sendFailover))
+ return;
+ }
+ } finally {
+ leaveCritical(CRITICAL_STOP);
+ }
+ }
+
+ private boolean internalStop(boolean ioCriticalError, boolean sendFailover) throws Exception {
if (!started) {
- return;
+ return true;
}
if (!ioCriticalError) {
@@ -255,30 +267,41 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
// that's ok
}
- // We cache the variable as the replicator could be changed between here and the time we call stop
- // since sendLiveIsStopping may issue a close back from the channel
- // and we want to ensure a stop here just in case
- ReplicationManager replicatorInUse = replicator;
- if (replicatorInUse != null) {
- if (sendFailover) {
- final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER);
- if (token != null) {
- try {
- token.waitCompletion(5000);
- } catch (Exception e) {
- // ignore it
+ enterCritical(CRITICAL_STOP_2);
+ storageManagerLock.writeLock().lock();
+ try {
+
+ // We cache the variable as the replicator could be changed between here and the time we call stop
+ // since sendLiveIsStopping may issue a close back from the channel
+ // and we want to ensure a stop here just in case
+ ReplicationManager replicatorInUse = replicator;
+ if (replicatorInUse != null) {
+ if (sendFailover) {
+ final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER);
+ if (token != null) {
+ try {
+ token.waitCompletion(5000);
+ } catch (Exception e) {
+ // ignore it
+ }
}
}
+ // we cannot clear replication tokens, otherwise clients will eventually be informed of completion during a server's shutdown
+ // while the backup will never receive them
+ replicatorInUse.stop(false);
}
- replicatorInUse.stop();
- }
- bindingsJournal.stop();
+ bindingsJournal.stop();
- messageJournal.stop();
+ messageJournal.stop();
- journalLoaded = false;
+ journalLoaded = false;
- started = false;
+ started = false;
+ } finally {
+ storageManagerLock.writeLock().unlock();
+ leaveCritical(CRITICAL_STOP_2);
+ }
+ return false;
}
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/63e6cd98/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 15d5311..998bbcf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -150,6 +150,20 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
journals[id] = journal;
}
+ /**
+ * This is for tests basically, do not use it as its API is not guaranteed for future usage.
+ */
+ public void pause() {
+ started = false;
+ }
+
+ /**
+ * This is for tests basically, do not use it as its API is not guaranteed for future usage.
+ */
+ public void resume() {
+ started = true;
+ }
+
@Override
public void handlePacket(final Packet packet) {
if (logger.isTraceEnabled()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/63e6cd98/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index fbf7c6c..6973706 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -282,6 +282,10 @@ public final class ReplicationManager implements ActiveMQComponent {
@Override
public void stop() throws Exception {
+ stop(true);
+ }
+
+ public void stop(boolean clearTokens) throws Exception {
synchronized (this) {
if (!started) {
logger.trace("Stopping being ignored as it hasn't been started");
@@ -297,7 +301,10 @@ public final class ReplicationManager implements ActiveMQComponent {
enabled = false;
writable.set(true);
- clearReplicationTokens();
+
+ if (clearTokens) {
+ clearReplicationTokens();
+ }
RemotingConnection toStop = remotingConnection;
if (toStop != null) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/63e6cd98/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicationStopTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicationStopTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicationStopTest.java
new file mode 100644
index 0000000..64343e3
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicationStopTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
+import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
+import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NettyReplicationStopTest extends FailoverTestBase {
+
+ @Override
+ protected TestableServer createTestableServer(Configuration config) {
+ return new SameProcessActiveMQServer(createServer(true, config));
+ }
+
+ @Override
+ protected void createConfigs() throws Exception {
+ createReplicatedConfigs();
+ }
+
+ @Override
+ protected NodeManager createNodeManager() throws Exception {
+ return new InVMNodeManager(false);
+ }
+
+ @Override
+ protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
+ return getNettyAcceptorTransportConfiguration(live);
+ }
+
+ @Override
+ protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
+ return getNettyConnectorTransportConfiguration(live);
+ }
+
+ @Override
+ protected final void crash(boolean waitFailure, ClientSession... sessions) throws Exception {
+ if (sessions.length > 0) {
+ for (ClientSession session : sessions) {
+ waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer());
+ }
+ } else {
+ waitForRemoteBackup(null, 5, true, backupServer.getServer());
+ }
+ super.crash(waitFailure, sessions);
+ }
+
+ @Override
+ protected final void crash(ClientSession... sessions) throws Exception {
+ crash(true, sessions);
+ }
+
+ @Test
+ public void testReplicaStop() throws Exception {
+
+ Map<String, Object> params = new HashMap<>();
+ params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");
+ TransportConfiguration tc = createTransportConfiguration(true, false, params);
+
+ ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(15);
+
+ final ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
+ ClientSession session = sf.createSession(true, true);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 10;
+
+ ReplicationEndpoint endpoint = backupServer.getServer().getReplicationEndpoint();
+
+ endpoint.pause();
+
+ ArrayList<Thread> threads = new ArrayList<>();
+ final ArrayList<Integer> codesSent = new ArrayList<>();
+
+ CountDownLatch alignedOnSend = new CountDownLatch(10);
+
+ for (int i = 0; i < numMessages; i++) {
+ final int code = i;
+ Thread t = new Thread("WillSend " + code) {
+ @Override
+ public void run() {
+ try {
+ ClientSession session = sf.createSession(true, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientMessage message = session.createMessage(true).putIntProperty("i", code);
+ alignedOnSend.countDown();
+ System.out.println("blocking!!");
+ producer.send(message);
+ codesSent.add(code);
+
+ System.out.println("Sent!");
+
+ } catch (Exception e) {
+ // that's ok;
+ e.printStackTrace(); // logging just for debug & reference
+ }
+ }
+ };
+
+ t.start();
+
+ threads.add(t);
+ }
+
+ Assert.assertTrue(alignedOnSend.await(10, TimeUnit.SECONDS));
+ liveServer.stop();
+
+ Assert.assertEquals(0, codesSent.size());
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/63e6cd98/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java
index 04fd1a9..d0d8ce3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -365,6 +366,105 @@ public class BasicXaTest extends ActiveMQTestBase {
}
@Test
+ public void testPrepareError() throws Exception {
+ Xid xid = newXID();
+
+ ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
+ ClientProducer clientProducer = clientSession2.createProducer(atestq);
+ ClientMessage m1 = createTextMessage(clientSession2, "m1");
+ ClientMessage m2 = createTextMessage(clientSession2, "m2");
+ ClientMessage m3 = createTextMessage(clientSession2, "m3");
+ ClientMessage m4 = createTextMessage(clientSession2, "m4");
+ clientProducer.send(m1);
+ clientProducer.send(m2);
+ clientProducer.send(m3);
+ clientProducer.send(m4);
+
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientSession.start();
+ ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
+ ClientMessage m = clientConsumer.receive(1000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
+ m = clientConsumer.receive(1000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
+ m = clientConsumer.receive(1000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
+ m = clientConsumer.receive(1000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
+ clientSession.end(xid, XAResource.TMSUCCESS);
+
+ StorageManager journalStorageManager = messagingService.getStorageManager();
+
+ clientSession.prepare(xid);
+
+ journalStorageManager.getMessageJournal().stop();
+ try {
+ clientSession.commit(xid, false);
+ Assert.fail("Exception exptected");
+ } catch (XAException e) {
+ Assert.assertTrue(e.errorCode == XAException.XA_RETRY);
+ }
+ }
+
+
+ @Test
+ public void testRollbackError() throws Exception {
+ Xid xid = newXID();
+
+ ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
+ ClientProducer clientProducer = clientSession2.createProducer(atestq);
+ ClientMessage m1 = createTextMessage(clientSession2, "m1");
+ ClientMessage m2 = createTextMessage(clientSession2, "m2");
+ ClientMessage m3 = createTextMessage(clientSession2, "m3");
+ ClientMessage m4 = createTextMessage(clientSession2, "m4");
+ clientProducer.send(m1);
+ clientProducer.send(m2);
+ clientProducer.send(m3);
+ clientProducer.send(m4);
+
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientSession.start();
+ ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
+ ClientMessage m = clientConsumer.receive(1000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
+ m = clientConsumer.receive(1000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
+ m = clientConsumer.receive(1000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
+ m = clientConsumer.receive(1000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
+ clientSession.end(xid, XAResource.TMSUCCESS);
+
+ StorageManager journalStorageManager = messagingService.getStorageManager();
+
+ clientSession.prepare(xid);
+
+ journalStorageManager.getMessageJournal().stop();
+ try {
+ clientSession.rollback(xid);
+ Assert.fail("Exception exptected");
+ } catch (XAException e) {
+ Assert.assertTrue(e.errorCode == XAException.XA_RETRY);
+ }
+ }
+
+ @Test
public void testReceiveRollback() throws Exception {
int numSessions = 100;
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
[2/2] activemq-artemis git commit: This closes #2255
Posted by ma...@apache.org.
This closes #2255
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/281cff3d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/281cff3d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/281cff3d
Branch: refs/heads/master
Commit: 281cff3d41b7bbeee6ac3ea97995a7c2e32c845a
Parents: f1dfc72 63e6cd9
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue Aug 21 14:27:36 2018 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Tue Aug 21 14:27:36 2018 +0100
----------------------------------------------------------------------
.../artemis/api/core/ActiveMQExceptionType.java | 6 +
.../api/core/ActiveMQShutdownException.java | 31 ++++
.../core/client/impl/ClientSessionImpl.java | 4 +-
.../jdbc/store/journal/JDBCJournalImpl.java | 12 +-
.../artemis/core/journal/impl/JournalImpl.java | 41 +++--
.../journal/AbstractJournalStorageManager.java | 16 +-
.../impl/journal/JournalStorageManager.java | 63 +++++---
.../core/replication/ReplicationEndpoint.java | 14 ++
.../core/replication/ReplicationManager.java | 9 +-
.../failover/NettyReplicationStopTest.java | 150 +++++++++++++++++++
.../tests/integration/xa/BasicXaTest.java | 100 +++++++++++++
11 files changed, 407 insertions(+), 39 deletions(-)
----------------------------------------------------------------------