You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/11/09 16:18:26 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1509 Add support for JdbcNodeManager into the NettyFailoverTest

Repository: activemq-artemis
Updated Branches:
  refs/heads/master d03c4c8cc -> d94efe044


ARTEMIS-1509 Add support for JdbcNodeManager into the NettyFailoverTest


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8e8a6f0f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8e8a6f0f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8e8a6f0f

Branch: refs/heads/master
Commit: 8e8a6f0faf121a03b07b1baa23f5d91ec1cecdc3
Parents: d03c4c8
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Nov 7 17:44:57 2017 +0100
Committer: Francesco Nigro <ni...@gmail.com>
Committed: Wed Nov 8 14:52:57 2017 +0100

----------------------------------------------------------------------
 .../cluster/failover/FailoverTest.java          | 25 ++++--
 .../cluster/failover/FailoverTestBase.java      | 19 ++++-
 .../cluster/failover/NettyFailoverTest.java     | 85 ++++++++++++++++++++
 3 files changed, 120 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e8a6f0f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index c5954cf..dbeeec3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -111,7 +111,9 @@ public class FailoverTest extends FailoverTestBase {
    public void testTimeoutOnFailover() throws Exception {
       locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
 
-      ((InVMNodeManager) nodeManager).failoverPause = 500;
+      if (nodeManager instanceof InVMNodeManager) {
+         ((InVMNodeManager) nodeManager).failoverPause = 500L;
+      }
 
       ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
 
@@ -176,7 +178,9 @@ public class FailoverTest extends FailoverTestBase {
    public void testTimeoutOnFailoverConsume() throws Exception {
       locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(300).setRetryInterval(100).setAckBatchSize(0);
 
-      ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+      if (nodeManager instanceof InVMNodeManager) {
+         ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+      }
 
       ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
 
@@ -237,7 +241,9 @@ public class FailoverTest extends FailoverTestBase {
    public void testTimeoutOnFailoverConsumeBlocked() throws Exception {
       locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setConsumerWindowSize(0).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(-1).setAckBatchSize(0);
 
-      ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+      if (nodeManager instanceof InVMNodeManager) {
+         ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+      }
 
       ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
 
@@ -330,7 +336,9 @@ public class FailoverTest extends FailoverTestBase {
    public void testTimeoutOnFailoverTransactionCommit() throws Exception {
       locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
 
-      ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+      if (nodeManager instanceof InVMNodeManager) {
+         ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+      }
 
       ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
 
@@ -397,12 +405,13 @@ public class FailoverTest extends FailoverTestBase {
    public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() throws Exception {
       locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500);
 
-      ((InVMNodeManager) nodeManager).failoverPause = 6000L;
+      if (nodeManager instanceof InVMNodeManager) {
+         ((InVMNodeManager) nodeManager).failoverPause = 6000L;
+      }
 
       ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
       final ClientSession session = createSession(sf1, false, false, false);
 
-
       session.createQueue(FailoverTestBase.ADDRESS, RoutingType.MULTICAST, FailoverTestBase.ADDRESS, null, true);
 
       final CountDownLatch connectionFailed = new CountDownLatch(1);
@@ -474,7 +483,9 @@ public class FailoverTest extends FailoverTestBase {
    public void testTimeoutOnFailoverTransactionRollback() throws Exception {
       locator.setCallTimeout(2000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
 
-      ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+      if (nodeManager instanceof InVMNodeManager) {
+         ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+      }
 
       ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e8a6f0f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
index 2a42938..df20551 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
@@ -153,8 +153,15 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
       }
    }
 
+   /**
+    * Override this if is needed a different implementation of {@link NodeManager} to be used into {@link #createConfigs()}.
+    */
+   protected NodeManager createNodeManager() {
+      return new InVMNodeManager(false);
+   }
+
    protected void createConfigs() throws Exception {
-      nodeManager = new InVMNodeManager(false);
+      nodeManager = createNodeManager();
       TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
       TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
 
@@ -167,6 +174,14 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
       liveServer = createTestableServer(liveConfig);
    }
 
+   /**
+    * Override this if is needed a different implementation of {@link NodeManager} to be used into {@link #createReplicatedConfigs()}.
+    */
+   protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) {
+      return new InVMNodeManager(true, backupConfig.getJournalLocation());
+   }
+
+
    protected void createReplicatedConfigs() throws Exception {
       final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
       final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
@@ -180,7 +195,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
       backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false);
 
       setupHAPolicyConfiguration();
-      nodeManager = new InVMNodeManager(true, backupConfig.getJournalLocation());
+      nodeManager = createReplicatedBackupNodeManager(backupConfig);
 
       backupServer = createTestableServer(backupConfig);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e8a6f0f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
index bd26a46..c0d501a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
@@ -16,9 +16,16 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.failover;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -26,12 +33,45 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
+import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
+import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.hamcrest.core.Is;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class NettyFailoverTest extends FailoverTest {
 
+   private static final long JDBC_LOCK_EXPIRATION_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
+   private static final long JDBC_LOCK_RENEW_PERIOD_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis();
+   private static final long JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
+   private static final SQLProvider SQL_PROVIDER = new DerbySQLProvider.Factory().create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
+   private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
+   private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
+
+   public enum NodeManagerType {
+      InVM, Jdbc
+   }
+
+   @Parameterized.Parameters(name = "{0} Node Manager")
+   public static Iterable<? extends Object> nodeManagerTypes() {
+      return Arrays.asList(new Object[][]{{NodeManagerType.Jdbc}, {NodeManagerType.InVM}});
+   }
+
+   @Parameterized.Parameter
+   public NodeManagerType nodeManagerType;
+
    @Override
    protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
       return getNettyAcceptorTransportConfiguration(live);
@@ -42,6 +82,51 @@ public class NettyFailoverTest extends FailoverTest {
       return getNettyConnectorTransportConfiguration(live);
    }
 
+   private ScheduledExecutorService scheduledExecutorService;
+   private ExecutorService executor;
+
+   @Override
+   protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) {
+      Assume.assumeThat("Replicated backup is supported only by " + NodeManagerType.InVM + " Node Manager", nodeManagerType, Is.is(NodeManagerType.InVM));
+      return super.createReplicatedBackupNodeManager(backupConfig);
+   }
+
+   @Override
+   protected NodeManager createNodeManager() {
+
+      switch (nodeManagerType) {
+
+         case InVM:
+            return new InVMNodeManager(false);
+         case Jdbc:
+            //It can uses an in memory JavaDB: the failover tests are in process
+            final ThreadFactory daemonThreadFactory = t -> {
+               final Thread th = new Thread(t);
+               th.setDaemon(true);
+               return th;
+            };
+            scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
+            executor = Executors.newFixedThreadPool(2, daemonThreadFactory);
+            final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
+            return JdbcNodeManager.usingConnectionUrl(UUID.randomUUID().toString(), JDBC_LOCK_EXPIRATION_MILLIS, JDBC_LOCK_RENEW_PERIOD_MILLIS, JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER, scheduledExecutorService, executorFactory, (code, message, file) -> {
+               code.printStackTrace();
+               Assert.fail(message);
+            });
+         default:
+            throw new AssertionError("enum type not supported!");
+      }
+   }
+
+   @After
+   public void shutDownExecutors() {
+      if (scheduledExecutorService != null) {
+         executor.shutdown();
+         scheduledExecutorService.shutdown();
+         this.executor = null;
+         this.scheduledExecutorService = null;
+      }
+   }
+
    @Test(timeout = 120000)
    public void testFailoverWithHostAlias() throws Exception {
       Map<String, Object> params = new HashMap<>();


[2/2] activemq-artemis git commit: This closes #1651

Posted by cl...@apache.org.
This closes #1651


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d94efe04
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d94efe04
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d94efe04

Branch: refs/heads/master
Commit: d94efe0442d64b60b1c17456c88c41c1e61481af
Parents: d03c4c8 8e8a6f0
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Nov 9 11:16:00 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Nov 9 11:16:00 2017 -0500

----------------------------------------------------------------------
 .../cluster/failover/FailoverTest.java          | 25 ++++--
 .../cluster/failover/FailoverTestBase.java      | 19 ++++-
 .../cluster/failover/NettyFailoverTest.java     | 85 ++++++++++++++++++++
 3 files changed, 120 insertions(+), 9 deletions(-)
----------------------------------------------------------------------