You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/03/28 15:54:49 UTC
[11/16] activemq-artemis git commit: ARTEMIS-1509 Add support for
JdbcNodeManager into the NettyFailoverTest
ARTEMIS-1509 Add support for JdbcNodeManager into the NettyFailoverTest
(cherry picked from commit 8e8a6f0faf121a03b07b1baa23f5d91ec1cecdc3)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a0f3da5d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a0f3da5d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a0f3da5d
Branch: refs/heads/1.x
Commit: a0f3da5d7208918fb2f45352d6f21996ac2ae935
Parents: 590fbcf
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Nov 7 17:44:57 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Mar 28 11:54:15 2018 -0400
----------------------------------------------------------------------
.../cluster/failover/FailoverTest.java | 24 ++++--
.../cluster/failover/FailoverTestBase.java | 19 ++++-
.../cluster/failover/NettyFailoverTest.java | 87 +++++++++++++++++++-
3 files changed, 121 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0f3da5d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index 72b5ac4..6a592c2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -110,7 +110,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailover() throws Exception {
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
- ((InVMNodeManager) nodeManager).failoverPause = 500;
+ if (nodeManager instanceof InVMNodeManager) {
+ ((InVMNodeManager) nodeManager).failoverPause = 500L;
+ }
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@@ -175,7 +177,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailoverConsume() throws Exception {
locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(300).setRetryInterval(100).setAckBatchSize(0);
- ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+ if (nodeManager instanceof InVMNodeManager) {
+ ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+ }
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@@ -236,7 +240,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailoverConsumeBlocked() throws Exception {
locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setConsumerWindowSize(0).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(-1).setAckBatchSize(0);
- ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+ if (nodeManager instanceof InVMNodeManager) {
+ ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+ }
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@@ -329,7 +335,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailoverTransactionCommit() throws Exception {
locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
- ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+ if (nodeManager instanceof InVMNodeManager) {
+ ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+ }
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@@ -396,7 +404,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() throws Exception {
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500);
- ((InVMNodeManager) nodeManager).failoverPause = 6000L;
+ if (nodeManager instanceof InVMNodeManager) {
+ ((InVMNodeManager) nodeManager).failoverPause = 6000L;
+ }
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
final ClientSession session = createSession(sf1, false, false, false);
@@ -473,7 +483,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailoverTransactionRollback() throws Exception {
locator.setCallTimeout(2000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
- ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+ if (nodeManager instanceof InVMNodeManager) {
+ ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+ }
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0f3da5d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
index b08ceb1..ec97663 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
@@ -153,8 +153,15 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
}
}
+ /**
+ * Override this if is needed a different implementation of {@link NodeManager} to be used into {@link #createConfigs()}.
+ */
+ protected NodeManager createNodeManager() {
+ return new InVMNodeManager(false);
+ }
+
protected void createConfigs() throws Exception {
- nodeManager = new InVMNodeManager(false);
+ nodeManager = createNodeManager();
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
@@ -167,6 +174,14 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
liveServer = createTestableServer(liveConfig);
}
+ /**
+ * Override this if is needed a different implementation of {@link NodeManager} to be used into {@link #createReplicatedConfigs()}.
+ */
+ protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) {
+ return new InVMNodeManager(true, backupConfig.getJournalLocation());
+ }
+
+
protected void createReplicatedConfigs() throws Exception {
final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
@@ -180,7 +195,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false);
setupHAPolicyConfiguration();
- nodeManager = new InVMNodeManager(true, backupConfig.getJournalLocation());
+ nodeManager = createReplicatedBackupNodeManager(backupConfig);
backupServer = createTestableServer(backupConfig);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0f3da5d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
index 0c8d116..d5b7292 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
@@ -16,9 +16,16 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -26,12 +33,45 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
+import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
+import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
+import org.hamcrest.core.Is;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class NettyFailoverTest extends FailoverTest {
+ private static final long JDBC_LOCK_EXPIRATION_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
+ private static final long JDBC_LOCK_RENEW_PERIOD_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis();
+ private static final long JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
+ private static final SQLProvider SQL_PROVIDER = new DerbySQLProvider.Factory().create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
+ private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
+ private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
+
+ public enum NodeManagerType {
+ InVM, Jdbc
+ }
+
+ @Parameterized.Parameters(name = "{0} Node Manager")
+ public static Iterable<? extends Object> nodeManagerTypes() {
+ return Arrays.asList(new Object[][]{{NodeManagerType.Jdbc}, {NodeManagerType.InVM}});
+ }
+
+ @Parameterized.Parameter
+ public NodeManagerType nodeManagerType;
+
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return getNettyAcceptorTransportConfiguration(live);
@@ -42,7 +82,52 @@ public class NettyFailoverTest extends FailoverTest {
return getNettyConnectorTransportConfiguration(live);
}
- @Test
+ private ScheduledExecutorService scheduledExecutorService;
+ private ExecutorService executor;
+
+ @Override
+ protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) {
+ Assume.assumeThat("Replicated backup is supported only by " + NodeManagerType.InVM + " Node Manager", nodeManagerType, Is.is(NodeManagerType.InVM));
+ return super.createReplicatedBackupNodeManager(backupConfig);
+ }
+
+ @Override
+ protected NodeManager createNodeManager() {
+
+ switch (nodeManagerType) {
+
+ case InVM:
+ return new InVMNodeManager(false);
+ case Jdbc:
+ //It can uses an in memory JavaDB: the failover tests are in process
+ final ThreadFactory daemonThreadFactory = t -> {
+ final Thread th = new Thread(t);
+ th.setDaemon(true);
+ return th;
+ };
+ scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
+ executor = Executors.newFixedThreadPool(2, daemonThreadFactory);
+ final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
+ return JdbcNodeManager.usingConnectionUrl(UUID.randomUUID().toString(), JDBC_LOCK_EXPIRATION_MILLIS, JDBC_LOCK_RENEW_PERIOD_MILLIS, JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER, scheduledExecutorService, executorFactory, (code, message, file) -> {
+ code.printStackTrace();
+ Assert.fail(message);
+ });
+ default:
+ throw new AssertionError("enum type not supported!");
+ }
+ }
+
+ @After
+ public void shutDownExecutors() {
+ if (scheduledExecutorService != null) {
+ executor.shutdown();
+ scheduledExecutorService.shutdown();
+ this.executor = null;
+ this.scheduledExecutorService = null;
+ }
+ }
+
+ @Test(timeout = 120000)
public void testFailoverWithHostAlias() throws Exception {
Map<String, Object> params = new HashMap<>();
params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");