You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/10/20 21:24:00 UTC

[activemq-artemis] branch master updated: ARTEMIS-2941 Improve JDBC HA connection resiliency

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4545749  ARTEMIS-2941 Improve JDBC HA connection resiliency
     new 9a95418  This closes #3301
4545749 is described below

commit 4545749969a15052ba52edd4700f29421a8e746d
Author: franz1981 <ni...@gmail.com>
AuthorDate: Mon Sep 28 13:49:20 2020 +0200

    ARTEMIS-2941 Improve JDBC HA connection resiliency
---
 .../activemq/artemis/core/server/NodeManager.java  | 176 ++++-------
 .../core/server/cluster/ha/ColocatedPolicy.java    |   5 +-
 .../artemis/core/server/cluster/ha/HAPolicy.java   |   3 +-
 .../core/server/cluster/ha/LiveOnlyPolicy.java     |   3 +-
 .../core/server/cluster/ha/ReplicaPolicy.java      |   5 +-
 .../core/server/cluster/ha/ReplicatedPolicy.java   |   3 +-
 .../server/cluster/ha/SharedStoreMasterPolicy.java |   5 +-
 .../server/cluster/ha/SharedStoreSlavePolicy.java  |   5 +-
 .../core/server/impl/ActiveMQServerImpl.java       |  57 ++--
 .../FileBasedNodeManager.java}                     | 129 ++------
 .../core/server/impl/FileLockNodeManager.java      | 351 +++++++++++----------
 .../artemis/core/server/impl/InVMNodeManager.java  |  30 +-
 .../server/impl/SharedNothingBackupActivation.java |   9 +-
 .../server/impl/SharedNothingLiveActivation.java   |   2 +-
 .../server/impl/SharedStoreBackupActivation.java   | 136 ++++++--
 .../server/impl/SharedStoreLiveActivation.java     | 110 ++++---
 .../impl/jdbc/ActiveMQScheduledLeaseLock.java      |  55 +++-
 .../core/server/impl/jdbc/JdbcLeaseLock.java       |  54 +++-
 .../core/server/impl/jdbc/JdbcNodeManager.java     | 328 ++++++++++++-------
 .../server/impl/jdbc/JdbcSharedStateManager.java   |  46 ++-
 .../artemis/core/server/impl/jdbc/LeaseLock.java   |   2 +
 .../core/server/impl/jdbc/ScheduledLeaseLock.java  |  14 +-
 .../core/server/impl/jdbc/JdbcLeaseLockTest.java   | 113 ++++++-
 .../core/server/impl/jdbc/JdbcNodeManagerTest.java |  15 +-
 .../artemis/tests/util/ActiveMQTestBase.java       |   5 +-
 .../tests/extras/byteman/FileLockMonitorTest.java  | 244 +++++++-------
 .../extras/byteman/FileLockNodeManagerTest.java    |   2 +-
 .../byteman/SharedStoreBackupActivationTest.java   | 151 ---------
 ...nagerTest.java => FileLockNodeManagerTest.java} |  27 +-
 .../integration/cluster/JdbcNodeManagerTest.java   | 104 ++++++
 .../integration/cluster/NodeManagerAction.java     |  35 +-
 .../tests/integration/cluster/NodeManagerTest.java |  12 +
 .../cluster/failover/FileLockNodeManagerTest.java  |   5 +-
 .../cluster/failover/NettyFailoverTest.java        |   5 +-
 .../integration/critical/CriticalCrashTest.java    |   2 +-
 .../ShutdownOnCriticalIOErrorMoveNextTest.java     |   2 +-
 .../integration/discovery/DiscoveryBaseTest.java   |  20 +-
 37 files changed, 1249 insertions(+), 1021 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
index 4b5c3cb..a6b05e1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
@@ -16,52 +16,50 @@
  */
 package org.apache.activemq.artemis.core.server;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
+import java.util.HashSet;
+import java.util.Set;
 
-import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
 import org.apache.activemq.artemis.utils.UUID;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.jboss.logging.Logger;
 
 public abstract class NodeManager implements ActiveMQComponent {
 
-   protected static final byte FIRST_TIME_START = '0';
-   public static final String SERVER_LOCK_NAME = "server.lock";
-   private static final String ACCESS_MODE = "rw";
+   @FunctionalInterface
+   public interface LockListener {
 
+      void lostLock();
+   }
+
+   private static final Logger LOGGER = Logger.getLogger(NodeManager.class);
    protected final boolean replicatedBackup;
-   private final File directory;
-   private final Object nodeIDGuard = new Object();
+   protected final Object nodeIDGuard = new Object();
    private SimpleString nodeID;
    private UUID uuid;
    private boolean isStarted = false;
+   private final Set<FileLockNodeManager.LockListener> lockListeners;
 
-   protected FileChannel channel;
-
-   public NodeManager(final boolean replicatedBackup, final File directory) {
-      this.directory = directory;
+   public NodeManager(final boolean replicatedBackup) {
       this.replicatedBackup = replicatedBackup;
+      this.lockListeners = new HashSet<>();
    }
 
    // --------------------------------------------------------------------
 
-   public abstract void awaitLiveNode() throws Exception;
+   public abstract void awaitLiveNode() throws NodeManagerException, InterruptedException;
 
-   public abstract void awaitLiveStatus() throws Exception;
+   public abstract void awaitLiveStatus() throws NodeManagerException, InterruptedException;
 
-   public abstract void startBackup() throws Exception;
+   public abstract void startBackup() throws NodeManagerException, InterruptedException;
 
-   public abstract ActivateCallback startLiveNode() throws Exception;
+   public abstract ActivateCallback startLiveNode() throws NodeManagerException, InterruptedException;
 
-   public abstract void pauseLiveServer() throws Exception;
+   public abstract void pauseLiveServer() throws NodeManagerException;
 
-   public abstract void crashLiveServer() throws Exception;
+   public abstract void crashLiveServer() throws NodeManagerException;
 
-   public abstract void releaseBackup() throws Exception;
+   public abstract void releaseBackup() throws NodeManagerException;
 
    // --------------------------------------------------------------------
 
@@ -81,7 +79,7 @@ public abstract class NodeManager implements ActiveMQComponent {
       }
    }
 
-   public abstract SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException;
+   public abstract SimpleString readNodeId() throws NodeManagerException;
 
    public UUID getUUID() {
       synchronized (nodeIDGuard) {
@@ -113,119 +111,63 @@ public abstract class NodeManager implements ActiveMQComponent {
       }
    }
 
-   public abstract boolean isAwaitingFailback() throws Exception;
+   public abstract boolean isAwaitingFailback() throws NodeManagerException;
 
-   public abstract boolean isBackupLive() throws Exception;
+   public abstract boolean isBackupLive() throws NodeManagerException;
 
    public abstract void interrupt();
 
    @Override
    public synchronized void stop() throws Exception {
-      FileChannel channelCopy = channel;
-      if (channelCopy != null)
-         channelCopy.close();
+      // force any running threads on node manager to stop
       isStarted = false;
+      lockListeners.clear();
    }
 
-   public void stopBackup() throws Exception {
-      if (replicatedBackup && getNodeId() != null) {
-         setUpServerLockFile();
-      }
+   public void stopBackup() throws NodeManagerException {
       releaseBackup();
    }
 
-   /**
-    * Ensures existence of persistent information about the server's nodeID.
-    * <p>
-    * Roughly the different use cases are:
-    * <ol>
-    * <li>old live server restarts: a server.lock file already exists and contains a nodeID.
-    * <li>new live server starting for the first time: no file exists, and we just *create* a new
-    * UUID to use as nodeID
-    * <li>replicated backup received its nodeID from its live: no file exists, we need to persist
-    * the *current* nodeID
-    * </ol>
-    */
-   protected synchronized void setUpServerLockFile() throws IOException {
-      File serverLockFile = newFile(SERVER_LOCK_NAME);
-
-      boolean fileCreated = false;
-
-      int count = 0;
-      while (!serverLockFile.exists()) {
-         try {
-            fileCreated = serverLockFile.createNewFile();
-         } catch (RuntimeException e) {
-            ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile);
-            throw e;
-         } catch (IOException e) {
-            /*
-            * on some OS's this may fail weirdly even tho the parent dir exists, retrying will work, some weird timing issue i think
-            * */
-            if (count < 5) {
-               try {
-                  Thread.sleep(100);
-               } catch (InterruptedException e1) {
-               }
-               count++;
-               continue;
-            }
-            ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile);
-            throw e;
-         }
+   protected synchronized void checkStarted() {
+      if (!isStarted) {
+         throw new IllegalStateException("the node manager is supposed to be started");
       }
+   }
 
-      @SuppressWarnings("resource")
-      RandomAccessFile raFile = new RandomAccessFile(serverLockFile, ACCESS_MODE);
-
-      channel = raFile.getChannel();
-
-      if (fileCreated) {
-         ByteBuffer id = ByteBuffer.allocateDirect(3);
-         byte[] bytes = new byte[3];
-         bytes[0] = FIRST_TIME_START;
-         bytes[1] = FIRST_TIME_START;
-         bytes[2] = FIRST_TIME_START;
-         id.put(bytes, 0, 3);
-         id.position(0);
-         channel.write(id, 0);
-         channel.force(true);
+   protected synchronized void notifyLostLock() {
+      if (!isStarted) {
+         return;
       }
+      lockListeners.forEach(lockListener -> {
+         try {
+            lockListener.lostLock();
+         } catch (Exception e) {
+            LOGGER.warn("On notify lost lock", e);
+            // Need to notify everyone so ignore any exception
+         }
+      });
+   }
 
-      createNodeId();
+   public synchronized void registerLockListener(FileLockNodeManager.LockListener lockListener) {
+      lockListeners.add(lockListener);
    }
 
-   /**
-    * @return
-    */
-   protected final File newFile(final String fileName) {
-      File file = new File(directory, fileName);
-      return file;
+   public synchronized void unregisterLockListener(FileLockNodeManager.LockListener lockListener) {
+      lockListeners.remove(lockListener);
    }
 
-   protected final synchronized void createNodeId() throws IOException {
-      synchronized (nodeIDGuard) {
-         ByteBuffer id = ByteBuffer.allocateDirect(16);
-         int read = channel.read(id, 3);
-         if (replicatedBackup) {
-            id.position(0);
-            id.put(getUUID().asBytes(), 0, 16);
-            id.position(0);
-            channel.write(id, 3);
-            channel.force(true);
-         } else if (read != 16) {
-            setUUID(UUIDGenerator.getInstance().generateUUID());
-            id.put(getUUID().asBytes(), 0, 16);
-            id.position(0);
-            channel.write(id, 3);
-            channel.force(true);
-         } else {
-            byte[] bytes = new byte[16];
-            id.position(0);
-            id.get(bytes);
-            setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes));
-         }
+   public static final class NodeManagerException extends RuntimeException {
+
+      public NodeManagerException(String message) {
+         super(message);
+      }
+
+      public NodeManagerException(Throwable cause) {
+         super(cause);
       }
-   }
 
+      public NodeManagerException(String message, Throwable cause) {
+         super(message, cause);
+      }
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
index b72c755..572f54c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.ColocatedActivation;
 import org.apache.activemq.artemis.core.server.impl.LiveActivation;
@@ -91,8 +92,8 @@ public class ColocatedPolicy implements HAPolicy<LiveActivation> {
    public LiveActivation createActivation(ActiveMQServerImpl server,
                                           boolean wasLive,
                                           Map<String, Object> activationParams,
-                                          ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception {
-      return new ColocatedActivation(server, this, livePolicy.createActivation(server, wasLive, activationParams, shutdownOnCriticalIO));
+                                          IOCriticalErrorListener ioCriticalErrorListener) throws Exception {
+      return new ColocatedActivation(server, this, livePolicy.createActivation(server, wasLive, activationParams, ioCriticalErrorListener));
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java
index bb93014..c5d62ac 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.server.impl.Activation;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 
@@ -34,7 +35,7 @@ public interface HAPolicy<T extends Activation> {
    T createActivation(ActiveMQServerImpl server,
                       boolean wasLive,
                       Map<String, Object> activationParams,
-                      ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception;
+                      IOCriticalErrorListener shutdownOnCriticalIO) throws Exception;
 
    boolean isSharedStore();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/LiveOnlyPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/LiveOnlyPolicy.java
index 442c22d..9aa6a90 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/LiveOnlyPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/LiveOnlyPolicy.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
 
 import java.util.Map;
 
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.server.impl.Activation;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.LiveOnlyActivation;
@@ -37,7 +38,7 @@ public class LiveOnlyPolicy implements HAPolicy<Activation> {
    public Activation createActivation(ActiveMQServerImpl server,
                                       boolean wasLive,
                                       Map<String, Object> activationParams,
-                                      ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
+                                      IOCriticalErrorListener ioCriticalErrorListener) {
       return new LiveOnlyActivation(server, this);
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
index 36e65f0..2fbdfaa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
 import org.apache.activemq.artemis.core.server.impl.Activation;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@@ -211,8 +212,8 @@ public class ReplicaPolicy extends BackupPolicy {
    public Activation createActivation(ActiveMQServerImpl server,
                                       boolean wasLive,
                                       Map<String, Object> activationParams,
-                                      ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception {
-      SharedNothingBackupActivation backupActivation = new SharedNothingBackupActivation(server, wasLive, activationParams, shutdownOnCriticalIO, this, networkHealthCheck);
+                                      IOCriticalErrorListener ioCriticalErrorListener) throws Exception {
+      SharedNothingBackupActivation backupActivation = new SharedNothingBackupActivation(server, wasLive, activationParams, ioCriticalErrorListener, this, networkHealthCheck);
       backupActivation.init();
       return backupActivation;
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
index 99b98fe..d4851ed 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.LiveActivation;
@@ -231,7 +232,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
    public LiveActivation createActivation(ActiveMQServerImpl server,
                                           boolean wasLive,
                                           Map<String, Object> activationParams,
-                                          ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
+                                          IOCriticalErrorListener ioCriticalErrorListener) {
       return new SharedNothingLiveActivation(server, this);
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java
index 82bbaf7..3e3913e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.LiveActivation;
 import org.apache.activemq.artemis.core.server.impl.SharedStoreLiveActivation;
@@ -91,8 +92,8 @@ public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation> {
    public LiveActivation createActivation(ActiveMQServerImpl server,
                                           boolean wasLive,
                                           Map<String, Object> activationParams,
-                                          ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
-      return new SharedStoreLiveActivation(server, this);
+                                          IOCriticalErrorListener ioCriticalErrorListener) {
+      return new SharedStoreLiveActivation(server, this, ioCriticalErrorListener);
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java
index a4a0ed1..4a00ccf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.server.impl.Activation;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.SharedStoreBackupActivation;
@@ -101,8 +102,8 @@ public class SharedStoreSlavePolicy extends BackupPolicy {
    public Activation createActivation(ActiveMQServerImpl server,
                                       boolean wasLive,
                                       Map<String, Object> activationParams,
-                                      ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
-      return new SharedStoreBackupActivation(server, this);
+                                      IOCriticalErrorListener ioCriticalErrorListener) {
+      return new SharedStoreBackupActivation(server, this, ioCriticalErrorListener);
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index c538569..0616362 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -329,7 +329,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private final Map<String, Object> activationParams = new HashMap<>();
 
-   protected final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new ShutdownOnCriticalErrorListener();
+   protected final IOCriticalErrorListener ioCriticalErrorListener = new DefaultCriticalErrorListener();
 
    private final ActiveMQServer parentServer;
 
@@ -522,7 +522,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                throw new IllegalArgumentException("replicatingBackup is not supported yet while using JDBC persistence");
             }
             final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
-            manager = JdbcNodeManager.with(dbConf, scheduledPool, executorFactory, shutdownOnCriticalIO);
+            manager = JdbcNodeManager.with(dbConf, scheduledPool, executorFactory);
          } else if (haType == null || haType == HAPolicyConfiguration.TYPE.LIVE_ONLY) {
             if (logger.isDebugEnabled()) {
                logger.debug("Detected no Shared Store HA options on JDBC store");
@@ -610,7 +610,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
          final boolean wasLive = !haPolicy.isBackup();
          if (!haPolicy.isBackup()) {
-            activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO);
+            activation = haPolicy.createActivation(this, false, activationParams, ioCriticalErrorListener);
 
             if (afterActivationCreated != null) {
                try {
@@ -636,9 +636,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          // checking again here
          if (haPolicy.isBackup()) {
             if (haPolicy.isSharedStore()) {
-               activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO);
+               activation = haPolicy.createActivation(this, false, activationParams, ioCriticalErrorListener);
             } else {
-               activation = haPolicy.createActivation(this, wasLive, activationParams, shutdownOnCriticalIO);
+               activation = haPolicy.createActivation(this, wasLive, activationParams, ioCriticalErrorListener);
             }
 
             if (afterActivationCreated != null) {
@@ -1117,12 +1117,23 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       this.stop(failoverOnServerShutdown, criticalIOError, restarting, false);
    }
 
+   private void stop(boolean failoverOnServerShutdown,
+                     final boolean criticalIOError,
+                     boolean restarting,
+                     boolean isShutdown) {
+      stop(failoverOnServerShutdown, criticalIOError, isShutdown || criticalIOError, restarting, isShutdown);
+   }
+
    /**
     * Stops the server
     *
     * @param criticalIOError whether we have encountered an IO error with the journal etc
     */
-   void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting, boolean isShutdown) {
+   private void stop(boolean failoverOnServerShutdown,
+                     final boolean criticalIOError,
+                     boolean shutdownExternalComponents,
+                     boolean restarting,
+                     boolean isShutdown) {
 
       if (logger.isDebugEnabled()) {
          logger.debug("Stopping server " + this);
@@ -1344,7 +1355,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       connectedClientIds.clear();
 
-      stopExternalComponents(isShutdown || criticalIOError);
+      stopExternalComponents(shutdownExternalComponents);
 
       try {
          this.analyzer.clear();
@@ -2794,9 +2805,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    protected PagingStoreFactory getPagingStoreFactory() throws Exception {
       if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
          DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
-         return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO, configuration.isReadWholePage());
+         return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, false, ioCriticalErrorListener, configuration.isReadWholePage());
       }
-      return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO, configuration.isReadWholePage());
+      return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), ioCriticalErrorListener, configuration.isReadWholePage());
    }
 
    /**
@@ -2805,12 +2816,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    protected StorageManager createStorageManager() {
       if (configuration.isPersistenceEnabled()) {
          if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
-            JDBCJournalStorageManager journal = new JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO);
+            JDBCJournalStorageManager journal = new JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), getScheduledPool(), executorFactory, ioExecutorFactory, ioCriticalErrorListener);
             this.getCriticalAnalyzer().add(journal);
             return journal;
          } else {
             // Default to File Based Storage Manager, (Legacy default configuration).
-            JournalStorageManager journal = new JournalStorageManager(configuration, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO);
+            JournalStorageManager journal = new JournalStorageManager(configuration, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener);
             this.getCriticalAnalyzer().add(journal);
             return journal;
          }
@@ -3136,7 +3147,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       if (configuration.getMaxDiskUsage() != -1) {
          try {
-            injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, shutdownOnCriticalIO));
+            injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, ioCriticalErrorListener));
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.unableToInjectMonitor(e);
          }
@@ -3929,23 +3940,23 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    // Inner classes
    // --------------------------------------------------------------------------------
 
-   public final class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener {
+   public final class DefaultCriticalErrorListener implements IOCriticalErrorListener {
 
-      boolean failedAlready = false;
+      private final AtomicBoolean failedAlready = new AtomicBoolean();
 
       @Override
       public synchronized void onIOException(Throwable cause, String message, SequentialFile file) {
-         if (!failedAlready) {
-            failedAlready = true;
-
-            if (file == null) {
-               ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause);
-            } else {
-               ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
-            }
+         if (!failedAlready.compareAndSet(false, true)) {
+            return;
+         }
 
-            stopTheServer(true);
+         if (file == null) {
+            ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause);
+         } else {
+            ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
          }
+
+         stopTheServer(true);
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java
similarity index 63%
copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
copy to artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java
index 4b5c3cb..cfbcb47 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.artemis.core.server;
+package org.apache.activemq.artemis.core.server.impl;
 
 import java.io.File;
 import java.io.IOException;
@@ -22,116 +22,22 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
-import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
-import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 
-public abstract class NodeManager implements ActiveMQComponent {
+public abstract class FileBasedNodeManager extends NodeManager {
 
    protected static final byte FIRST_TIME_START = '0';
    public static final String SERVER_LOCK_NAME = "server.lock";
    private static final String ACCESS_MODE = "rw";
-
-   protected final boolean replicatedBackup;
    private final File directory;
-   private final Object nodeIDGuard = new Object();
-   private SimpleString nodeID;
-   private UUID uuid;
-   private boolean isStarted = false;
-
    protected FileChannel channel;
 
-   public NodeManager(final boolean replicatedBackup, final File directory) {
+   public FileBasedNodeManager(boolean replicatedBackup, File directory) {
+      super(replicatedBackup);
       this.directory = directory;
-      this.replicatedBackup = replicatedBackup;
-   }
-
-   // --------------------------------------------------------------------
-
-   public abstract void awaitLiveNode() throws Exception;
-
-   public abstract void awaitLiveStatus() throws Exception;
-
-   public abstract void startBackup() throws Exception;
-
-   public abstract ActivateCallback startLiveNode() throws Exception;
-
-   public abstract void pauseLiveServer() throws Exception;
-
-   public abstract void crashLiveServer() throws Exception;
-
-   public abstract void releaseBackup() throws Exception;
-
-   // --------------------------------------------------------------------
-
-   @Override
-   public synchronized void start() throws Exception {
-      isStarted = true;
-   }
-
-   @Override
-   public boolean isStarted() {
-      return isStarted;
-   }
-
-   public SimpleString getNodeId() {
-      synchronized (nodeIDGuard) {
-         return nodeID;
-      }
-   }
-
-   public abstract SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException;
-
-   public UUID getUUID() {
-      synchronized (nodeIDGuard) {
-         return uuid;
-      }
-   }
-
-   /**
-    * Sets the nodeID.
-    * <p>
-    * Only used by replicating backups.
-    *
-    * @param nodeID
-    */
-   public void setNodeID(String nodeID) {
-      synchronized (nodeIDGuard) {
-         this.nodeID = new SimpleString(nodeID);
-         this.uuid = new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(nodeID));
-      }
-   }
-
-   /**
-    * @param generateUUID
-    */
-   protected void setUUID(UUID generateUUID) {
-      synchronized (nodeIDGuard) {
-         uuid = generateUUID;
-         nodeID = new SimpleString(uuid.toString());
-      }
-   }
-
-   public abstract boolean isAwaitingFailback() throws Exception;
-
-   public abstract boolean isBackupLive() throws Exception;
-
-   public abstract void interrupt();
-
-   @Override
-   public synchronized void stop() throws Exception {
-      FileChannel channelCopy = channel;
-      if (channelCopy != null)
-         channelCopy.close();
-      isStarted = false;
-   }
-
-   public void stopBackup() throws Exception {
-      if (replicatedBackup && getNodeId() != null) {
-         setUpServerLockFile();
-      }
-      releaseBackup();
    }
 
    /**
@@ -160,8 +66,8 @@ public abstract class NodeManager implements ActiveMQComponent {
             throw e;
          } catch (IOException e) {
             /*
-            * on some OS's this may fail weirdly even tho the parent dir exists, retrying will work, some weird timing issue i think
-            * */
+             * on some OS's this may fail weirdly even tho the parent dir exists, retrying will work, some weird timing issue i think
+             * */
             if (count < 5) {
                try {
                   Thread.sleep(100);
@@ -228,4 +134,23 @@ public abstract class NodeManager implements ActiveMQComponent {
       }
    }
 
+   @Override
+   public synchronized void stop() throws Exception {
+      FileChannel channelCopy = channel;
+      if (channelCopy != null)
+         channelCopy.close();
+      super.stop();
+   }
+
+   @Override
+   public void stopBackup() throws NodeManagerException {
+      if (replicatedBackup && getNodeId() != null) {
+         try {
+            setUpServerLockFile();
+         } catch (IOException e) {
+            throw new NodeManagerException(e);
+         }
+      }
+      super.stopBackup();
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java
index ddd59ff..3b3da1c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java
@@ -22,23 +22,18 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActivateCallback;
 import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
 import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.utils.UUID;
 import org.jboss.logging.Logger;
 
-public class FileLockNodeManager extends NodeManager {
+public class FileLockNodeManager extends FileBasedNodeManager {
 
    private static final Logger logger = Logger.getLogger(FileLockNodeManager.class);
 
@@ -58,9 +53,9 @@ public class FileLockNodeManager extends NodeManager {
 
    private static final byte NOT_STARTED = 'N';
 
-   private static final long LOCK_ACCESS_FAILURE_WAIT_TIME = 2000;
+   private static final long LOCK_ACCESS_FAILURE_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(2);
 
-   private static final int LOCK_MONITOR_TIMEOUT_MILLIES = 2000;
+   private static final long LOCK_MONITOR_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(2);
 
    private volatile FileLock liveLock;
 
@@ -68,28 +63,31 @@ public class FileLockNodeManager extends NodeManager {
 
    private final FileChannel[] lockChannels = new FileChannel[3];
 
-   protected long lockAcquisitionTimeout = -1;
+   private final long lockAcquisitionTimeoutNanos;
 
    protected boolean interrupted = false;
 
-   private ScheduledExecutorService scheduledPool;
+   private final ScheduledExecutorService scheduledPool;
 
    public FileLockNodeManager(final File directory, boolean replicatedBackup, ScheduledExecutorService scheduledPool) {
       super(replicatedBackup, directory);
       this.scheduledPool = scheduledPool;
+      this.lockAcquisitionTimeoutNanos = -1;
    }
 
    public FileLockNodeManager(final File directory, boolean replicatedBackup) {
       super(replicatedBackup, directory);
       this.scheduledPool = null;
+      this.lockAcquisitionTimeoutNanos = -1;
    }
 
-   public FileLockNodeManager(final File directory, boolean replicatedBackup, long lockAcquisitionTimeout,
-         ScheduledExecutorService scheduledPool) {
+   public FileLockNodeManager(final File directory,
+                              boolean replicatedBackup,
+                              long lockAcquisitionTimeout,
+                              ScheduledExecutorService scheduledPool) {
       super(replicatedBackup, directory);
-
       this.scheduledPool = scheduledPool;
-      this.lockAcquisitionTimeout = lockAcquisitionTimeout;
+      this.lockAcquisitionTimeoutNanos = lockAcquisitionTimeout == -1 ? -1 : TimeUnit.MILLISECONDS.toNanos(lockAcquisitionTimeout);
    }
 
    @Override
@@ -141,19 +139,23 @@ public class FileLockNodeManager extends NodeManager {
    }
 
    @Override
-   public boolean isAwaitingFailback() throws Exception {
+   public boolean isAwaitingFailback() throws NodeManagerException {
       return getState() == FileLockNodeManager.FAILINGBACK;
    }
 
    @Override
-   public boolean isBackupLive() throws Exception {
-      FileLock liveAttemptLock;
-      liveAttemptLock = tryLock(FileLockNodeManager.LIVE_LOCK_POS);
-      if (liveAttemptLock == null) {
-         return true;
-      } else {
-         liveAttemptLock.release();
-         return false;
+   public boolean isBackupLive() throws NodeManagerException {
+      try {
+         FileLock liveAttemptLock;
+         liveAttemptLock = tryLock(FileLockNodeManager.LIVE_LOCK_POS);
+         if (liveAttemptLock == null) {
+            return true;
+         } else {
+            liveAttemptLock.release();
+            return false;
+         }
+      } catch (IOException e) {
+         throw new NodeManagerException(e);
       }
    }
 
@@ -167,182 +169,218 @@ public class FileLockNodeManager extends NodeManager {
    }
 
    @Override
-   public final void releaseBackup() throws Exception {
-      if (backupLock != null) {
-         backupLock.release();
-         backupLock = null;
+   public final void releaseBackup() throws NodeManagerException {
+      try {
+         if (backupLock != null) {
+            backupLock.release();
+            backupLock = null;
+         }
+      } catch (IOException e) {
+         throw new NodeManagerException(e);
       }
    }
 
    @Override
-   public void awaitLiveNode() throws Exception {
-      logger.debug("awaiting live node...");
-      do {
-         byte state = getState();
-         while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) {
-            logger.debug("awaiting live node startup state='" + state + "'");
-            Thread.sleep(2000);
-            state = getState();
-         }
+   public void awaitLiveNode() throws NodeManagerException, InterruptedException {
+      try {
+         logger.debug("awaiting live node...");
+         do {
+            byte state = getState();
+            while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) {
+               logger.debug("awaiting live node startup state='" + state + "'");
+               Thread.sleep(2000);
+               state = getState();
+            }
 
-         liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS);
-         if (interrupted) {
-            interrupted = false;
-            throw new InterruptedException("Lock was interrupted");
-         }
-         state = getState();
-         if (state == FileLockNodeManager.PAUSED) {
-            liveLock.release();
-            logger.debug("awaiting live node restarting");
-            Thread.sleep(2000);
-         } else if (state == FileLockNodeManager.FAILINGBACK) {
-            liveLock.release();
-            logger.debug("awaiting live node failing back");
-            Thread.sleep(2000);
-         } else if (state == FileLockNodeManager.LIVE) {
-            logger.debug("acquired live node lock state = " + (char) state);
-            break;
+            liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS);
+            if (interrupted) {
+               interrupted = false;
+               throw new InterruptedException("Lock was interrupted");
+            }
+            state = getState();
+            if (state == FileLockNodeManager.PAUSED) {
+               liveLock.release();
+               logger.debug("awaiting live node restarting");
+               Thread.sleep(2000);
+            } else if (state == FileLockNodeManager.FAILINGBACK) {
+               liveLock.release();
+               logger.debug("awaiting live node failing back");
+               Thread.sleep(2000);
+            } else if (state == FileLockNodeManager.LIVE) {
+               logger.debug("acquired live node lock state = " + (char) state);
+               break;
+            }
          }
+         while (true);
+      } catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
+         throw new NodeManagerException(e);
       }
-      while (true);
    }
 
    @Override
-   public void startBackup() throws Exception {
+   public void startBackup() throws NodeManagerException {
       assert !replicatedBackup; // should not be called if this is a replicating backup
       ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
-
-      backupLock = lock(FileLockNodeManager.BACKUP_LOCK_POS);
+      try {
+         backupLock = lock(FileLockNodeManager.BACKUP_LOCK_POS);
+      } catch (ActiveMQLockAcquisitionTimeoutException e) {
+         throw new NodeManagerException(e);
+      }
       ActiveMQServerLogger.LOGGER.gotBackupLock();
       if (getUUID() == null)
          readNodeId();
    }
 
    @Override
-   public ActivateCallback startLiveNode() throws Exception {
-      setFailingBack();
+   public ActivateCallback startLiveNode() throws NodeManagerException {
+      try {
+         setFailingBack();
 
-      String timeoutMessage = lockAcquisitionTimeout == -1 ? "indefinitely" : lockAcquisitionTimeout + " milliseconds";
+         String timeoutMessage = lockAcquisitionTimeoutNanos == -1 ? "indefinitely" : TimeUnit.NANOSECONDS.toMillis(lockAcquisitionTimeoutNanos) + " milliseconds";
 
-      ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
+         ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
 
-      liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS);
+         liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS);
 
-      ActiveMQServerLogger.LOGGER.obtainedLiveLock();
+         ActiveMQServerLogger.LOGGER.obtainedLiveLock();
 
-      return new CleaningActivateCallback() {
-         @Override
-         public void activationComplete() {
-            try {
-               setLive();
-               startLockMonitoring();
-            } catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+         return new CleaningActivateCallback() {
+            @Override
+            public void activationComplete() {
+               try {
+                  setLive();
+                  startLockMonitoring();
+               } catch (Exception e) {
+                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+                  // that allows to restart/stop the broker if needed
+                  throw e;
+               }
             }
-         }
-      };
+         };
+      } catch (ActiveMQLockAcquisitionTimeoutException e) {
+         throw new NodeManagerException(e);
+      }
    }
 
    @Override
-   public void pauseLiveServer() throws Exception {
+   public void pauseLiveServer() throws NodeManagerException {
       stopLockMonitoring();
       setPaused();
-      if (liveLock != null) {
-         liveLock.release();
+      try {
+         if (liveLock != null) {
+            liveLock.release();
+         }
+      } catch (IOException e) {
+         throw new NodeManagerException(e);
       }
    }
 
    @Override
-   public void crashLiveServer() throws Exception {
+   public void crashLiveServer() throws NodeManagerException {
       stopLockMonitoring();
       if (liveLock != null) {
-         liveLock.release();
-         liveLock = null;
+         try {
+            liveLock.release();
+         } catch (IOException e) {
+            throw new NodeManagerException(e);
+         } finally {
+            liveLock = null;
+         }
       }
    }
 
    @Override
-   public void awaitLiveStatus() throws Exception {
+   public void awaitLiveStatus() throws NodeManagerException, InterruptedException {
       while (getState() != LIVE) {
          Thread.sleep(2000);
       }
    }
 
-   private void setLive() throws Exception {
+   private void setLive() throws NodeManagerException {
       writeFileLockStatus(FileLockNodeManager.LIVE);
    }
 
-   private void setFailingBack() throws Exception {
+   private void setFailingBack() throws NodeManagerException {
       writeFileLockStatus(FAILINGBACK);
    }
 
-   private void setPaused() throws Exception {
+   private void setPaused() throws NodeManagerException {
       writeFileLockStatus(PAUSED);
    }
 
    /**
     * @param status
-    * @throws IOException
+    * @throws ActiveMQLockAcquisitionTimeoutException,IOException
     */
-   private void writeFileLockStatus(byte status) throws Exception {
+   private void writeFileLockStatus(byte status) throws NodeManagerException {
       if (replicatedBackup && channel == null)
          return;
       logger.debug("writing status: " + status);
       ByteBuffer bb = ByteBuffer.allocateDirect(1);
       bb.put(status);
       bb.position(0);
-      if (!channel.isOpen()) {
-         setUpServerLockFile();
-      }
-      FileLock lock = null;
       try {
-         lock = lock(STATE_LOCK_POS);
-         channel.write(bb, 0);
-         channel.force(true);
-      } finally {
-         if (lock != null) {
-            lock.release();
+         if (!channel.isOpen()) {
+            setUpServerLockFile();
          }
+         FileLock lock = null;
+         try {
+            lock = lock(STATE_LOCK_POS);
+            channel.write(bb, 0);
+            channel.force(true);
+         } finally {
+            if (lock != null) {
+               lock.release();
+            }
+         }
+      } catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
+         throw new NodeManagerException(e);
       }
    }
 
-   private byte getState() throws Exception {
-      byte result;
-      logger.debug("getting state...");
-      ByteBuffer bb = ByteBuffer.allocateDirect(1);
-      int read;
-      FileLock lock = null;
+   private byte getState() throws NodeManagerException {
       try {
-         lock = lock(STATE_LOCK_POS);
-         read = channel.read(bb, 0);
-         if (read <= 0) {
-            result = FileLockNodeManager.NOT_STARTED;
-         } else {
-            result = bb.get(0);
-         }
-      } finally {
-         if (lock != null) {
-            lock.release();
+         byte result;
+         logger.debug("getting state...");
+         ByteBuffer bb = ByteBuffer.allocateDirect(1);
+         int read;
+         FileLock lock = null;
+         try {
+            lock = lock(STATE_LOCK_POS);
+            read = channel.read(bb, 0);
+            if (read <= 0) {
+               result = FileLockNodeManager.NOT_STARTED;
+            } else {
+               result = bb.get(0);
+            }
+         } finally {
+            if (lock != null) {
+               lock.release();
+            }
          }
+         logger.debug("state: " + result);
+         return result;
+      } catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
+         throw new NodeManagerException(e);
       }
-
-      logger.debug("state: " + result);
-
-      return result;
    }
 
    @Override
-   public final SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException {
-      ByteBuffer id = ByteBuffer.allocateDirect(16);
-      int read = channel.read(id, 3);
-      if (read != 16) {
-         throw new ActiveMQIllegalStateException("live server did not write id to file");
+   public final SimpleString readNodeId() throws NodeManagerException {
+      try {
+         ByteBuffer id = ByteBuffer.allocateDirect(16);
+         int read = channel.read(id, 3);
+         if (read != 16) {
+            throw new IOException("live server did not write id to file");
+         }
+         byte[] bytes = new byte[16];
+         id.position(0);
+         id.get(bytes);
+         setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes));
+         return getNodeId();
+      } catch (IOException e) {
+         throw new NodeManagerException(e);
       }
-      byte[] bytes = new byte[16];
-      id.position(0);
-      id.get(bytes);
-      setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes));
-      return getNodeId();
    }
 
    protected FileLock tryLock(final int lockPos) throws IOException {
@@ -361,8 +399,8 @@ public class FileLockNodeManager extends NodeManager {
       }
    }
 
-   protected FileLock lock(final int lockPosition) throws Exception {
-      long start = System.currentTimeMillis();
+   protected FileLock lock(final int lockPosition) throws ActiveMQLockAcquisitionTimeoutException {
+      long start = System.nanoTime();
       boolean isRecurringFailure = false;
 
       while (!interrupted) {
@@ -377,7 +415,7 @@ public class FileLockNodeManager extends NodeManager {
                   return null;
                }
 
-               if (lockAcquisitionTimeout != -1 && (System.currentTimeMillis() - start) > lockAcquisitionTimeout) {
+               if (this.lockAcquisitionTimeoutNanos != -1 && (System.nanoTime() - start) > this.lockAcquisitionTimeoutNanos) {
                   throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
                }
             } else {
@@ -390,9 +428,9 @@ public class FileLockNodeManager extends NodeManager {
                     "Failure when accessing a lock file", e);
             isRecurringFailure = true;
 
-            long waitTime = LOCK_ACCESS_FAILURE_WAIT_TIME;
-            if (lockAcquisitionTimeout != -1) {
-               final long remainingTime = lockAcquisitionTimeout - (System.currentTimeMillis() - start);
+            long waitTime = LOCK_ACCESS_FAILURE_WAIT_TIME_NANOS;
+            if (this.lockAcquisitionTimeoutNanos != -1) {
+               final long remainingTime = this.lockAcquisitionTimeoutNanos - (System.nanoTime() - start);
                if (remainingTime <= 0) {
                   throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
                }
@@ -400,7 +438,7 @@ public class FileLockNodeManager extends NodeManager {
             }
 
             try {
-               Thread.sleep(waitTime);
+               TimeUnit.NANOSECONDS.sleep(waitTime);
             } catch (InterruptedException interrupt) {
                return null;
             }
@@ -414,7 +452,7 @@ public class FileLockNodeManager extends NodeManager {
    private synchronized void startLockMonitoring() {
       logger.debug("Starting the lock monitor");
       if (monitorLock == null) {
-         monitorLock = new MonitorLock(scheduledPool, LOCK_MONITOR_TIMEOUT_MILLIES, LOCK_MONITOR_TIMEOUT_MILLIES, TimeUnit.MILLISECONDS, false);
+         monitorLock = new MonitorLock(scheduledPool, LOCK_MONITOR_TIMEOUT_NANOS, LOCK_MONITOR_TIMEOUT_NANOS, TimeUnit.NANOSECONDS, false);
          monitorLock.start();
       } else {
          logger.debug("Lock monitor was already started");
@@ -431,49 +469,22 @@ public class FileLockNodeManager extends NodeManager {
       }
    }
 
-   private void notifyLostLock() {
-      // Additional check we are not initializing or have no locking object anymore
-      // because of a shutdown
-      if (lockListeners != null && liveLock != null) {
-         Set<LockListener> lockListenersSnapshot = null;
-
-         // Snapshot of the set because I'm not sure if we can trigger concurrent
-         // modification exception here if we don't
-         synchronized (lockListeners) {
-            lockListenersSnapshot = new HashSet<>(lockListeners);
-         }
-
-         lockListenersSnapshot.forEach(lockListener -> {
-            try {
-               lockListener.lostLock();
-            } catch (Exception e) {
-               // Need to notify everyone so ignore any exception
-            }
-         });
+   @Override
+   protected synchronized void notifyLostLock() {
+      if (liveLock != null) {
+         super.notifyLostLock();
       }
    }
 
-   public void registerLockListener(LockListener lockListener) {
-      lockListeners.add(lockListener);
+   // This has been introduced to help ByteMan test testLockMonitorInvalid on JDK 11: sun.nio.ch.FileLockImpl::isValid
+   // can affecting setLive, causing an infinite loop due to java.nio.channels.OverlappingFileLockException on tryLock
+   private boolean isLiveLockLost() {
+      final FileLock lock = this.liveLock;
+      return (lock != null && !lock.isValid()) || lock == null;
    }
 
-   public void unregisterLockListener(LockListener lockListener) {
-      lockListeners.remove(lockListener);
-   }
-
-   protected final Set<LockListener> lockListeners = Collections.synchronizedSet(new HashSet<LockListener>());
-
    private MonitorLock monitorLock;
 
-   public abstract class LockListener {
-      protected abstract void lostLock() throws Exception;
-
-      protected void unregisterListener() {
-         lockListeners.remove(this);
-      }
-   }
-
-
    public class MonitorLock extends ActiveMQScheduledComponent {
       public MonitorLock(ScheduledExecutorService scheduledExecutorService,
                             long initialDelay,
@@ -492,7 +503,7 @@ public class FileLockNodeManager extends NodeManager {
             if (liveLock == null) {
                logger.debug("Livelock is null");
             }
-            lostLock = (liveLock != null && !liveLock.isValid()) || liveLock == null;
+            lostLock = isLiveLockLost();
             if (!lostLock) {
                logger.debug("Server still has the lock, double check status is live");
                // Java always thinks the lock is still valid even when there is no filesystem
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java
index 7af25de..bbe4191 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java
@@ -17,14 +17,10 @@
 package org.apache.activemq.artemis.core.server.impl;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.concurrent.Semaphore;
 
-import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActivateCallback;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 
 import static org.apache.activemq.artemis.core.server.impl.InVMNodeManager.State.FAILING_BACK;
@@ -39,7 +35,7 @@ import static org.apache.activemq.artemis.core.server.impl.InVMNodeManager.State
  * multiple servers are run inside the same VM and File Locks can not be shared in the
  * same VM (it would cause a shared lock violation).
  */
-public final class InVMNodeManager extends NodeManager {
+public final class InVMNodeManager extends FileBasedNodeManager {
 
    private final Semaphore liveLock;
 
@@ -67,7 +63,7 @@ public final class InVMNodeManager extends NodeManager {
    }
 
    @Override
-   public void awaitLiveNode() throws Exception {
+   public void awaitLiveNode() throws InterruptedException {
       do {
          while (state == NOT_STARTED) {
             Thread.sleep(10);
@@ -92,51 +88,47 @@ public final class InVMNodeManager extends NodeManager {
    }
 
    @Override
-   public void awaitLiveStatus() throws Exception {
+   public void awaitLiveStatus() throws InterruptedException {
       while (state != LIVE) {
          Thread.sleep(10);
       }
    }
 
    @Override
-   public void startBackup() throws Exception {
+   public void startBackup() throws InterruptedException {
       backupLock.acquire();
    }
 
    @Override
-   public ActivateCallback startLiveNode() throws Exception {
+   public ActivateCallback startLiveNode() throws InterruptedException {
       state = FAILING_BACK;
       liveLock.acquire();
       return new CleaningActivateCallback() {
          @Override
          public void activationComplete() {
-            try {
-               state = LIVE;
-            } catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
-            }
+            state = LIVE;
          }
       };
    }
 
    @Override
-   public void pauseLiveServer() throws Exception {
+   public void pauseLiveServer() {
       state = PAUSED;
       liveLock.release();
    }
 
    @Override
-   public void crashLiveServer() throws Exception {
+   public void crashLiveServer() {
       liveLock.release();
    }
 
    @Override
-   public boolean isAwaitingFailback() throws Exception {
+   public boolean isAwaitingFailback() {
       return state == FAILING_BACK;
    }
 
    @Override
-   public boolean isBackupLive() throws Exception {
+   public boolean isBackupLive() {
       return liveLock.availablePermits() == 0;
    }
 
@@ -151,7 +143,7 @@ public final class InVMNodeManager extends NodeManager {
    }
 
    @Override
-   public SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException {
+   public SimpleString readNodeId() {
       return getNodeId();
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
index 38483d0..002414a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
@@ -67,7 +68,7 @@ public final class SharedNothingBackupActivation extends Activation {
    private SharedNothingBackupQuorum backupQuorum;
    private final boolean attemptFailBack;
    private final Map<String, Object> activationParams;
-   private final ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO;
+   private final IOCriticalErrorListener ioCriticalErrorListener;
    private String nodeID;
    ClusterControl clusterControl;
    private boolean closed;
@@ -79,13 +80,13 @@ public final class SharedNothingBackupActivation extends Activation {
    public SharedNothingBackupActivation(ActiveMQServerImpl activeMQServer,
                                         boolean attemptFailBack,
                                         Map<String, Object> activationParams,
-                                        ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO,
+                                        IOCriticalErrorListener ioCriticalErrorListener,
                                         ReplicaPolicy replicaPolicy,
                                         NetworkHealthCheck networkHealthCheck) {
       this.activeMQServer = activeMQServer;
       this.attemptFailBack = attemptFailBack;
       this.activationParams = activationParams;
-      this.shutdownOnCriticalIO = shutdownOnCriticalIO;
+      this.ioCriticalErrorListener = ioCriticalErrorListener;
       this.replicaPolicy = replicaPolicy;
       backupSyncLatch.setCount(1);
       this.networkHealthCheck = networkHealthCheck;
@@ -95,7 +96,7 @@ public final class SharedNothingBackupActivation extends Activation {
       assert replicationEndpoint == null;
       activeMQServer.resetNodeManager();
       backupUpToDate = false;
-      replicationEndpoint = new ReplicationEndpoint(activeMQServer, shutdownOnCriticalIO, attemptFailBack, this);
+      replicationEndpoint = new ReplicationEndpoint(activeMQServer, ioCriticalErrorListener, attemptFailBack, this);
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
index 7178cd8..9de4be0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
@@ -317,7 +317,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
       SimpleString nodeId0;
       try {
          nodeId0 = activeMQServer.getNodeManager().readNodeId();
-      } catch (ActiveMQIllegalStateException e) {
+      } catch (NodeManager.NodeManagerException e) {
          nodeId0 = null;
       }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java
index 463d97c..0b21445 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java
@@ -16,18 +16,24 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
+import org.apache.activemq.artemis.core.server.NodeManager.NodeManagerException;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
 import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
@@ -40,17 +46,24 @@ public final class SharedStoreBackupActivation extends Activation {
    private static final Logger logger = Logger.getLogger(SharedStoreBackupActivation.class);
 
    //this is how we act as a backup
-   private SharedStoreSlavePolicy sharedStoreSlavePolicy;
+   private final SharedStoreSlavePolicy sharedStoreSlavePolicy;
 
-   private ActiveMQServerImpl activeMQServer;
+   private final ActiveMQServerImpl activeMQServer;
 
    private final Object failbackCheckerGuard = new Object();
 
    private boolean cancelFailBackChecker;
 
-   public SharedStoreBackupActivation(ActiveMQServerImpl server, SharedStoreSlavePolicy sharedStoreSlavePolicy) {
+   private LockListener activeLockListener;
+
+   private final IOCriticalErrorListener ioCriticalErrorListener;
+
+   public SharedStoreBackupActivation(ActiveMQServerImpl server,
+                                      SharedStoreSlavePolicy sharedStoreSlavePolicy,
+                                      IOCriticalErrorListener ioCriticalErrorListener) {
       this.activeMQServer = server;
       this.sharedStoreSlavePolicy = sharedStoreSlavePolicy;
+      this.ioCriticalErrorListener = ioCriticalErrorListener;
       synchronized (failbackCheckerGuard) {
          cancelFailBackChecker = false;
       }
@@ -59,6 +72,8 @@ public final class SharedStoreBackupActivation extends Activation {
    @Override
    public void run() {
       try {
+         registerActiveLockListener(activeMQServer.getNodeManager());
+
          activeMQServer.getNodeManager().startBackup();
 
          ScaleDownPolicy scaleDownPolicy = sharedStoreSlavePolicy.getScaleDownPolicy();
@@ -92,6 +107,11 @@ public final class SharedStoreBackupActivation extends Activation {
          activeMQServer.completeActivation(false);
 
          if (scalingDown) {
+            if (!restarting.compareAndSet(false, true)) {
+               return;
+            }
+            unregisterActiveLockListener(activeMQServer.getNodeManager());
+
             ActiveMQServerLogger.LOGGER.backupServerScaledDown();
             Thread t = new Thread(new Runnable() {
                @Override
@@ -117,6 +137,17 @@ public final class SharedStoreBackupActivation extends Activation {
          if (sharedStoreSlavePolicy.isAllowAutoFailBack() && ActiveMQServerImpl.SERVER_STATE.STOPPING != activeMQServer.getState() && ActiveMQServerImpl.SERVER_STATE.STOPPED != activeMQServer.getState()) {
             startFailbackChecker();
          }
+      } catch (NodeManagerException nodeManagerException) {
+         if (nodeManagerException.getCause() instanceof ClosedChannelException) {
+            // this is ok, we are being stopped
+            return;
+         }
+         if (nodeManagerException.getCause() instanceof ActiveMQLockAcquisitionTimeoutException) {
+            ActiveMQServerLogger.LOGGER.initializationError(nodeManagerException.getCause());
+            return;
+         }
+         unregisterActiveLockListener(activeMQServer.getNodeManager());
+         ioCriticalErrorListener.onIOException(nodeManagerException, nodeManagerException.getMessage(), null);
       } catch (ClosedChannelException | InterruptedException e) {
          // these are ok, we are being stopped
       } catch (Exception e) {
@@ -144,16 +175,25 @@ public final class SharedStoreBackupActivation extends Activation {
          activeMQServer.interruptActivationThread(nodeManagerInUse);
 
          if (nodeManagerInUse != null) {
+            unregisterActiveLockListener(nodeManagerInUse);
             nodeManagerInUse.stopBackup();
          }
       } else {
 
          if (nodeManagerInUse != null) {
+            unregisterActiveLockListener(nodeManagerInUse);
             // if we are now live, behave as live
             // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
             // started before the live
             if (sharedStoreSlavePolicy.isFailoverOnServerShutdown() || permanently) {
-               nodeManagerInUse.crashLiveServer();
+               try {
+                  nodeManagerInUse.crashLiveServer();
+               } catch (Throwable t) {
+                  if (!permanently) {
+                     throw t;
+                  }
+                  logger.warn("Errored while closing activation: can be ignored because of permanent close", t);
+               }
             } else {
                nodeManagerInUse.pauseLiveServer();
             }
@@ -161,6 +201,27 @@ public final class SharedStoreBackupActivation extends Activation {
       }
    }
 
+   private void registerActiveLockListener(NodeManager nodeManager) {
+      LockListener lockListener = () -> {
+         if (!restarting.compareAndSet(false, true)) {
+            logger.warn("Restarting already happening on lost lock");
+            return;
+         }
+         unregisterActiveLockListener(nodeManager);
+         ioCriticalErrorListener.onIOException(new IOException("lost lock"), "Lost NodeManager lock", null);
+      };
+      activeLockListener = lockListener;
+      nodeManager.registerLockListener(lockListener);
+   }
+
+   private void unregisterActiveLockListener(NodeManager nodeManager) {
+      LockListener activeLockListener = this.activeLockListener;
+      if (activeLockListener != null) {
+         nodeManager.unregisterLockListener(activeLockListener);
+         this.activeLockListener = null;
+      }
+   }
+
    @Override
    public JournalLoader createJournalLoader(PostOffice postOffice,
                                             PagingManager pagingManager,
@@ -178,6 +239,8 @@ public final class SharedStoreBackupActivation extends Activation {
       }
    }
 
+   private final AtomicBoolean restarting = new AtomicBoolean(false);
+
    /**
     * To be called by backup trying to fail back the server
     */
@@ -195,47 +258,50 @@ public final class SharedStoreBackupActivation extends Activation {
          activeMQServer.getClusterManager().getDefaultConnection(null).addClusterTopologyListener(backupListener);
       }
 
-      private boolean restarting = false;
-
       @Override
       public void run() {
          try {
-            if (!restarting && activeMQServer.getNodeManager().isAwaitingFailback()) {
-               if (backupListener.waitForBackup()) {
-                  ActiveMQServerLogger.LOGGER.awaitFailBack();
-                  restarting = true;
-                  Thread t = new Thread(new Runnable() {
-                     @Override
-                     public void run() {
+            if (!restarting.get() && activeMQServer.getNodeManager().isAwaitingFailback() && backupListener.waitForBackup()) {
+               if (!restarting.compareAndSet(false, true)) {
+                  return;
+               }
+               ActiveMQServerLogger.LOGGER.awaitFailBack();
+               Thread t = new Thread(new Runnable() {
+                  @Override
+                  public void run() {
+                     try {
+                        logger.debug(activeMQServer + "::Stopping live node in favor of failback");
+
+                        NodeManager nodeManager = activeMQServer.getNodeManager();
+                        activeMQServer.stop(true, false, true);
+
+                        // ensure that the server to which we are failing back actually starts fully before we restart
+                        nodeManager.start();
                         try {
-                           logger.debug(activeMQServer + "::Stopping live node in favor of failback");
-
-                           NodeManager nodeManager = activeMQServer.getNodeManager();
-                           activeMQServer.stop(true, false, true);
+                           nodeManager.awaitLiveStatus();
+                        } finally {
+                           nodeManager.stop();
+                        }
 
-                           // ensure that the server to which we are failing back actually starts fully before we restart
-                           nodeManager.start();
-                           try {
-                              nodeManager.awaitLiveStatus();
-                           } finally {
-                              nodeManager.stop();
-                           }
+                        synchronized (failbackCheckerGuard) {
+                           if (cancelFailBackChecker || !sharedStoreSlavePolicy.isRestartBackup())
+                              return;
 
-                           synchronized (failbackCheckerGuard) {
-                              if (cancelFailBackChecker || !sharedStoreSlavePolicy.isRestartBackup())
-                                 return;
+                           activeMQServer.setHAPolicy(sharedStoreSlavePolicy);
+                           logger.debug(activeMQServer + "::Starting backup node now after failback");
+                           activeMQServer.start();
 
-                              activeMQServer.setHAPolicy(sharedStoreSlavePolicy);
-                              logger.debug(activeMQServer + "::Starting backup node now after failback");
-                              activeMQServer.start();
+                           LockListener lockListener = activeLockListener;
+                           if (lockListener != null) {
+                              activeMQServer.getNodeManager().registerLockListener(lockListener);
                            }
-                        } catch (Exception e) {
-                           ActiveMQServerLogger.LOGGER.serverRestartWarning(e);
                         }
+                     } catch (Exception e) {
+                        ActiveMQServerLogger.LOGGER.serverRestartWarning(e);
                      }
-                  });
-                  t.start();
-               }
+                  }
+               });
+               t.start();
             }
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.serverRestartWarning(e);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java
index 6745ed9..1733f34 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java
@@ -16,11 +16,17 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.server.ActivateCallback;
+import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
+import org.apache.activemq.artemis.core.server.NodeManager.NodeManagerException;
 import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
-import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager.LockListener;
 import org.jboss.logging.Logger;
 
 public final class SharedStoreLiveActivation extends LiveActivation {
@@ -28,17 +34,22 @@ public final class SharedStoreLiveActivation extends LiveActivation {
    private static final Logger logger = Logger.getLogger(SharedStoreLiveActivation.class);
 
    // this is how we act when we initially start as live
-   private SharedStoreMasterPolicy sharedStoreMasterPolicy;
+   private final SharedStoreMasterPolicy sharedStoreMasterPolicy;
 
-   private ActiveMQServerImpl activeMQServer;
+   private final ActiveMQServerImpl activeMQServer;
 
-   private volatile FileLockNodeManager.LockListener activeLockListener;
+   private volatile LockListener activeLockListener;
 
    private volatile ActivateCallback nodeManagerActivateCallback;
 
-   public SharedStoreLiveActivation(ActiveMQServerImpl server, SharedStoreMasterPolicy sharedStoreMasterPolicy) {
+   private final IOCriticalErrorListener ioCriticalErrorListener;
+
+   public SharedStoreLiveActivation(ActiveMQServerImpl server,
+                                    SharedStoreMasterPolicy sharedStoreMasterPolicy,
+                                    IOCriticalErrorListener ioCriticalErrorListener) {
       this.activeMQServer = server;
       this.sharedStoreMasterPolicy = sharedStoreMasterPolicy;
+      this.ioCriticalErrorListener = ioCriticalErrorListener;
    }
 
    @Override
@@ -71,9 +82,9 @@ public final class SharedStoreLiveActivation extends LiveActivation {
             activeMQServer.getBackupManager().announceBackup();
          }
 
+         registerActiveLockListener(activeMQServer.getNodeManager());
          nodeManagerActivateCallback = activeMQServer.getNodeManager().startLiveNode();
          activeMQServer.registerActivateCallback(nodeManagerActivateCallback);
-         addLockListener(activeMQServer, activeMQServer.getNodeManager());
 
          if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED
                || activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPING) {
@@ -85,59 +96,40 @@ public final class SharedStoreLiveActivation extends LiveActivation {
          activeMQServer.completeActivation(false);
 
          ActiveMQServerLogger.LOGGER.serverIsLive();
+      } catch (NodeManagerException nodeManagerException) {
+         if (nodeManagerException.getCause() instanceof ClosedChannelException) {
+            // this is ok, we are being stopped
+            return;
+         }
+         if (nodeManagerException.getCause() instanceof ActiveMQLockAcquisitionTimeoutException) {
+            onActivationFailure((ActiveMQLockAcquisitionTimeoutException) nodeManagerException.getCause());
+            return;
+         }
+         unregisterActiveLockListener(activeMQServer.getNodeManager());
+         ioCriticalErrorListener.onIOException(nodeManagerException, nodeManagerException.getMessage(), null);
       } catch (Exception e) {
-         ActiveMQServerLogger.LOGGER.initializationError(e);
-         activeMQServer.callActivationFailureListeners(e);
+         onActivationFailure(e);
       }
    }
 
-   private void addLockListener(ActiveMQServerImpl activeMQServer, NodeManager nodeManager) {
-      if (nodeManager instanceof FileLockNodeManager) {
-         FileLockNodeManager fileNodeManager = (FileLockNodeManager) nodeManager;
-
-         activeLockListener = fileNodeManager.new LockListener() {
-
-            @Override
-            public void lostLock() {
-               stopStartServerInSeperateThread(activeMQServer);
-            }
-
-         };
-         fileNodeManager.registerLockListener(activeLockListener);
-      } // else no business registering a listener
+   private void onActivationFailure(Exception e) {
+      unregisterActiveLockListener(activeMQServer.getNodeManager());
+      ActiveMQServerLogger.LOGGER.initializationError(e);
+      activeMQServer.callActivationFailureListeners(e);
    }
 
-   /**
-    * We need to do this in a new thread because this takes to long to finish in
-    * the scheduled thread Also this is not the responsibility of the scheduled
-    * thread
-    * @param activeMQServer
-    */
-   private void stopStartServerInSeperateThread(ActiveMQServerImpl activeMQServer) {
-      try {
-
-         Runnable startServerRunnable = new Runnable() {
-
-            @Override
-            public void run() {
-               try {
-                  activeMQServer.stop(true, false);
-               } catch (Exception e) {
-                  logger.warn("Failed to stop artemis server after loosing the lock", e);
-               }
-
-               try {
-                  activeMQServer.start();
-               } catch (Exception e) {
-                  logger.error("Failed to start artemis server after recovering from loosing the lock", e);
-               }
-            }
+   private void registerActiveLockListener(NodeManager nodeManager) {
+      LockListener lockListener = () ->
+         ioCriticalErrorListener.onIOException(new IOException("lost lock"), "Lost NodeManager lock", null);
+      activeLockListener = lockListener;
+      nodeManager.registerLockListener(lockListener);
+   }
 
-         };
-         Thread startServer = new Thread(startServerRunnable);
-         startServer.start();
-      } catch (Exception e) {
-         logger.error(e.getMessage());
+   private void unregisterActiveLockListener(NodeManager nodeManager) {
+      LockListener activeLockListener = this.activeLockListener;
+      if (activeLockListener != null) {
+         nodeManager.unregisterLockListener(activeLockListener);
+         this.activeLockListener = null;
       }
    }
 
@@ -147,16 +139,20 @@ public final class SharedStoreLiveActivation extends LiveActivation {
       NodeManager nodeManagerInUse = activeMQServer.getNodeManager();
 
       if (nodeManagerInUse != null) {
-         LockListener closeLockListener = activeLockListener;
-         if (closeLockListener != null) {
-            closeLockListener.unregisterListener();
-         }
+         unregisterActiveLockListener(nodeManagerInUse);
          ActivateCallback activateCallback = nodeManagerActivateCallback;
          if (activateCallback != null) {
             activeMQServer.unregisterActivateCallback(activateCallback);
          }
          if (sharedStoreMasterPolicy.isFailoverOnServerShutdown() || permanently) {
-            nodeManagerInUse.crashLiveServer();
+            try {
+               nodeManagerInUse.crashLiveServer();
+            } catch (Throwable t) {
+               if (!permanently) {
+                  throw t;
+               }
+               logger.warn("Errored while closing activation: can be ignored because of permanent close", t);
+            }
          } else {
             nodeManagerInUse.pauseLiveServer();
          }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
index ce91d70..d1382cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
@@ -20,13 +20,13 @@ package org.apache.activemq.artemis.core.server.impl.jdbc;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.jboss.logging.Logger;
 
 /**
- * Default implementation of a {@link ScheduledLeaseLock}: see {@link ScheduledLeaseLock#of(ScheduledExecutorService, ArtemisExecutor, String, LeaseLock, long, IOCriticalErrorListener)}.
+ * Default implementation of a {@link ScheduledLeaseLock}: see {@link ScheduledLeaseLock#of(ScheduledExecutorService, ArtemisExecutor, String, LeaseLock, long, LockListener)}.
  */
 final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implements ScheduledLeaseLock {
 
@@ -36,14 +36,14 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
    private final LeaseLock lock;
    private long lastLockRenewStart;
    private final long renewPeriodMillis;
-   private final IOCriticalErrorListener ioCriticalErrorListener;
+   private final LockListener lockListener;
 
    ActiveMQScheduledLeaseLock(ScheduledExecutorService scheduledExecutorService,
                               ArtemisExecutor executor,
                               String lockName,
                               LeaseLock lock,
                               long renewPeriodMillis,
-                              IOCriticalErrorListener ioCriticalErrorListener) {
+                              LockListener lockListener) {
       super(scheduledExecutorService, executor, 0, renewPeriodMillis, TimeUnit.MILLISECONDS, false);
       if (renewPeriodMillis >= lock.expirationMillis()) {
          throw new IllegalArgumentException("renewPeriodMillis must be < lock's expirationMillis");
@@ -51,9 +51,14 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
       this.lockName = lockName;
       this.lock = lock;
       this.renewPeriodMillis = renewPeriodMillis;
-      //already expired start time
+      // already expired start time
       this.lastLockRenewStart = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(lock.expirationMillis());
-      this.ioCriticalErrorListener = ioCriticalErrorListener;
+      this.lockListener = lockListener;
+   }
+
+   @Override
+   public String lockName() {
+      return lockName;
    }
 
    @Override
@@ -84,37 +89,55 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
    }
 
    @Override
-   public void run() {
+   public synchronized void run() {
+      if (!isStarted()) {
+         return;
+      }
       final long lastRenewStart = this.lastLockRenewStart;
       final long renewStart = System.nanoTime();
+      boolean lockLost = true;
       try {
-         if (!this.lock.renew()) {
-            ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null);
-         }
+         lockLost = !this.lock.renew();
       } catch (Throwable t) {
-         ioCriticalErrorListener.onIOException(t, "Critical error while on " + lockName + " renew", null);
-         throw t;
+         LOGGER.warnf(t, "%s lock renew has failed", lockName);
+         if (lock.localExpirationTime() > 0) {
+            final long millisBeforeExpiration = (lock.localExpirationTime() - System.currentTimeMillis());
+            // there is enough time to retry to renew it?
+            if (millisBeforeExpiration >= this.renewPeriodMillis) {
+               lockLost = false;
+            }
+         }
+      }
+      // a failed attempt to renew is treated as a lost lock
+      if (lockLost) {
+         try {
+            lockListener.lostLock();
+         } catch (Throwable t) {
+            LOGGER.warnf(t, "Errored while notifying %s lock listener", lockName);
+         }
       }
       //logic to detect slowness of DB and/or the scheduled executor service
-      detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis());
+      detectAndReportRenewSlowness(lockName, lockLost, lastRenewStart,
+                                   renewStart, renewPeriodMillis, lock.expirationMillis());
       this.lastLockRenewStart = renewStart;
    }
 
    private static void detectAndReportRenewSlowness(String lockName,
+                                                    boolean lostLock,
                                                     long lastRenewStart,
                                                     long renewStart,
                                                     long expectedRenewPeriodMillis,
                                                     long expirationMillis) {
       final long elapsedMillisToRenew = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - renewStart);
       if (elapsedMillisToRenew > expectedRenewPeriodMillis) {
-         LOGGER.error(lockName + " lock renew tooks " + elapsedMillisToRenew + " ms, while is supposed to take <" + expectedRenewPeriodMillis + " ms");
+         LOGGER.errorf("%s lock %s renew tooks %d ms, while is supposed to take <%d ms", lockName, lostLock ? "failed" : "successful", elapsedMillisToRenew, expectedRenewPeriodMillis);
       }
       final long measuredRenewPeriodNanos = renewStart - lastRenewStart;
       final long measuredRenewPeriodMillis = TimeUnit.NANOSECONDS.toMillis(measuredRenewPeriodNanos);
       if (measuredRenewPeriodMillis - expirationMillis > 100) {
-         LOGGER.error(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
+         LOGGER.errorf("%s lock %s renew period lasted %d ms instead of %d ms", lockName, lostLock ? "failed" : "successful", measuredRenewPeriodMillis, expectedRenewPeriodMillis);
       } else if (measuredRenewPeriodMillis - expectedRenewPeriodMillis > 100) {
-         LOGGER.warn(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
+         LOGGER.warnf("%s lock %s renew period lasted %d ms instead of %d ms", lockName, lostLock ? "failed" : "successful", measuredRenewPeriodMillis, expectedRenewPeriodMillis);
       }
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
index 3b7124e..f1c789c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
@@ -44,8 +44,10 @@ final class JdbcLeaseLock implements LeaseLock {
    private final String isLocked;
    private final String currentDateTime;
    private final long expirationMillis;
+   private final int queryTimeout;
    private boolean maybeAcquired;
    private final String lockName;
+   private long localExpirationTime;
 
    /**
     * The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection},
@@ -59,6 +61,7 @@ final class JdbcLeaseLock implements LeaseLock {
                  String isLocked,
                  String currentDateTime,
                  long expirationMIllis,
+                 long queryTimeoutMillis,
                  String lockName) {
       if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
          throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH);
@@ -73,12 +76,30 @@ final class JdbcLeaseLock implements LeaseLock {
       this.maybeAcquired = false;
       this.connectionProvider = connectionProvider;
       this.lockName = lockName;
+      this.localExpirationTime = -1;
+      int expectedTimeout = (int) (queryTimeoutMillis > 0 ? TimeUnit.MILLISECONDS.toSeconds(queryTimeoutMillis) : -1);
+      if (queryTimeoutMillis >= 0) {
+         LOGGER.warn("queryTimeoutMillis is too low: it's suggested to configure a multi-seconds value. Disabling it because too low.");
+         expectedTimeout = -1;
+      }
+      this.queryTimeout = expectedTimeout;
+
    }
 
    public String holderId() {
       return holderId;
    }
 
+   /**
+    * Given that many DBMS won't support standard SQL queries to collect CURRENT_TIMESTAMP at milliseconds granularity,
+    * this value is stripped of the milliseconds part, making it less optimistic then the reality, if >= 0.<p>
+    * It's commonly used as an hard deadline for JDBC operations, hence is fine to not have a high precision.
+    */
+   @Override
+   public long localExpirationTime() {
+      return localExpirationTime;
+   }
+
    @Override
    public long expirationMillis() {
       return expirationMillis;
@@ -115,17 +136,24 @@ final class JdbcLeaseLock implements LeaseLock {
    }
 
    private long dbCurrentTimeMillis(Connection connection) throws SQLException {
-      final long start = System.nanoTime();
       try (PreparedStatement currentDateTime = connection.prepareStatement(this.currentDateTime)) {
+         if (queryTimeout >= 0) {
+            currentDateTime.setQueryTimeout(queryTimeout);
+         }
+         final long startTime = stripMilliseconds(System.currentTimeMillis());
          try (ResultSet resultSet = currentDateTime.executeQuery()) {
             resultSet.next();
+            final long endTime = stripMilliseconds(System.currentTimeMillis());
             final Timestamp currentTimestamp = resultSet.getTimestamp(1);
-            final long elapsedTime = System.nanoTime() - start;
-            if (LOGGER.isDebugEnabled()) {
-               LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms",
-                       lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime));
+            final long currentTime = currentTimestamp.getTime();
+            final long currentTimeMillis = stripMilliseconds(currentTime);
+            if (currentTimeMillis < startTime) {
+               LOGGER.warnf("[%s] %s query currentTimestamp = %s on database should happen AFTER %s on broker", lockName, holderId, currentTimestamp, new Timestamp(startTime));
             }
-            return currentTimestamp.getTime();
+            if (currentTimeMillis > endTime) {
+               LOGGER.warnf("[%s] %s query currentTimestamp = %s on database should happen BEFORE %s on broker", lockName, holderId, currentTimestamp, new Timestamp(endTime));
+            }
+            return currentTime;
          }
       }
    }
@@ -138,7 +166,8 @@ final class JdbcLeaseLock implements LeaseLock {
          connection.setAutoCommit(false);
          try (PreparedStatement preparedStatement = connection.prepareStatement(this.renewLock)) {
             final long now = dbCurrentTimeMillis(connection);
-            final Timestamp expirationTime = new Timestamp(now + expirationMillis);
+            final long localExpirationTime = now + expirationMillis;
+            final Timestamp expirationTime = new Timestamp(localExpirationTime);
             if (LOGGER.isDebugEnabled()) {
                LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s",
                              lockName, holderId, expirationTime);
@@ -151,11 +180,13 @@ final class JdbcLeaseLock implements LeaseLock {
             final boolean renewed = updatedRows == 1;
             connection.commit();
             if (!renewed) {
+               this.localExpirationTime = -1;
                if (LOGGER.isDebugEnabled()) {
                   LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }",
                                 lockName, holderId, readableLockStatus());
                }
             } else {
+               this.localExpirationTime = stripMilliseconds(localExpirationTime);
                LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId);
             }
             return renewed;
@@ -170,6 +201,10 @@ final class JdbcLeaseLock implements LeaseLock {
       }
    }
 
+   private static long stripMilliseconds(long time) {
+      return (time / 1000) * 1000;
+   }
+
    @Override
    public boolean tryAcquire() {
       try (Connection connection = connectionProvider.getConnection()) {
@@ -179,7 +214,8 @@ final class JdbcLeaseLock implements LeaseLock {
          try (PreparedStatement preparedStatement = connection.prepareStatement(this.tryAcquireLock)) {
             final long now = dbCurrentTimeMillis(connection);
             preparedStatement.setString(1, holderId);
-            final Timestamp expirationTime = new Timestamp(now + expirationMillis);
+            final long localExpirationTime = now + expirationMillis;
+            final Timestamp expirationTime = new Timestamp(localExpirationTime);
             preparedStatement.setTimestamp(2, expirationTime);
             preparedStatement.setTimestamp(3, expirationTime);
             LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s",
@@ -188,6 +224,7 @@ final class JdbcLeaseLock implements LeaseLock {
             connection.commit();
             if (acquired) {
                this.maybeAcquired = true;
+               this.localExpirationTime = stripMilliseconds(localExpirationTime);
                LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId);
             } else {
                if (LOGGER.isDebugEnabled()) {
@@ -272,6 +309,7 @@ final class JdbcLeaseLock implements LeaseLock {
             preparedStatement.setString(1, holderId);
             final boolean released = preparedStatement.executeUpdate() == 1;
             //consider it as released to avoid on finalize to be reclaimed
+            this.localExpirationTime = -1;
             this.maybeAcquired = false;
             connection.commit();
             if (!released) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
index 212e6e1..f357925 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
@@ -23,8 +23,8 @@ import java.util.function.Supplier;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
-import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.server.ActivateCallback;
+import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
@@ -36,6 +36,8 @@ import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.jboss.logging.Logger;
 
+import static org.apache.activemq.artemis.core.server.impl.jdbc.LeaseLock.AcquireResult.Timeout;
+
 /**
  * JDBC implementation of {@link NodeManager}.
  */
@@ -53,12 +55,10 @@ public final class JdbcNodeManager extends NodeManager {
    private final long lockAcquisitionTimeoutMillis;
    private volatile boolean interrupted = false;
    private final LeaseLock.Pauser pauser;
-   private final IOCriticalErrorListener ioCriticalErrorListener;
 
    public static JdbcNodeManager with(DatabaseStorageConfiguration configuration,
                                       ScheduledExecutorService scheduledExecutorService,
-                                      ExecutorFactory executorFactory,
-                                      IOCriticalErrorListener ioCriticalErrorListener) {
+                                      ExecutorFactory executorFactory) {
       validateTimeoutConfiguration(configuration);
       final SQLProvider.Factory sqlProviderFactory;
       if (configuration.getSqlProviderFactory() != null) {
@@ -74,8 +74,7 @@ public final class JdbcNodeManager extends NodeManager {
                              configuration.getConnectionProvider(),
                              sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER),
                              scheduledExecutorService,
-                             executorFactory,
-                             ioCriticalErrorListener);
+                             executorFactory);
    }
 
    private static JdbcNodeManager usingConnectionProvider(String brokerId,
@@ -85,18 +84,16 @@ public final class JdbcNodeManager extends NodeManager {
                                                   JDBCConnectionProvider connectionProvider,
                                                   SQLProvider provider,
                                                   ScheduledExecutorService scheduledExecutorService,
-                                                  ExecutorFactory executorFactory,
-                                                  IOCriticalErrorListener ioCriticalErrorListener) {
-      return new JdbcNodeManager(
-         () -> JdbcSharedStateManager.usingConnectionProvider(brokerId,
-                                                      lockExpirationMillis,
-                                                      connectionProvider,
-                                                      provider),
+                                                  ExecutorFactory executorFactory) {
+      return new JdbcNodeManager(() -> JdbcSharedStateManager.usingConnectionProvider(brokerId, lockExpirationMillis,
+                                                                                      lockRenewPeriodMillis,
+                                                                                      connectionProvider,
+                                                                                      provider),
+         lockExpirationMillis,
          lockRenewPeriodMillis,
          lockAcquisitionTimeoutMillis,
          scheduledExecutorService,
-         executorFactory,
-         ioCriticalErrorListener);
+         executorFactory);
    }
 
    private static void validateTimeoutConfiguration(DatabaseStorageConfiguration configuration) {
@@ -122,12 +119,12 @@ public final class JdbcNodeManager extends NodeManager {
    }
 
    private JdbcNodeManager(Supplier<? extends SharedStateManager> sharedStateManagerFactory,
+                           long lockExpirationMillis,
                            long lockRenewPeriodMillis,
                            long lockAcquisitionTimeoutMillis,
                            ScheduledExecutorService scheduledExecutorService,
-                           ExecutorFactory executorFactory,
-                           IOCriticalErrorListener ioCriticalErrorListener) {
-      super(false, null);
+                           ExecutorFactory executorFactory) {
+      super(false);
       this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis;
       this.pauser = LeaseLock.Pauser.sleep(Math.min(lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
       this.sharedStateManagerFactory = sharedStateManagerFactory;
@@ -137,7 +134,7 @@ public final class JdbcNodeManager extends NodeManager {
          "live",
          this.sharedStateManager.liveLock(),
          lockRenewPeriodMillis,
-         ioCriticalErrorListener);
+         this::notifyLostLock);
       this.scheduledBackupLockFactory = () -> ScheduledLeaseLock.of(
          scheduledExecutorService,
          executorFactory != null ?
@@ -145,35 +142,47 @@ public final class JdbcNodeManager extends NodeManager {
          "backup",
          this.sharedStateManager.backupLock(),
          lockRenewPeriodMillis,
-         ioCriticalErrorListener);
-      this.ioCriticalErrorListener = ioCriticalErrorListener;
+         this::notifyLostLock);
       this.sharedStateManager = null;
       this.scheduledLiveLock = null;
       this.scheduledBackupLock = null;
    }
 
    @Override
-   public void start() throws Exception {
+   protected synchronized void notifyLostLock() {
       try {
-         synchronized (this) {
-            if (isStarted()) {
-               return;
-            }
-            this.sharedStateManager = sharedStateManagerFactory.get();
-            LOGGER.debug("setup sharedStateManager on start");
-            final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
-            setUUID(nodeId);
-            this.scheduledLiveLock = scheduledLiveLockFactory.get();
-            this.scheduledBackupLock = scheduledBackupLockFactory.get();
-            super.start();
+         super.notifyLostLock();
+      } finally {
+         // if any of the notified listener has stopped the node manager or
+         // the node manager was already stopped
+         if (!isStarted()) {
+            return;
+         }
+         try {
+            stop();
+         } catch (Exception ex) {
+            LOGGER.warn("Stopping node manager has errored on lost lock notification", ex);
+         }
+      }
+   }
+
+   @Override
+   public synchronized void start() throws Exception {
+      try {
+         if (isStarted()) {
+            return;
          }
+         this.sharedStateManager = sharedStateManagerFactory.get();
+         LOGGER.debug("setup sharedStateManager on start");
+         final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
+         setUUID(nodeId);
+         this.scheduledLiveLock = scheduledLiveLockFactory.get();
+         this.scheduledBackupLock = scheduledBackupLockFactory.get();
+         super.start();
       } catch (IllegalStateException e) {
          this.sharedStateManager = null;
          this.scheduledLiveLock = null;
          this.scheduledBackupLock = null;
-         if (this.ioCriticalErrorListener != null) {
-            this.ioCriticalErrorListener.onIOException(e, "Failed to setup the JdbcNodeManager", null);
-         }
          throw e;
       }
    }
@@ -200,43 +209,34 @@ public final class JdbcNodeManager extends NodeManager {
    }
 
    @Override
-   public boolean isAwaitingFailback() throws Exception {
+   public boolean isAwaitingFailback() throws NodeManagerException {
+      checkStarted();
       LOGGER.debug("ENTER isAwaitingFailback");
       try {
          return readSharedState() == SharedStateManager.State.FAILING_BACK;
+      } catch (IllegalStateException e) {
+         LOGGER.warn("cannot retrieve the live state: assume it's not yet failed back", e);
+         return false;
       } finally {
          LOGGER.debug("EXIT isAwaitingFailback");
       }
    }
 
    @Override
-   public boolean isBackupLive() throws Exception {
+   public boolean isBackupLive() throws NodeManagerException {
+      checkStarted();
       LOGGER.debug("ENTER isBackupLive");
       try {
          //is anyone holding the live lock?
          return this.scheduledLiveLock.lock().isHeld();
+      } catch (IllegalStateException e) {
+         throw new NodeManagerException(e);
       } finally {
          LOGGER.debug("EXIT isBackupLive");
       }
    }
 
    @Override
-   public void stopBackup() throws Exception {
-      LOGGER.debug("ENTER stopBackup");
-      try {
-         if (this.scheduledBackupLock.isStarted()) {
-            LOGGER.debug("scheduledBackupLock is running: stop it and release backup lock");
-            this.scheduledBackupLock.stop();
-            this.scheduledBackupLock.lock().release();
-         } else {
-            LOGGER.debug("scheduledBackupLock is not running");
-         }
-      } finally {
-         LOGGER.debug("EXIT stopBackup");
-      }
-   }
-
-   @Override
    public void interrupt() {
       LOGGER.debug("ENTER interrupted");
       //need to be volatile: must be called concurrently to work as expected
@@ -245,7 +245,8 @@ public final class JdbcNodeManager extends NodeManager {
    }
 
    @Override
-   public void releaseBackup() throws Exception {
+   public void releaseBackup() throws NodeManagerException {
+      checkStarted();
       LOGGER.debug("ENTER releaseBackup");
       try {
          if (this.scheduledBackupLock.isStarted()) {
@@ -255,19 +256,47 @@ public final class JdbcNodeManager extends NodeManager {
          } else {
             LOGGER.debug("scheduledBackupLock is not running");
          }
+      } catch (IllegalStateException e) {
+         throw new NodeManagerException(e);
       } finally {
          LOGGER.debug("EXIT releaseBackup");
       }
    }
 
    /**
-    * Try to acquire a lock, failing with an exception otherwise.
+    * Try to acquire a lock
     */
-   private void lock(LeaseLock lock) throws Exception {
-      final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(this.lockAcquisitionTimeoutMillis, this.pauser, () -> !this.interrupted);
+   private void lock(LeaseLock lock) throws ActiveMQLockAcquisitionTimeoutException, InterruptedException {
+      final long lockAcquisitionTimeoutNanos = lockAcquisitionTimeoutMillis >= 0 ?
+         TimeUnit.MILLISECONDS.toNanos(lockAcquisitionTimeoutMillis) : -1;
+      LeaseLock.AcquireResult acquireResult = null;
+      final long start = System.nanoTime();
+      while (acquireResult == null) {
+         checkStarted();
+         // measure distance from the timeout
+         final long remainingNanos = remainingNanos(start, lockAcquisitionTimeoutNanos);
+         if (remainingNanos == 0) {
+            acquireResult = Timeout;
+            continue;
+         }
+         final long remainingMillis = remainingNanos > 0 ? TimeUnit.NANOSECONDS.toMillis(remainingNanos) : -1;
+         try {
+            acquireResult = lock.tryAcquire(remainingMillis, this.pauser, () -> !this.interrupted);
+         } catch (IllegalStateException e) {
+            LOGGER.warn("Errored while trying to acquire lock", e);
+            if (remainingNanos(start, lockAcquisitionTimeoutNanos) == 0) {
+               acquireResult = Timeout;
+               continue;
+            }
+            // that's not precise, but it's ok: it can trigger the timeout right after the pause,
+            // depending by the pause length. The sole purpose of the pause is to save
+            // hammering with requests the DBMS if the connection is down
+            this.pauser.idle();
+         }
+      }
       switch (acquireResult) {
          case Timeout:
-            throw new Exception("timed out waiting for lock");
+            throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
          case Exit:
             this.interrupted = false;
             throw new InterruptedException("LeaseLock was interrupted");
@@ -279,6 +308,20 @@ public final class JdbcNodeManager extends NodeManager {
 
    }
 
+   private static long remainingNanos(long start, long timeoutNanos) {
+      if (timeoutNanos > 0) {
+         final long elapsedNanos = (System.nanoTime() - start);
+         if (elapsedNanos < timeoutNanos) {
+            return timeoutNanos - elapsedNanos;
+         } else {
+            return 0;
+         }
+      } else {
+         assert timeoutNanos == -1;
+         return -1;
+      }
+   }
+
    private void checkInterrupted(Supplier<String> message) throws InterruptedException {
       if (this.interrupted) {
          interrupted = false;
@@ -286,52 +329,77 @@ public final class JdbcNodeManager extends NodeManager {
       }
    }
 
-   private void renewLiveLockIfNeeded(final long acquiredOn) {
-      final long acquiredMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - acquiredOn);
-      if (acquiredMillis > this.scheduledLiveLock.renewPeriodMillis()) {
-         if (!this.scheduledLiveLock.lock().renew()) {
-            final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
-            ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null);
-            throw e;
+   private void renewLock(ScheduledLeaseLock lock) {
+      boolean lostLock = true;
+      IllegalStateException renewEx = null;
+      try {
+         lostLock = !this.scheduledLiveLock.lock().renew();
+      } catch (IllegalStateException e) {
+         renewEx = e;
+      }
+      if (lostLock) {
+         notifyLostLock();
+         if (renewEx == null) {
+            renewEx = new IllegalStateException(lock.lockName() + " lock isn't renewed");
          }
+         throw renewEx;
       }
    }
 
    /**
     * Lock live node and check for a live state, taking care to renew it (if needed) or releasing it otherwise
     */
-   private boolean lockLiveAndCheckLiveState() throws Exception {
-      lock(this.scheduledLiveLock.lock());
-      final long acquiredOn = System.nanoTime();
-      boolean liveWhileLocked = false;
-      //check if the state is live
-      final SharedStateManager.State stateWhileLocked;
+   private boolean lockLiveAndCheckLiveState() throws ActiveMQLockAcquisitionTimeoutException, InterruptedException {
       try {
-         stateWhileLocked = readSharedState();
-      } catch (Throwable t) {
-         LOGGER.error("error while holding the live node lock and tried to read the shared state", t);
-         this.scheduledLiveLock.lock().release();
-         throw t;
-      }
-      if (stateWhileLocked == SharedStateManager.State.LIVE) {
-         renewLiveLockIfNeeded(acquiredOn);
-         liveWhileLocked = true;
-      } else {
-         LOGGER.debugf("state is %s while holding the live lock: releasing live lock", stateWhileLocked);
-         //state is not live: can (try to) release the lock
-         this.scheduledLiveLock.lock().release();
+         lock(this.scheduledLiveLock.lock());
+         //check if the state is live
+         while (true) {
+            try {
+               final SharedStateManager.State stateWhileLocked = readSharedState();
+               final long localExpirationTime = this.scheduledLiveLock.lock().localExpirationTime();
+               if (System.currentTimeMillis() > localExpirationTime) {
+                  // the lock can be assumed to be expired,
+                  // so the state isn't worthy to be considered
+                  return false;
+               }
+               if (stateWhileLocked == SharedStateManager.State.LIVE) {
+                  // TODO need some tolerance//renew here?
+                  return true;
+               } else {
+                  // state is not live: can (try to) release the lock
+                  this.scheduledLiveLock.lock().release();
+                  return false;
+               }
+            } catch (IllegalStateException e) {
+               LOGGER.error("error while holding the live node lock and tried to read the shared state or to release the lock", e);
+               checkStarted();
+               checkInterrupted(() -> "interrupt on error while checking live state");
+               pauser.idle();
+               final long localExpirationTime = this.scheduledLiveLock.lock().localExpirationTime();
+               if (System.currentTimeMillis() > localExpirationTime) {
+                  return false;
+               }
+            }
+         }
+      } catch (InterruptedException e) {
+         throw e;
       }
-      return liveWhileLocked;
    }
 
    @Override
-   public void awaitLiveNode() throws Exception {
+   public void awaitLiveNode() throws NodeManagerException, InterruptedException {
+      checkStarted();
       LOGGER.debug("ENTER awaitLiveNode");
       try {
          boolean liveWhileLocked = false;
          while (!liveWhileLocked) {
             //check first without holding any lock
-            final SharedStateManager.State state = readSharedState();
+            SharedStateManager.State state = null;
+            try {
+               state = readSharedState();
+            } catch (IllegalStateException e) {
+               LOGGER.warn("Errored while reading shared state", e);
+            }
             if (state == SharedStateManager.State.LIVE) {
                //verify if the state is live while holding the live node lock too
                liveWhileLocked = lockLiveAndCheckLiveState();
@@ -339,6 +407,7 @@ public final class JdbcNodeManager extends NodeManager {
                LOGGER.debugf("state while awaiting live node: %s", state);
             }
             if (!liveWhileLocked) {
+               checkStarted();
                checkInterrupted(() -> "awaitLiveNode got interrupted!");
                pauser.idle();
             }
@@ -346,32 +415,51 @@ public final class JdbcNodeManager extends NodeManager {
          //state is LIVE and live lock is acquired and valid
          LOGGER.debugf("acquired live node lock while state is %s: starting scheduledLiveLock", SharedStateManager.State.LIVE);
          this.scheduledLiveLock.start();
+      } catch (InterruptedException e) {
+         throw e;
+      } catch (ActiveMQLockAcquisitionTimeoutException | IllegalStateException e) {
+         throw new NodeManagerException(e);
       } finally {
          LOGGER.debug("EXIT awaitLiveNode");
       }
    }
 
    @Override
-   public void startBackup() throws Exception {
+   public void startBackup() throws NodeManagerException, InterruptedException {
+      checkStarted();
       LOGGER.debug("ENTER startBackup");
       try {
          ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
-
          lock(scheduledBackupLock.lock());
          scheduledBackupLock.start();
          ActiveMQServerLogger.LOGGER.gotBackupLock();
          if (getUUID() == null)
             readNodeId();
+      } catch (InterruptedException ie) {
+         throw ie;
+      } catch (ActiveMQLockAcquisitionTimeoutException | IllegalStateException e) {
+         throw new NodeManagerException(e);
       } finally {
          LOGGER.debug("EXIT startBackup");
       }
    }
 
    @Override
-   public ActivateCallback startLiveNode() throws Exception {
+   public ActivateCallback startLiveNode() throws NodeManagerException, InterruptedException {
+      checkStarted();
       LOGGER.debug("ENTER startLiveNode");
       try {
-         setFailingBack();
+         boolean done = false;
+         while (!done) {
+            try {
+               setFailingBack();
+               done = true;
+            } catch (IllegalStateException e) {
+               LOGGER.warn("cannot set failing back state, retry", e);
+               pauser.idle();
+               checkInterrupted(() -> "interrupt while trying to set failing back state");
+            }
+         }
 
          final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds";
 
@@ -389,21 +477,42 @@ public final class JdbcNodeManager extends NodeManager {
                LOGGER.debug("ENTER activationComplete");
                try {
                   //state can be written only if the live renew task is running
-                  setLive();
-               } catch (Exception e) {
+                  boolean done = false;
+                  while (!done) {
+                     try {
+                        setLive();
+                        done = true;
+                     } catch (IllegalStateException e) {
+                        LOGGER.warn("Errored while trying to setLive", e);
+                        checkStarted();
+                        pauser.idle();
+                        final long localExpirationTime = scheduledLiveLock.lock().localExpirationTime();
+                        // optimistic: is just to set a deadline while retrying
+                        if (System.currentTimeMillis() > localExpirationTime) {
+                           throw new IllegalStateException("live lock is probably expired: failed to setLive");
+                        }
+                     }
+                  }
+               } catch (IllegalStateException e) {
                   ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+                  throw new NodeManagerException(e);
                } finally {
                   LOGGER.debug("EXIT activationComplete");
                }
             }
          };
+      } catch (InterruptedException ie) {
+         throw ie;
+      } catch (ActiveMQLockAcquisitionTimeoutException | IllegalStateException e) {
+         throw new NodeManagerException(e);
       } finally {
          LOGGER.debug("EXIT startLiveNode");
       }
    }
 
    @Override
-   public void pauseLiveServer() throws Exception {
+   public void pauseLiveServer() throws NodeManagerException {
+      checkStarted();
       LOGGER.debug("ENTER pauseLiveServer");
       try {
          if (scheduledLiveLock.isStarted()) {
@@ -413,23 +522,21 @@ public final class JdbcNodeManager extends NodeManager {
             scheduledLiveLock.lock().release();
          } else {
             LOGGER.debug("scheduledLiveLock is not running: try renew live lock");
-            if (scheduledLiveLock.lock().renew()) {
-               LOGGER.debug("live lock renewed: set paused shared state and release live lock");
-               setPaused();
-               scheduledLiveLock.lock().release();
-            } else {
-               final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
-               ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null);
-               throw e;
-            }
+            renewLock(scheduledLiveLock);
+            LOGGER.debug("live lock renewed: set paused shared state and release live lock");
+            setPaused();
+            scheduledLiveLock.lock().release();
          }
+      } catch (IllegalStateException e) {
+         throw new NodeManagerException(e);
       } finally {
          LOGGER.debug("EXIT pauseLiveServer");
       }
    }
 
    @Override
-   public void crashLiveServer() throws Exception {
+   public void crashLiveServer() throws NodeManagerException {
+      checkStarted();
       LOGGER.debug("ENTER crashLiveServer");
       try {
          if (this.scheduledLiveLock.isStarted()) {
@@ -446,10 +553,18 @@ public final class JdbcNodeManager extends NodeManager {
 
    @Override
    public void awaitLiveStatus() {
+      checkStarted();
       LOGGER.debug("ENTER awaitLiveStatus");
       try {
-         while (readSharedState() != SharedStateManager.State.LIVE) {
+         SharedStateManager.State state = null;
+         while (state != SharedStateManager.State.LIVE) {
+            try {
+               state = readSharedState();
+            } catch (IllegalStateException e) {
+               LOGGER.warn("Errored while trying to read shared state", e);
+            }
             pauser.idle();
+            checkStarted();
          }
       } finally {
          LOGGER.debug("EXIT awaitLiveStatus");
@@ -481,6 +596,7 @@ public final class JdbcNodeManager extends NodeManager {
 
    @Override
    public SimpleString readNodeId() {
+      checkStarted();
       final UUID nodeId = this.sharedStateManager.readNodeId();
       LOGGER.debugf("readNodeId nodeId = %s", nodeId);
       setUUID(nodeId);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
index 06f7e2a..df43c6b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
@@ -39,6 +39,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
    private static final int MAX_SETUP_ATTEMPTS = 20;
    private final String holderId;
    private final long lockExpirationMillis;
+   private final long queryTimeoutMillis;
    private JdbcLeaseLock liveLock;
    private JdbcLeaseLock backupLock;
    private String readNodeId;
@@ -48,10 +49,19 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
    private String writeState;
 
    public static JdbcSharedStateManager usingConnectionProvider(String holderId,
-                                                        long locksExpirationMillis,
-                                                        JDBCConnectionProvider connectionProvider,
-                                                        SQLProvider provider) {
-      final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
+                                                                long locksExpirationMillis,
+                                                                JDBCConnectionProvider connectionProvider,
+                                                                SQLProvider provider) {
+      return usingConnectionProvider(holderId, locksExpirationMillis, -1, connectionProvider, provider);
+   }
+
+   public static JdbcSharedStateManager usingConnectionProvider(String holderId,
+                                                                long locksExpirationMillis,
+                                                                long queryTimeoutMillis,
+                                                                JDBCConnectionProvider connectionProvider,
+                                                                SQLProvider provider) {
+      final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis,
+                                                                                   queryTimeoutMillis);
       sharedStateManager.setJdbcConnectionProvider(connectionProvider);
       sharedStateManager.setSqlProvider(provider);
       try {
@@ -76,20 +86,35 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
                                        JDBCConnectionProvider connectionProvider,
                                        SQLProvider sqlProvider,
                                        long expirationMillis) {
-      return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireLiveLockSQL(), sqlProvider.tryReleaseLiveLockSQL(), sqlProvider.renewLiveLockSQL(), sqlProvider.isLiveLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "LIVE");
+      return createLiveLock(holderId, connectionProvider, sqlProvider, expirationMillis, -1);
+   }
+
+   static JdbcLeaseLock createLiveLock(String holderId,
+                                       JDBCConnectionProvider connectionProvider,
+                                       SQLProvider sqlProvider,
+                                       long expirationMillis,
+                                       long queryTimeoutMillis) {
+      return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireLiveLockSQL(),
+                               sqlProvider.tryReleaseLiveLockSQL(), sqlProvider.renewLiveLockSQL(),
+                               sqlProvider.isLiveLockedSQL(), sqlProvider.currentTimestampSQL(),
+                               expirationMillis, queryTimeoutMillis, "LIVE");
    }
 
    static JdbcLeaseLock createBackupLock(String holderId,
                                          JDBCConnectionProvider connectionProvider,
                                          SQLProvider sqlProvider,
-                                         long expirationMillis) {
-      return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireBackupLockSQL(), sqlProvider.tryReleaseBackupLockSQL(), sqlProvider.renewBackupLockSQL(), sqlProvider.isBackupLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "BACKUP");
+                                         long expirationMillis,
+                                         long queryTimeoutMillis) {
+      return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireBackupLockSQL(),
+                               sqlProvider.tryReleaseBackupLockSQL(), sqlProvider.renewBackupLockSQL(),
+                               sqlProvider.isBackupLockedSQL(), sqlProvider.currentTimestampSQL(),
+                               expirationMillis, queryTimeoutMillis, "BACKUP");
    }
 
    @Override
    protected void prepareStatements() {
-      this.liveLock = createLiveLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis);
-      this.backupLock = createBackupLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis);
+      this.liveLock = createLiveLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis, queryTimeoutMillis);
+      this.backupLock = createBackupLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis, queryTimeoutMillis);
       this.readNodeId = sqlProvider.readNodeIdSQL();
       this.writeNodeId = sqlProvider.writeNodeIdSQL();
       this.initializeNodeId = sqlProvider.initializeNodeIdSQL();
@@ -97,9 +122,10 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
       this.readState = sqlProvider.readStateSQL();
    }
 
-   private JdbcSharedStateManager(String holderId, long lockExpirationMillis) {
+   private JdbcSharedStateManager(String holderId, long lockExpirationMillis, long queryTimeoutMillis) {
       this.holderId = holderId;
       this.lockExpirationMillis = lockExpirationMillis;
+      this.queryTimeoutMillis = queryTimeoutMillis;
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java
index 8deda12..8f2de11 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java
@@ -41,6 +41,8 @@ interface LeaseLock extends AutoCloseable {
       boolean keepRunning();
    }
 
+   long localExpirationTime();
+
    interface Pauser {
 
       void idle();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java
index 43751f8..8590fc6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java
@@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.server.impl.jdbc;
 
 import java.util.concurrent.ScheduledExecutorService;
 
-import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 
 /**
@@ -28,17 +28,25 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
  */
 interface ScheduledLeaseLock extends ActiveMQComponent {
 
+   @Override
+   void start();
+
+   @Override
+   void stop();
+
    LeaseLock lock();
 
    long renewPeriodMillis();
 
+   String lockName();
+
    static ScheduledLeaseLock of(ScheduledExecutorService scheduledExecutorService,
                                 ArtemisExecutor executor,
                                 String lockName,
                                 LeaseLock lock,
                                 long renewPeriodMillis,
-                                IOCriticalErrorListener ioCriticalErrorListener) {
-      return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, ioCriticalErrorListener);
+                                LockListener lockListener) {
+      return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, lockListener);
    }
 
 }
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
index 77f488e..03febc2 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
@@ -21,15 +21,23 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
 
 import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
+import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
 import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -38,6 +46,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.core.Is.is;
+
 @RunWith(Parameterized.class)
 public class JdbcLeaseLockTest extends ActiveMQTestBase {
 
@@ -183,7 +194,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
 
    @Test
    public void shouldAcquireExpiredLock() throws InterruptedException {
-      final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
+      final LeaseLock lock = lock(10);
       Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
       try {
          Thread.sleep(lock.expirationMillis() * 2);
@@ -197,13 +208,13 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
 
    @Test
    public void shouldOtherAcquireExpiredLock() throws InterruptedException {
-      final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
+      final LeaseLock lock = lock(10);
       Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
       try {
          Thread.sleep(lock.expirationMillis() * 2);
          Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
          Assert.assertFalse("lock is already expired", lock.isHeld());
-         final LeaseLock otherLock = lock(TimeUnit.SECONDS.toMillis(10));
+         final LeaseLock otherLock = lock(10);
          try {
             Assert.assertTrue("lock is already expired", otherLock.tryAcquire());
          } finally {
@@ -237,7 +248,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
 
    @Test
    public void shouldRenewExpiredLockNotAcquiredByOthers() throws InterruptedException {
-      final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
+      final LeaseLock lock = lock(10);
       Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
       try {
          Thread.sleep(lock.expirationMillis() * 2);
@@ -251,7 +262,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
 
    @Test
    public void shouldNotRenewLockAcquiredByOthers() throws InterruptedException {
-      final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
+      final LeaseLock lock = lock(10);
       Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
       try {
          Thread.sleep(lock.expirationMillis() * 2);
@@ -268,5 +279,97 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
          lock.release();
       }
    }
+
+   @Test
+   public void shouldNotNotifyLostLock() throws Exception {
+      final ExecutorService executorService = Executors.newSingleThreadExecutor();
+      final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+      final OrderedExecutorFactory factory = new OrderedExecutorFactory(executorService);
+      final ArtemisExecutor artemisExecutor = factory.getExecutor();
+      final AtomicLong lostLock = new AtomicLong();
+      final LockListener lockListener = () -> {
+         lostLock.incrementAndGet();
+      };
+      final ScheduledLeaseLock scheduledLeaseLock = ScheduledLeaseLock
+         .of(scheduledExecutorService, artemisExecutor,
+         "test", lock(), dbConf.getJdbcLockRenewPeriodMillis(), lockListener);
+
+      Assert.assertTrue(scheduledLeaseLock.lock().tryAcquire());
+      scheduledLeaseLock.start();
+      Assert.assertEquals(0, lostLock.get());
+      scheduledLeaseLock.stop();
+      Assert.assertEquals(0, lostLock.get());
+      executorService.shutdown();
+      scheduledExecutorService.shutdown();
+      scheduledLeaseLock.lock().release();
+   }
+
+
+   @Test
+   public void shouldNotifyManyTimesLostLock() throws Exception {
+      final ExecutorService executorService = Executors.newSingleThreadExecutor();
+      final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+      final OrderedExecutorFactory factory = new OrderedExecutorFactory(executorService);
+      final ArtemisExecutor artemisExecutor = factory.getExecutor();
+      final AtomicLong lostLock = new AtomicLong();
+      final LockListener lockListener = () -> {
+         lostLock.incrementAndGet();
+      };
+      final ScheduledLeaseLock scheduledLeaseLock = ScheduledLeaseLock
+         .of(scheduledExecutorService, artemisExecutor,
+             "test", lock(TimeUnit.SECONDS.toMillis(1)), 100, lockListener);
+
+      Assert.assertTrue(scheduledLeaseLock.lock().tryAcquire());
+      scheduledLeaseLock.start();
+      // should let the renew to happen at least 1 time, excluding the time to start a scheduled task
+      TimeUnit.MILLISECONDS.sleep(2 * scheduledLeaseLock.renewPeriodMillis());
+      Assert.assertTrue(scheduledLeaseLock.lock().isHeldByCaller());
+      Assert.assertEquals(0, lostLock.get());
+      scheduledLeaseLock.lock().release();
+      Assert.assertFalse(scheduledLeaseLock.lock().isHeldByCaller());
+      TimeUnit.MILLISECONDS.sleep(2 * scheduledLeaseLock.renewPeriodMillis());
+      Assert.assertThat(lostLock.get(), is(greaterThanOrEqualTo(2L)));
+      scheduledLeaseLock.stop();
+      executorService.shutdown();
+      scheduledExecutorService.shutdown();
+   }
+
+   @Test
+   public void shouldNotifyOnceLostLockIfStopped() throws Exception {
+      final ExecutorService executorService = Executors.newSingleThreadExecutor();
+      final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+      final OrderedExecutorFactory factory = new OrderedExecutorFactory(executorService);
+      final ArtemisExecutor artemisExecutor = factory.getExecutor();
+      final AtomicLong lostLock = new AtomicLong();
+      final AtomicReference<ScheduledLeaseLock> lock = new AtomicReference<>();
+      final AtomicReference<Throwable> stopErrors = new AtomicReference<>();
+      final LockListener lockListener = () -> {
+         lostLock.incrementAndGet();
+         try {
+            lock.get().stop();
+         } catch (Throwable e) {
+            stopErrors.set(e);
+         }
+      };
+      final ScheduledLeaseLock scheduledLeaseLock = ScheduledLeaseLock
+         .of(scheduledExecutorService, artemisExecutor, "test", lock(TimeUnit.SECONDS.toMillis(1)),
+             100, lockListener);
+      lock.set(scheduledLeaseLock);
+      Assert.assertTrue(scheduledLeaseLock.lock().tryAcquire());
+      lostLock.set(0);
+      scheduledLeaseLock.start();
+      Assert.assertTrue(scheduledLeaseLock.lock().isHeldByCaller());
+      scheduledLeaseLock.lock().release();
+      Assert.assertFalse(scheduledLeaseLock.lock().isHeldByCaller());
+      Wait.assertTrue(() -> lostLock.get() > 0);
+      Assert.assertFalse(scheduledLeaseLock.isStarted());
+      // wait enough to see if it get triggered again
+      TimeUnit.MILLISECONDS.sleep(scheduledLeaseLock.renewPeriodMillis());
+      Assert.assertEquals(1, lostLock.getAndSet(0));
+      Assert.assertNull(stopErrors.getAndSet(null));
+      scheduledLeaseLock.stop();
+      executorService.shutdown();
+      scheduledExecutorService.shutdown();
+   }
 }
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManagerTest.java
index 762e051..47e7660 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManagerTest.java
@@ -23,7 +23,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@@ -105,17 +104,9 @@ public class JdbcNodeManagerTest extends ActiveMQTestBase {
 
    @Test
    public void shouldStartAndStopGracefullyTest() throws Exception {
-      final AtomicReference<String> criticalError = new AtomicReference<>();
-      final JdbcNodeManager nodeManager = JdbcNodeManager.with(dbConf, leaseLockExecutor, null, (code, message, file) -> criticalError.lazySet(message));
-      try {
-         nodeManager.start();
-      } finally {
-         nodeManager.stop();
-         final String error = criticalError.get();
-         if (error != null) {
-            Assert.fail(error);
-         }
-      }
+      final JdbcNodeManager nodeManager = JdbcNodeManager.with(dbConf, leaseLockExecutor, null);
+      nodeManager.start();
+      nodeManager.stop();
    }
 
 }
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index be6d242..3f5436a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -522,6 +522,7 @@ public abstract class ActiveMQTestBase extends Assert {
       dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis());
       dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis());
       dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis());
+      dbStorageConfiguration.setJdbcNetworkTimeout(-1);
       return dbStorageConfiguration;
    }
 
@@ -530,11 +531,11 @@ public abstract class ActiveMQTestBase extends Assert {
    }
 
    protected long getJdbcLockExpirationMillis() {
-      return Long.getLong("jdbc.lock.expiration", ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis());
+      return Long.getLong("jdbc.lock.expiration", 4_000);
    }
 
    protected long getJdbcLockRenewPeriodMillis() {
-      return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis());
+      return Long.getLong("jdbc.lock.renew", 200);
    }
 
    public void destroyTables(List<String> tableNames) throws Exception {
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockMonitorTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockMonitorTest.java
index 548ec1d..2aad98c 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockMonitorTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockMonitorTest.java
@@ -1,120 +1,124 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.tests.extras.byteman;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import org.apache.activemq.artemis.core.server.ActivateCallback;
-import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
-import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager.LockListener;
-import org.apache.activemq.artemis.utils.Wait;
-import org.jboss.byteman.contrib.bmunit.BMRule;
-import org.jboss.byteman.contrib.bmunit.BMRules;
-import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-@RunWith(BMUnitRunner.class)
-public class FileLockMonitorTest {
-
-   private File sharedDir;
-   private volatile boolean lostLock = false;
-   private volatile FileLockNodeManager nodeManager;
-   private ScheduledThreadPoolExecutor executor;
-
-   @Before
-   public void handleLockFile() throws IOException {
-      sharedDir = File.createTempFile("shared-dir", "");
-      sharedDir.delete();
-      Assert.assertTrue(sharedDir.mkdir());
-      lostLock = false;
-   }
-
-   @Test
-   @BMRules(rules = {
-         @BMRule(name = "lock is invalid", targetClass = "sun.nio.ch.FileLockImpl", targetMethod = "isValid", action = "return false;") })
-   public void testLockMonitorInvalid() throws Exception {
-      lostLock = false;
-      startServer();
-      Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100);
-      nodeManager.isStarted();
-      nodeManager.crashLiveServer();
-      executor.shutdown();
-   }
-
-   @Test
-   @BMRules(rules = {
-         @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "throw new java.io.IOException(\"EFS is disconnected\");") })
-   public void testLockMonitorIOException() throws Exception {
-      lostLock = false;
-      startServer();
-      Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100);
-      nodeManager.crashLiveServer();
-      executor.shutdown();
-   }
-
-   @Test
-   public void testLockMonitorHasCorrectLockAndState() throws Exception {
-      lostLock = false;
-      startServer();
-      Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100));
-      nodeManager.crashLiveServer();
-      executor.shutdown();
-   }
-
-   @Test
-   @BMRules(rules = {
-         @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "return 70;") })
-   public void testLockMonitorHasLockWrongState() throws Exception {
-      lostLock = false;
-      startServer();
-      Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100));
-      nodeManager.crashLiveServer();
-      executor.shutdown();
-   }
-
-   public LockListener startServer() throws Exception {
-      executor = new ScheduledThreadPoolExecutor(2);
-      nodeManager = new FileLockNodeManager(sharedDir, false, executor);
-      LockListener listener = nodeManager.new LockListener() {
-
-         @Override
-         protected void lostLock() throws Exception {
-            lostLock = true;
-            nodeManager.crashLiveServer();
-         }
-
-      };
-      nodeManager.registerLockListener(listener);
-
-      try {
-         nodeManager.start();
-         ActivateCallback startLiveNode = nodeManager.startLiveNode();
-         startLiveNode.activationComplete();
-
-      } catch (Exception exception) {
-         exception.printStackTrace();
-      }
-
-      return listener;
-   }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import java.io.File;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import org.apache.activemq.artemis.core.server.ActivateCallback;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
+import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
+import org.apache.activemq.artemis.utils.Wait;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class FileLockMonitorTest {
+
+   private File sharedDir;
+   private volatile boolean lostLock = false;
+   private volatile FileLockNodeManager nodeManager;
+   private ScheduledThreadPoolExecutor executor;
+
+   @Before
+   public void handleLockFile() throws Exception {
+      sharedDir = File.createTempFile("shared-dir", "");
+      sharedDir.delete();
+      Assert.assertTrue(sharedDir.mkdir());
+   }
+
+   @Test
+   @BMRules(rules = {
+         @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "isLiveLockLost", action = "return true;") })
+   public void testLockMonitorInvalid() throws Exception {
+      lostLock = false;
+      startServer();
+      Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 20_000, 100);
+      nodeManager.isStarted();
+      nodeManager.crashLiveServer();
+      executor.shutdown();
+   }
+
+   public static void throwNodeManagerException(String msg) {
+      throw new NodeManager.NodeManagerException(msg);
+   }
+
+   @Test
+   @BMRules(rules = {
+         @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager",
+            targetMethod = "getState",
+            action = "org.apache.activemq.artemis.tests.extras.byteman.FileLockMonitorTest.throwNodeManagerException(\"EFS is disconnected\");") })
+   public void testLockMonitorIOException() throws Exception {
+      lostLock = false;
+      startServer();
+      Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100);
+      nodeManager.crashLiveServer();
+      executor.shutdown();
+   }
+
+   @Test
+   public void testLockMonitorHasCorrectLockAndState() throws Exception {
+      lostLock = false;
+      startServer();
+      Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100));
+      nodeManager.crashLiveServer();
+      executor.shutdown();
+   }
+
+   @Test
+   @BMRules(rules = {
+         @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "return 70;") })
+   public void testLockMonitorHasLockWrongState() throws Exception {
+      lostLock = false;
+      startServer();
+      Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100));
+      nodeManager.crashLiveServer();
+      executor.shutdown();
+   }
+
+   public LockListener startServer() throws Exception {
+      executor = new ScheduledThreadPoolExecutor(2);
+      nodeManager = new FileLockNodeManager(sharedDir, false, executor);
+      LockListener listener = () -> {
+         lostLock = true;
+         try {
+            nodeManager.crashLiveServer();
+         } catch (Throwable t) {
+            t.printStackTrace();
+         }
+      };
+      nodeManager.registerLockListener(listener);
+
+      try {
+         nodeManager.start();
+         ActivateCallback startLiveNode = nodeManager.startLiveNode();
+         startLiveNode.activationComplete();
+
+      } catch (Exception exception) {
+         exception.printStackTrace();
+      }
+
+      return listener;
+   }
+}
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockNodeManagerTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockNodeManagerTest.java
index 519cb63..148755f 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockNodeManagerTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockNodeManagerTest.java
@@ -78,7 +78,7 @@ public class FileLockNodeManagerTest {
          manager.awaitLiveNode();
       } catch (Exception e) {
          long stop = System.currentTimeMillis();
-         if (!"timed out waiting for lock".equals(e.getMessage())) {
+         if (!"timed out waiting for lock".equals(e.getCause().getMessage())) {
             throw e;
          }
          return stop - start;
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/SharedStoreBackupActivationTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/SharedStoreBackupActivationTest.java
deleted file mode 100644
index ae763c1..0000000
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/SharedStoreBackupActivationTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.extras.byteman;
-
-import java.io.File;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
-import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
-import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
-import org.apache.activemq.artemis.core.server.NodeManager;
-import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
-import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase;
-import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
-import org.jboss.byteman.contrib.bmunit.BMRule;
-import org.jboss.byteman.contrib.bmunit.BMRules;
-import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
-import org.jboss.logging.Logger;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-@RunWith(BMUnitRunner.class)
-public class SharedStoreBackupActivationTest extends FailoverTestBase {
-
-   private static Logger logger = Logger.getLogger(SharedStoreBackupActivationTest.class);
-
-   private static volatile boolean throwException = false;
-   private static CountDownLatch exceptionThrownLatch;
-
-   public static synchronized boolean isThrowException() {
-      logger.debugf("Throwing IOException during FileLockNodeManager.tryLock(): %s", throwException);
-      if (exceptionThrownLatch != null) {
-         exceptionThrownLatch.countDown();
-      }
-      return throwException;
-   }
-
-   /**
-    * Waits for the backup server to call FileLockNodeManager.tryLock().
-    */
-   public static void awaitTryLock(boolean throwException) throws InterruptedException {
-      synchronized (SharedStoreBackupActivationTest.class) {
-         SharedStoreBackupActivationTest.throwException = throwException;
-         exceptionThrownLatch = new CountDownLatch(1);
-      }
-      logger.debugf("Awaiting backup to perform FileLockNodeManager.tryLock()");
-      boolean ret = exceptionThrownLatch.await(10, TimeUnit.SECONDS);
-      SharedStoreBackupActivationTest.throwException = false;
-
-      Assert.assertTrue("FileLockNodeManager.tryLock() was not called during specified timeout", ret);
-      logger.debugf("Awaiting FileLockNodeManager.tryLock() done");
-   }
-
-   @Test
-   @BMRules(
-         rules = {@BMRule(
-               name = "throw IOException during activation",
-               targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager",
-               targetMethod = "tryLock",
-               targetLocation = "AT ENTRY",
-               condition = "org.apache.activemq.artemis.tests.extras.byteman.SharedStoreBackupActivationTest.isThrowException()",
-               action = "THROW new IOException(\"IO Error\");")
-         })
-   public void testFailOverAfterTryLockException() throws Exception {
-      Assert.assertTrue(liveServer.isActive());
-      Assert.assertFalse(backupServer.isActive());
-
-      // wait for backup to try to acquire lock, once without exception (acquiring will not succeed because live is
-      // still active)
-      awaitTryLock(false);
-
-      // wait for backup to try to acquire lock, this time throw an IOException
-      logger.debug("Causing exception");
-      awaitTryLock(true);
-
-      // stop live server
-      logger.debugf("Stopping live server");
-      liveServer.stop();
-      waitForServerToStop(liveServer.getServer());
-      logger.debugf("Live server stopped, waiting for backup activation");
-      backupServer.getServer().waitForActivation(10, TimeUnit.SECONDS);
-
-      // backup should be activated by now
-      Assert.assertFalse(liveServer.isActive());
-      Assert.assertTrue("Backup server didn't activate", backupServer.isActive());
-   }
-
-   @Override
-   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
-      return TransportConfigurationUtils.getInVMAcceptor(live);
-   }
-
-   @Override
-   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
-      return TransportConfigurationUtils.getInVMConnector(live);
-   }
-
-   @Override
-   protected void createConfigs() throws Exception {
-      File sharedDir = File.createTempFile("shared-dir", "");
-      sharedDir.delete();
-      Assert.assertTrue(sharedDir.mkdir());
-      logger.debugf("Created shared store directory %s", sharedDir.getCanonicalPath());
-
-      TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
-      TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
-
-      // nodes must use separate FileLockNodeManager instances!
-      NodeManager liveNodeManager = new FileLockNodeManager(sharedDir, false, new ScheduledThreadPoolExecutor(1));
-      NodeManager backupNodeManager = new FileLockNodeManager(sharedDir, false, new ScheduledThreadPoolExecutor(1));
-
-      backupConfig = super.createDefaultConfig(false)
-            .clearAcceptorConfigurations()
-            .addAcceptorConfiguration(getAcceptorTransportConfiguration(false))
-            .setHAPolicyConfiguration(
-                  new SharedStoreSlavePolicyConfiguration()
-                        .setScaleDownConfiguration(new ScaleDownConfiguration().setEnabled(false))
-                        .setRestartBackup(false))
-            .addConnectorConfiguration(liveConnector.getName(), liveConnector)
-            .addConnectorConfiguration(backupConnector.getName(), backupConnector)
-            .addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
-      backupServer = createTestableServer(backupConfig, backupNodeManager);
-
-      liveConfig = super.createDefaultConfig(false)
-            .clearAcceptorConfigurations()
-            .addAcceptorConfiguration(getAcceptorTransportConfiguration(true))
-            .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailoverOnServerShutdown(true))
-            .addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName()))
-            .addConnectorConfiguration(liveConnector.getName(), liveConnector);
-      liveServer = createTestableServer(liveConfig, liveNodeManager);
-   }
-
-}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/RealNodeManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/FileLockNodeManagerTest.java
similarity index 66%
rename from tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/RealNodeManagerTest.java
rename to tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/FileLockNodeManagerTest.java
index 1dfe48b..be2dc7d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/RealNodeManagerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/FileLockNodeManagerTest.java
@@ -18,34 +18,23 @@ package org.apache.activemq.artemis.tests.integration.cluster;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
-import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.SpawnedVMSupport;
-import org.apache.activemq.artemis.utils.UUID;
 import org.junit.Assert;
-import org.junit.Test;
 
-public class RealNodeManagerTest extends NodeManagerTest {
-
-   @Test
-   public void testId() throws Exception {
-      NodeManager nodeManager = new FileLockNodeManager(new File(getTemporaryDir()), false);
-      nodeManager.start();
-      UUID id1 = nodeManager.getUUID();
-      nodeManager.stop();
-      nodeManager.start();
-      ActiveMQTestBase.assertEqualsByteArrays(id1.asBytes(), nodeManager.getUUID().asBytes());
-      nodeManager.stop();
-   }
+public class FileLockNodeManagerTest extends NodeManagerTest {
 
    @Override
    public void performWork(NodeManagerAction... actions) throws Exception {
       List<Process> processes = new ArrayList<>();
       for (NodeManagerAction action : actions) {
-         Process p = SpawnedVMSupport.spawnVM(NodeManagerAction.class.getName(), "-Xms512m", "-Xmx512m", new String[0], true, true, true, action.getWork());
+         final String[] args = new String[action.works() + 1];
+         args[0] = getTemporaryDir();
+         action.getWork(args, 1);
+         Process p = SpawnedVMSupport.spawnVM(this.getClass().getName(), "-Xms50m", "-Xmx512m", new String[0], true, true, true, args);
          processes.add(p);
       }
       for (Process process : processes) {
@@ -58,4 +47,8 @@ public class RealNodeManagerTest extends NodeManagerTest {
       }
 
    }
+
+   public static void main(String[] args) throws Exception {
+      NodeManagerAction.execute(Arrays.copyOfRange(args, 1, args.length), new FileLockNodeManager(new File(args[0]), false));
+   }
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/JdbcNodeManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/JdbcNodeManagerTest.java
new file mode 100644
index 0000000..8c0ea8e
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/JdbcNodeManagerTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.junit.Assert;
+
+public class JdbcNodeManagerTest extends NodeManagerTest {
+
+   @Override
+   public void performWork(NodeManagerAction... actions) throws Exception {
+      List<NodeRunner> nodeRunners = new ArrayList<>();
+      final ThreadFactory daemonThreadFactory = t -> {
+         final Thread th = new Thread(t);
+         th.setDaemon(true);
+         return th;
+      };
+      Thread[] threads = new Thread[actions.length];
+      List<ExecutorService> executors = new ArrayList<>(actions.length);
+      List<NodeManager> nodeManagers = new ArrayList<>(actions.length * 2);
+      AtomicBoolean failedRenew = new AtomicBoolean(false);
+      for (NodeManagerAction action : actions) {
+         final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
+         final ExecutorService executor = Executors.newFixedThreadPool(2, daemonThreadFactory);
+         final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
+         final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
+         JdbcNodeManager nodeManager = JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory);
+         nodeManager.start();
+         NodeRunner nodeRunner = new NodeRunner(nodeManager, action);
+         nodeRunners.add(nodeRunner);
+         nodeManagers.add(nodeManager);
+         executors.add(scheduledExecutorService);
+         executors.add(executor);
+      }
+      for (int i = 0, nodeRunnersSize = nodeRunners.size(); i < nodeRunnersSize; i++) {
+         NodeRunner nodeRunner = nodeRunners.get(i);
+         threads[i] = new Thread(nodeRunner);
+         threads[i].start();
+      }
+      boolean isDebug = isDebug();
+      for (Thread thread : threads) {
+         try {
+            if (isDebug) {
+               thread.join();
+            } else {
+               thread.join(60_000);
+            }
+         } catch (InterruptedException e) {
+            //
+         }
+         if (thread.isAlive()) {
+            thread.interrupt();
+            fail("thread still running");
+         }
+      }
+      // forcibly stop node managers
+      nodeManagers.forEach(nodeManager -> {
+         try {
+            nodeManager.stop();
+         } catch (Exception e) {
+            // won't prevent the test to complete
+            e.printStackTrace();
+         }
+      });
+
+      // stop executors
+      executors.forEach(ExecutorService::shutdownNow);
+
+      for (NodeRunner nodeRunner : nodeRunners) {
+         if (nodeRunner.e != null) {
+            nodeRunner.e.printStackTrace();
+            fail(nodeRunner.e.getMessage());
+         }
+      }
+      Assert.assertFalse("Some of the lease locks has failed to renew the locks", failedRenew.get());
+   }
+
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java
index 7d56321..6685aa5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java
@@ -16,10 +16,10 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster;
 
-import java.io.File;
+import java.util.Arrays;
 
 import org.apache.activemq.artemis.core.server.NodeManager;
-import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
+import org.apache.activemq.artemis.utils.UUID;
 
 public class NodeManagerAction {
 
@@ -35,6 +35,7 @@ public class NodeManagerAction {
    public static final int HAS_BACKUP = 11;
    public static final int DOESNT_HAVE_LIVE = 12;
    public static final int DOESNT_HAVE_BACKUP = 13;
+   public static final int CHECK_ID = 14;
 
    private final int[] work;
 
@@ -82,7 +83,6 @@ public class NodeManagerAction {
                }
                break;
             case HAS_BACKUP:
-
                if (!hasBackupLock) {
                   throw new IllegalStateException("backup lock not held");
                }
@@ -93,37 +93,50 @@ public class NodeManagerAction {
                }
                break;
             case DOESNT_HAVE_BACKUP:
-
                if (hasBackupLock) {
                   throw new IllegalStateException("backup lock held");
                }
                break;
+            case CHECK_ID:
+               nodeManager.start();
+               UUID id1 = nodeManager.getUUID();
+               nodeManager.stop();
+               nodeManager.start();
+               if (!Arrays.equals(id1.asBytes(), nodeManager.getUUID().asBytes())) {
+                  throw new IllegalStateException("getUUID should be the same on restart");
+               }
+               break;
          }
       }
    }
 
-   public String[] getWork() {
-      String[] strings = new String[work.length];
-      for (int i = 0, stringsLength = strings.length; i < stringsLength; i++) {
-         strings[i] = "" + work[i];
+   public int works() {
+      return work.length;
+   }
+
+   public int getWork(String[] works, int start) {
+      final int workLength = work.length;
+      for (int i = 0; i < workLength; i++) {
+         works[i + start] = Integer.toString(work[i]);
       }
-      return strings;
+      return workLength;
    }
 
-   public static void main(String[] args) throws Exception {
+   public static void execute(String[] args, NodeManager nodeManager) throws Exception {
       int[] work1 = new int[args.length];
       for (int i = 0; i < args.length; i++) {
          work1[i] = Integer.parseInt(args[i]);
 
       }
       NodeManagerAction nodeManagerAction = new NodeManagerAction(work1);
-      FileLockNodeManager nodeManager = new FileLockNodeManager(new File("."), false);
       nodeManager.start();
       try {
          nodeManagerAction.performWork(nodeManager);
       } catch (Exception e) {
          e.printStackTrace();
          System.exit(9);
+      } finally {
+         nodeManager.stop();
       }
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerTest.java
index 4c6f6ca..c484bf1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerTest.java
@@ -24,7 +24,9 @@ import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
 import org.apache.activemq.artemis.tests.util.SpawnedTestBase;
 import org.junit.Test;
 
+import static java.lang.management.ManagementFactory.getRuntimeMXBean;
 import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.AWAIT_LIVE;
+import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.CHECK_ID;
 import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.CRASH_LIVE;
 import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.DOESNT_HAVE_BACKUP;
 import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.DOESNT_HAVE_LIVE;
@@ -39,6 +41,12 @@ import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerA
 public class NodeManagerTest extends SpawnedTestBase {
 
    @Test
+   public void testID() throws Exception {
+      NodeManagerAction live1 = new NodeManagerAction(CHECK_ID);
+      performWork(live1);
+   }
+
+   @Test
    public void testLive() throws Exception {
       NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
       performWork(live1);
@@ -155,6 +163,10 @@ public class NodeManagerTest extends SpawnedTestBase {
       }
    }
 
+   protected static boolean isDebug() {
+      return getRuntimeMXBean().getInputArguments().toString().contains("jdwp");
+   }
+
    static class NodeRunner implements Runnable {
 
       private NodeManagerAction action;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FileLockNodeManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FileLockNodeManagerTest.java
index 3019381..fc1b3b9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FileLockNodeManagerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FileLockNodeManagerTest.java
@@ -147,10 +147,7 @@ public class FileLockNodeManagerTest extends FailoverTestBase {
             executors.add(executor);
             final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
             final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
-            return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> {
-               code.printStackTrace();
-               Assert.fail(message);
-            });
+            return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory);
          case File:
             final Configuration config = createDefaultInVMConfig();
             if (useSeparateLockFolder) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
index d32e46a..1438d46 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
@@ -116,10 +116,7 @@ public class NettyFailoverTest extends FailoverTest {
             executors.add(executor);
             final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
             final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
-            return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> {
-               code.printStackTrace();
-               Assert.fail(message);
-            });
+            return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory);
          default:
             throw new AssertionError("enum type not supported!");
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java
index a76922e..979954c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java
@@ -100,7 +100,7 @@ public class CriticalCrashTest extends SpawnedTestBase {
          @Override
          protected StorageManager createStorageManager() {
 
-            JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO) {
+            JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
                @Override
                public void readLock() {
                   super.readLock();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java
index 28d8db6..2c0a517 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java
@@ -91,7 +91,7 @@ public class ShutdownOnCriticalIOErrorMoveNextTest extends ActiveMQTestBase {
          @Override
          protected StorageManager createStorageManager() {
 
-            JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO) {
+            JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
 
                @Override
                protected Journal createMessageJournal(Configuration config,
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java
index 3182bdb..8504309 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java
@@ -175,38 +175,38 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
    protected final class FakeNodeManager extends NodeManager {
 
       public FakeNodeManager(String nodeID) {
-         super(false, null);
+         super(false);
          this.setNodeID(nodeID);
       }
 
       @Override
-      public void awaitLiveNode() throws Exception {
+      public void awaitLiveNode() {
       }
 
       @Override
-      public void awaitLiveStatus() throws Exception {
+      public void awaitLiveStatus() {
       }
 
       @Override
-      public void startBackup() throws Exception {
+      public void startBackup() {
       }
 
       @Override
-      public ActivateCallback startLiveNode() throws Exception {
+      public ActivateCallback startLiveNode() {
          return new CleaningActivateCallback() {
          };
       }
 
       @Override
-      public void pauseLiveServer() throws Exception {
+      public void pauseLiveServer() {
       }
 
       @Override
-      public void crashLiveServer() throws Exception {
+      public void crashLiveServer() {
       }
 
       @Override
-      public void releaseBackup() throws Exception {
+      public void releaseBackup() {
       }
 
       @Override
@@ -215,12 +215,12 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
       }
 
       @Override
-      public boolean isAwaitingFailback() throws Exception {
+      public boolean isAwaitingFailback() {
          return false;
       }
 
       @Override
-      public boolean isBackupLive() throws Exception {
+      public boolean isBackupLive() {
          return false;
       }