You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/03/28 15:54:50 UTC

[12/16] activemq-artemis git commit: ARTEMIS-1640 JDBC NodeManager tests have to be customizable to run on different DBMS

ARTEMIS-1640 JDBC NodeManager tests have to be customizable to run on different DBMS

(cherry picked from commit b1422fc3d621699aa6acdf1822f0df05e5260495)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/03ff0661
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/03ff0661
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/03ff0661

Branch: refs/heads/1.x
Commit: 03ff0661b82b32def5a4dfe003b65a58667a01fc
Parents: 2a0b903
Author: Francesco Nigro <ni...@gmail.com>
Authored: Fri Jan 26 13:24:52 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Mar 28 11:54:15 2018 -0400

----------------------------------------------------------------------
 .../core/server/impl/jdbc/JdbcNodeManager.java  | 60 ++++++++++++++++---
 .../server/impl/jdbc/JdbcLeaseLockTest.java     | 38 ++++++++----
 .../artemis/tests/util/ActiveMQTestBase.java    | 21 ++++++-
 .../cluster/failover/FailoverTestBase.java      |  4 +-
 .../cluster/failover/NettyFailoverTest.java     | 61 ++++++++++++--------
 5 files changed, 135 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/03ff0661/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
index b2d9d3f..2360df6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
@@ -44,9 +44,12 @@ public final class JdbcNodeManager extends NodeManager {
    private static final Logger logger = Logger.getLogger(JdbcNodeManager.class);
    private static final long MAX_PAUSE_MILLIS = 2000L;
 
-   private final SharedStateManager sharedStateManager;
-   private final ScheduledLeaseLock scheduledLiveLock;
-   private final ScheduledLeaseLock scheduledBackupLock;
+   private final Supplier<? extends SharedStateManager> sharedStateManagerFactory;
+   private final Supplier<? extends ScheduledLeaseLock> scheduledLiveLockFactory;
+   private final Supplier<? extends ScheduledLeaseLock> scheduledBackupLockFactory;
+   private SharedStateManager sharedStateManager;
+   private ScheduledLeaseLock scheduledLiveLock;
+   private ScheduledLeaseLock scheduledBackupLock;
    private final long lockRenewPeriodMillis;
    private final long lockAcquisitionTimeoutMillis;
    private volatile boolean interrupted = false;
@@ -82,7 +85,14 @@ public final class JdbcNodeManager extends NodeManager {
                                           ScheduledExecutorService scheduledExecutorService,
                                           ExecutorFactory executorFactory,
                                           IOCriticalErrorListener ioCriticalErrorListener) {
-      return new JdbcNodeManager(JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
+      return new JdbcNodeManager(
+         () -> JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider),
+         false,
+         lockRenewPeriodMillis,
+         lockAcquisitionTimeoutMillis,
+         scheduledExecutorService,
+         executorFactory,
+         ioCriticalErrorListener);
    }
 
    public static JdbcNodeManager usingConnectionUrl(String brokerId,
@@ -95,10 +105,17 @@ public final class JdbcNodeManager extends NodeManager {
                                                     ScheduledExecutorService scheduledExecutorService,
                                                     ExecutorFactory executorFactory,
                                                     IOCriticalErrorListener ioCriticalErrorListener) {
-      return new JdbcNodeManager(JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
+      return new JdbcNodeManager(
+         () -> JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider),
+         false,
+         lockRenewPeriodMillis,
+         lockAcquisitionTimeoutMillis,
+         scheduledExecutorService,
+         executorFactory,
+         ioCriticalErrorListener);
    }
 
-   private JdbcNodeManager(final SharedStateManager sharedStateManager,
+   private JdbcNodeManager(Supplier<? extends SharedStateManager> sharedStateManagerFactory,
                            boolean replicatedBackup,
                            long lockRenewPeriodMillis,
                            long lockAcquisitionTimeoutMillis,
@@ -109,10 +126,26 @@ public final class JdbcNodeManager extends NodeManager {
       this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis;
       this.lockRenewPeriodMillis = lockRenewPeriodMillis;
       this.pauser = LeaseLock.Pauser.sleep(Math.min(this.lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
-      this.sharedStateManager = sharedStateManager;
-      this.scheduledLiveLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory != null ? executorFactory.getExecutor() : null, "live", this.sharedStateManager.liveLock(), lockRenewPeriodMillis, ioCriticalErrorListener);
-      this.scheduledBackupLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory != null ? executorFactory.getExecutor() : null, "backup", this.sharedStateManager.backupLock(), lockRenewPeriodMillis, ioCriticalErrorListener);
+      this.sharedStateManagerFactory = sharedStateManagerFactory;
+      this.scheduledLiveLockFactory = () -> ScheduledLeaseLock.of(
+         scheduledExecutorService,
+         executorFactory != null ? executorFactory.getExecutor() : null,
+         "live",
+         this.sharedStateManager.liveLock(),
+         lockRenewPeriodMillis,
+         ioCriticalErrorListener);
+      this.scheduledBackupLockFactory = () -> ScheduledLeaseLock.of(
+         scheduledExecutorService,
+         executorFactory != null ?
+            executorFactory.getExecutor() : null,
+         "backup",
+         this.sharedStateManager.backupLock(),
+         lockRenewPeriodMillis,
+         ioCriticalErrorListener);
       this.ioCriticalErrorListener = ioCriticalErrorListener;
+      this.sharedStateManager = null;
+      this.scheduledLiveLock = null;
+      this.scheduledBackupLock = null;
    }
 
    @Override
@@ -122,13 +155,19 @@ public final class JdbcNodeManager extends NodeManager {
             if (isStarted()) {
                return;
             }
+            this.sharedStateManager = sharedStateManagerFactory.get();
             if (!replicatedBackup) {
                final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
                setUUID(nodeId);
             }
+            this.scheduledLiveLock = scheduledLiveLockFactory.get();
+            this.scheduledBackupLock = scheduledBackupLockFactory.get();
             super.start();
          }
       } catch (IllegalStateException e) {
+         this.sharedStateManager = null;
+         this.scheduledLiveLock = null;
+         this.scheduledBackupLock = null;
          if (this.ioCriticalErrorListener != null) {
             this.ioCriticalErrorListener.onIOException(e, "Failed to setup the JdbcNodeManager", null);
          }
@@ -145,6 +184,9 @@ public final class JdbcNodeManager extends NodeManager {
          } finally {
             super.stop();
             this.sharedStateManager.close();
+            this.sharedStateManager = null;
+            this.scheduledLiveLock = null;
+            this.scheduledBackupLock = null;
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/03ff0661/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
index 201db6a..d4b63de 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
@@ -25,31 +25,34 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
-import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
-import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
+import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY;
+public class JdbcLeaseLockTest extends ActiveMQTestBase {
 
-public class JdbcLeaseLockTest {
-
-   private static final long DEFAULT_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(10);
-   private static final SQLProvider SQL_PROVIDER = new PropertySQLProvider.Factory(DERBY).create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
-   private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
-   private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
    private JdbcSharedStateManager jdbcSharedStateManager;
+   private DatabaseStorageConfiguration dbConf;
+   private SQLProvider sqlProvider;
 
    private LeaseLock lock() {
-      return lock(DEFAULT_LOCK_EXPIRATION_MILLIS);
+      return lock(dbConf.getJdbcLockExpirationMillis());
    }
 
    private LeaseLock lock(long acquireMillis) {
       try {
-         return JdbcSharedStateManager.createLiveLock(UUID.randomUUID().toString(), jdbcSharedStateManager.getConnection(), SQL_PROVIDER, acquireMillis, 0);
+         return JdbcSharedStateManager
+            .createLiveLock(
+               UUID.randomUUID().toString(),
+               jdbcSharedStateManager.getConnection(),
+               sqlProvider,
+               acquireMillis,
+               0);
       } catch (SQLException e) {
          throw new IllegalStateException(e);
       }
@@ -57,7 +60,18 @@ public class JdbcLeaseLockTest {
 
    @Before
    public void createLockTable() {
-      jdbcSharedStateManager = JdbcSharedStateManager.usingConnectionUrl(UUID.randomUUID().toString(), DEFAULT_LOCK_EXPIRATION_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER);
+      dbConf = createDefaultDatabaseStorageConfiguration();
+      sqlProvider = JDBCUtils.getSQLProvider(
+         dbConf.getJdbcDriverClassName(),
+         dbConf.getNodeManagerStoreTableName(),
+         SQLProvider.DatabaseStoreType.NODE_MANAGER);
+      jdbcSharedStateManager = JdbcSharedStateManager
+         .usingConnectionUrl(
+            UUID.randomUUID().toString(),
+            dbConf.getJdbcLockExpirationMillis(),
+            dbConf.getJdbcConnectionUrl(),
+            dbConf.getJdbcDriverClassName(),
+            sqlProvider);
    }
 
    @After

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/03ff0661/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index b4ea62f..b1fdd25 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -62,6 +62,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -457,6 +458,10 @@ public abstract class ActiveMQTestBase extends Assert {
    }
 
    protected void setDBStoreType(Configuration configuration) {
+      configuration.setStoreConfiguration(createDefaultDatabaseStorageConfiguration());
+   }
+
+   protected DatabaseStorageConfiguration createDefaultDatabaseStorageConfiguration() {
       DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
       dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
       dbStorageConfiguration.setBindingsTableName("BINDINGS");
@@ -465,8 +470,22 @@ public abstract class ActiveMQTestBase extends Assert {
       dbStorageConfiguration.setPageStoreTableName("PAGE_STORE");
       dbStorageConfiguration.setJMSBindingsTableName("JMS_BINDINGS");
       dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName());
+      dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis());
+      dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis());
+      dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis());
+      return dbStorageConfiguration;
+   }
+
+   protected long getJdbcLockAcquisitionTimeoutMillis() {
+      return Long.getLong("jdbc.lock.acquisition", ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis());
+   }
+
+   protected long getJdbcLockExpirationMillis() {
+      return Long.getLong("jdbc.lock.expiration", ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis());
+   }
 
-      configuration.setStoreConfiguration(dbStorageConfiguration);
+   protected long getJdbcLockRenewPeriodMillis() {
+      return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis());
    }
 
    public void destroyTables(List<String> tableNames) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/03ff0661/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
index ec97663..3c7c8fc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
@@ -112,7 +112,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
       liveServer.setIdentity(this.getClass().getSimpleName() + "/liveServer");
    }
 
-   protected TestableServer createTestableServer(Configuration config) {
+   protected TestableServer createTestableServer(Configuration config) throws Exception {
       boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration;
       return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1));
    }
@@ -156,7 +156,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
    /**
     * Override this if is needed a different implementation of {@link NodeManager} to be used into {@link #createConfigs()}.
     */
-   protected NodeManager createNodeManager() {
+   protected NodeManager createNodeManager() throws Exception {
       return new InVMNodeManager(false);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/03ff0661/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
index d6d9c1b..3eecb6f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
@@ -16,16 +16,16 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.failover;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 
-import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -34,13 +34,17 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
+import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
 import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
-import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
-import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
+import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
 import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
 import org.hamcrest.core.Is;
 import org.junit.After;
@@ -50,18 +54,9 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY;
-
 @RunWith(Parameterized.class)
 public class NettyFailoverTest extends FailoverTest {
 
-   private static final long JDBC_LOCK_EXPIRATION_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
-   private static final long JDBC_LOCK_RENEW_PERIOD_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis();
-   private static final long JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
-   private static final SQLProvider SQL_PROVIDER = new PropertySQLProvider.Factory(DERBY).create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
-   private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
-   private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
-
    public enum NodeManagerType {
       InVM, Jdbc
    }
@@ -84,8 +79,8 @@ public class NettyFailoverTest extends FailoverTest {
       return getNettyConnectorTransportConfiguration(live);
    }
 
-   private ScheduledExecutorService scheduledExecutorService;
-   private ExecutorService executor;
+   private List<ScheduledExecutorService> scheduledExecutorServices = new ArrayList<>();
+   private List<ExecutorService> executors = new ArrayList<>();
 
    @Override
    protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) {
@@ -94,23 +89,25 @@ public class NettyFailoverTest extends FailoverTest {
    }
 
    @Override
-   protected NodeManager createNodeManager() {
+   protected NodeManager createNodeManager() throws Exception {
 
       switch (nodeManagerType) {
 
          case InVM:
             return new InVMNodeManager(false);
          case Jdbc:
-            //It can uses an in memory JavaDB: the failover tests are in process
             final ThreadFactory daemonThreadFactory = t -> {
                final Thread th = new Thread(t);
                th.setDaemon(true);
                return th;
             };
-            scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
-            executor = Executors.newFixedThreadPool(2, daemonThreadFactory);
+            final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
+            scheduledExecutorServices.add(scheduledExecutorService);
+            final ExecutorService executor = Executors.newFixedThreadPool(2, daemonThreadFactory);
+            executors.add(executor);
+            final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
             final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
-            return JdbcNodeManager.usingConnectionUrl(UUID.randomUUID().toString(), JDBC_LOCK_EXPIRATION_MILLIS, JDBC_LOCK_RENEW_PERIOD_MILLIS, JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER, scheduledExecutorService, executorFactory, (code, message, file) -> {
+            return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> {
                code.printStackTrace();
                Assert.fail(message);
             });
@@ -119,13 +116,27 @@ public class NettyFailoverTest extends FailoverTest {
       }
    }
 
+
+   @Override
+   protected TestableServer createTestableServer(Configuration config) throws Exception {
+      final boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration;
+      NodeManager nodeManager = this.nodeManager;
+      //create a separate NodeManager for the backup
+      if (isBackup && nodeManagerType == NodeManagerType.Jdbc) {
+         nodeManager = createNodeManager();
+      }
+      return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1));
+   }
+
+
    @After
    public void shutDownExecutors() {
-      if (scheduledExecutorService != null) {
-         executor.shutdown();
-         scheduledExecutorService.shutdown();
-         this.executor = null;
-         this.scheduledExecutorService = null;
+      if (!scheduledExecutorServices.isEmpty()) {
+         ThreadLeakCheckRule.addKownThread("oracle.jdbc.driver.BlockSource.ThreadedCachingBlockSource.BlockReleaser");
+         executors.forEach(ExecutorService::shutdown);
+         scheduledExecutorServices.forEach(ExecutorService::shutdown);
+         executors.clear();
+         scheduledExecutorServices.clear();
       }
    }