You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/10/20 21:24:00 UTC
[activemq-artemis] branch master updated: ARTEMIS-2941 Improve JDBC HA connection resiliency
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 4545749 ARTEMIS-2941 Improve JDBC HA connection resiliency
new 9a95418 This closes #3301
4545749 is described below
commit 4545749969a15052ba52edd4700f29421a8e746d
Author: franz1981 <ni...@gmail.com>
AuthorDate: Mon Sep 28 13:49:20 2020 +0200
ARTEMIS-2941 Improve JDBC HA connection resiliency
---
.../activemq/artemis/core/server/NodeManager.java | 176 ++++-------
.../core/server/cluster/ha/ColocatedPolicy.java | 5 +-
.../artemis/core/server/cluster/ha/HAPolicy.java | 3 +-
.../core/server/cluster/ha/LiveOnlyPolicy.java | 3 +-
.../core/server/cluster/ha/ReplicaPolicy.java | 5 +-
.../core/server/cluster/ha/ReplicatedPolicy.java | 3 +-
.../server/cluster/ha/SharedStoreMasterPolicy.java | 5 +-
.../server/cluster/ha/SharedStoreSlavePolicy.java | 5 +-
.../core/server/impl/ActiveMQServerImpl.java | 57 ++--
.../FileBasedNodeManager.java} | 129 ++------
.../core/server/impl/FileLockNodeManager.java | 351 +++++++++++----------
.../artemis/core/server/impl/InVMNodeManager.java | 30 +-
.../server/impl/SharedNothingBackupActivation.java | 9 +-
.../server/impl/SharedNothingLiveActivation.java | 2 +-
.../server/impl/SharedStoreBackupActivation.java | 136 ++++++--
.../server/impl/SharedStoreLiveActivation.java | 110 ++++---
.../impl/jdbc/ActiveMQScheduledLeaseLock.java | 55 +++-
.../core/server/impl/jdbc/JdbcLeaseLock.java | 54 +++-
.../core/server/impl/jdbc/JdbcNodeManager.java | 328 ++++++++++++-------
.../server/impl/jdbc/JdbcSharedStateManager.java | 46 ++-
.../artemis/core/server/impl/jdbc/LeaseLock.java | 2 +
.../core/server/impl/jdbc/ScheduledLeaseLock.java | 14 +-
.../core/server/impl/jdbc/JdbcLeaseLockTest.java | 113 ++++++-
.../core/server/impl/jdbc/JdbcNodeManagerTest.java | 15 +-
.../artemis/tests/util/ActiveMQTestBase.java | 5 +-
.../tests/extras/byteman/FileLockMonitorTest.java | 244 +++++++-------
.../extras/byteman/FileLockNodeManagerTest.java | 2 +-
.../byteman/SharedStoreBackupActivationTest.java | 151 ---------
...nagerTest.java => FileLockNodeManagerTest.java} | 27 +-
.../integration/cluster/JdbcNodeManagerTest.java | 104 ++++++
.../integration/cluster/NodeManagerAction.java | 35 +-
.../tests/integration/cluster/NodeManagerTest.java | 12 +
.../cluster/failover/FileLockNodeManagerTest.java | 5 +-
.../cluster/failover/NettyFailoverTest.java | 5 +-
.../integration/critical/CriticalCrashTest.java | 2 +-
.../ShutdownOnCriticalIOErrorMoveNextTest.java | 2 +-
.../integration/discovery/DiscoveryBaseTest.java | 20 +-
37 files changed, 1249 insertions(+), 1021 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
index 4b5c3cb..a6b05e1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
@@ -16,52 +16,50 @@
*/
package org.apache.activemq.artemis.core.server;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
+import java.util.HashSet;
+import java.util.Set;
-import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.utils.UUID;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.jboss.logging.Logger;
public abstract class NodeManager implements ActiveMQComponent {
- protected static final byte FIRST_TIME_START = '0';
- public static final String SERVER_LOCK_NAME = "server.lock";
- private static final String ACCESS_MODE = "rw";
+ @FunctionalInterface
+ public interface LockListener {
+ void lostLock();
+ }
+
+ private static final Logger LOGGER = Logger.getLogger(NodeManager.class);
protected final boolean replicatedBackup;
- private final File directory;
- private final Object nodeIDGuard = new Object();
+ protected final Object nodeIDGuard = new Object();
private SimpleString nodeID;
private UUID uuid;
private boolean isStarted = false;
+ private final Set<FileLockNodeManager.LockListener> lockListeners;
- protected FileChannel channel;
-
- public NodeManager(final boolean replicatedBackup, final File directory) {
- this.directory = directory;
+ public NodeManager(final boolean replicatedBackup) {
this.replicatedBackup = replicatedBackup;
+ this.lockListeners = new HashSet<>();
}
// --------------------------------------------------------------------
- public abstract void awaitLiveNode() throws Exception;
+ public abstract void awaitLiveNode() throws NodeManagerException, InterruptedException;
- public abstract void awaitLiveStatus() throws Exception;
+ public abstract void awaitLiveStatus() throws NodeManagerException, InterruptedException;
- public abstract void startBackup() throws Exception;
+ public abstract void startBackup() throws NodeManagerException, InterruptedException;
- public abstract ActivateCallback startLiveNode() throws Exception;
+ public abstract ActivateCallback startLiveNode() throws NodeManagerException, InterruptedException;
- public abstract void pauseLiveServer() throws Exception;
+ public abstract void pauseLiveServer() throws NodeManagerException;
- public abstract void crashLiveServer() throws Exception;
+ public abstract void crashLiveServer() throws NodeManagerException;
- public abstract void releaseBackup() throws Exception;
+ public abstract void releaseBackup() throws NodeManagerException;
// --------------------------------------------------------------------
@@ -81,7 +79,7 @@ public abstract class NodeManager implements ActiveMQComponent {
}
}
- public abstract SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException;
+ public abstract SimpleString readNodeId() throws NodeManagerException;
public UUID getUUID() {
synchronized (nodeIDGuard) {
@@ -113,119 +111,63 @@ public abstract class NodeManager implements ActiveMQComponent {
}
}
- public abstract boolean isAwaitingFailback() throws Exception;
+ public abstract boolean isAwaitingFailback() throws NodeManagerException;
- public abstract boolean isBackupLive() throws Exception;
+ public abstract boolean isBackupLive() throws NodeManagerException;
public abstract void interrupt();
@Override
public synchronized void stop() throws Exception {
- FileChannel channelCopy = channel;
- if (channelCopy != null)
- channelCopy.close();
+ // force any running threads on node manager to stop
isStarted = false;
+ lockListeners.clear();
}
- public void stopBackup() throws Exception {
- if (replicatedBackup && getNodeId() != null) {
- setUpServerLockFile();
- }
+ public void stopBackup() throws NodeManagerException {
releaseBackup();
}
- /**
- * Ensures existence of persistent information about the server's nodeID.
- * <p>
- * Roughly the different use cases are:
- * <ol>
- * <li>old live server restarts: a server.lock file already exists and contains a nodeID.
- * <li>new live server starting for the first time: no file exists, and we just *create* a new
- * UUID to use as nodeID
- * <li>replicated backup received its nodeID from its live: no file exists, we need to persist
- * the *current* nodeID
- * </ol>
- */
- protected synchronized void setUpServerLockFile() throws IOException {
- File serverLockFile = newFile(SERVER_LOCK_NAME);
-
- boolean fileCreated = false;
-
- int count = 0;
- while (!serverLockFile.exists()) {
- try {
- fileCreated = serverLockFile.createNewFile();
- } catch (RuntimeException e) {
- ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile);
- throw e;
- } catch (IOException e) {
- /*
- * on some OS's this may fail weirdly even tho the parent dir exists, retrying will work, some weird timing issue i think
- * */
- if (count < 5) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e1) {
- }
- count++;
- continue;
- }
- ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile);
- throw e;
- }
+ protected synchronized void checkStarted() {
+ if (!isStarted) {
+ throw new IllegalStateException("the node manager is supposed to be started");
}
+ }
- @SuppressWarnings("resource")
- RandomAccessFile raFile = new RandomAccessFile(serverLockFile, ACCESS_MODE);
-
- channel = raFile.getChannel();
-
- if (fileCreated) {
- ByteBuffer id = ByteBuffer.allocateDirect(3);
- byte[] bytes = new byte[3];
- bytes[0] = FIRST_TIME_START;
- bytes[1] = FIRST_TIME_START;
- bytes[2] = FIRST_TIME_START;
- id.put(bytes, 0, 3);
- id.position(0);
- channel.write(id, 0);
- channel.force(true);
+ protected synchronized void notifyLostLock() {
+ if (!isStarted) {
+ return;
}
+ lockListeners.forEach(lockListener -> {
+ try {
+ lockListener.lostLock();
+ } catch (Exception e) {
+ LOGGER.warn("On notify lost lock", e);
+ // Need to notify everyone so ignore any exception
+ }
+ });
+ }
- createNodeId();
+ public synchronized void registerLockListener(FileLockNodeManager.LockListener lockListener) {
+ lockListeners.add(lockListener);
}
- /**
- * @return
- */
- protected final File newFile(final String fileName) {
- File file = new File(directory, fileName);
- return file;
+ public synchronized void unregisterLockListener(FileLockNodeManager.LockListener lockListener) {
+ lockListeners.remove(lockListener);
}
- protected final synchronized void createNodeId() throws IOException {
- synchronized (nodeIDGuard) {
- ByteBuffer id = ByteBuffer.allocateDirect(16);
- int read = channel.read(id, 3);
- if (replicatedBackup) {
- id.position(0);
- id.put(getUUID().asBytes(), 0, 16);
- id.position(0);
- channel.write(id, 3);
- channel.force(true);
- } else if (read != 16) {
- setUUID(UUIDGenerator.getInstance().generateUUID());
- id.put(getUUID().asBytes(), 0, 16);
- id.position(0);
- channel.write(id, 3);
- channel.force(true);
- } else {
- byte[] bytes = new byte[16];
- id.position(0);
- id.get(bytes);
- setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes));
- }
+ public static final class NodeManagerException extends RuntimeException {
+
+ public NodeManagerException(String message) {
+ super(message);
+ }
+
+ public NodeManagerException(Throwable cause) {
+ super(cause);
}
- }
+ public NodeManagerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
index b72c755..572f54c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ColocatedActivation;
import org.apache.activemq.artemis.core.server.impl.LiveActivation;
@@ -91,8 +92,8 @@ public class ColocatedPolicy implements HAPolicy<LiveActivation> {
public LiveActivation createActivation(ActiveMQServerImpl server,
boolean wasLive,
Map<String, Object> activationParams,
- ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception {
- return new ColocatedActivation(server, this, livePolicy.createActivation(server, wasLive, activationParams, shutdownOnCriticalIO));
+ IOCriticalErrorListener ioCriticalErrorListener) throws Exception {
+ return new ColocatedActivation(server, this, livePolicy.createActivation(server, wasLive, activationParams, ioCriticalErrorListener));
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java
index bb93014..c5d62ac 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@@ -34,7 +35,7 @@ public interface HAPolicy<T extends Activation> {
T createActivation(ActiveMQServerImpl server,
boolean wasLive,
Map<String, Object> activationParams,
- ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception;
+ IOCriticalErrorListener shutdownOnCriticalIO) throws Exception;
boolean isSharedStore();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/LiveOnlyPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/LiveOnlyPolicy.java
index 442c22d..9aa6a90 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/LiveOnlyPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/LiveOnlyPolicy.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.LiveOnlyActivation;
@@ -37,7 +38,7 @@ public class LiveOnlyPolicy implements HAPolicy<Activation> {
public Activation createActivation(ActiveMQServerImpl server,
boolean wasLive,
Map<String, Object> activationParams,
- ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
+ IOCriticalErrorListener ioCriticalErrorListener) {
return new LiveOnlyActivation(server, this);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
index 36e65f0..2fbdfaa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@@ -211,8 +212,8 @@ public class ReplicaPolicy extends BackupPolicy {
public Activation createActivation(ActiveMQServerImpl server,
boolean wasLive,
Map<String, Object> activationParams,
- ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception {
- SharedNothingBackupActivation backupActivation = new SharedNothingBackupActivation(server, wasLive, activationParams, shutdownOnCriticalIO, this, networkHealthCheck);
+ IOCriticalErrorListener ioCriticalErrorListener) throws Exception {
+ SharedNothingBackupActivation backupActivation = new SharedNothingBackupActivation(server, wasLive, activationParams, ioCriticalErrorListener, this, networkHealthCheck);
backupActivation.init();
return backupActivation;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
index 99b98fe..d4851ed 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.LiveActivation;
@@ -231,7 +232,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
public LiveActivation createActivation(ActiveMQServerImpl server,
boolean wasLive,
Map<String, Object> activationParams,
- ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
+ IOCriticalErrorListener ioCriticalErrorListener) {
return new SharedNothingLiveActivation(server, this);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java
index 82bbaf7..3e3913e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.LiveActivation;
import org.apache.activemq.artemis.core.server.impl.SharedStoreLiveActivation;
@@ -91,8 +92,8 @@ public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation> {
public LiveActivation createActivation(ActiveMQServerImpl server,
boolean wasLive,
Map<String, Object> activationParams,
- ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
- return new SharedStoreLiveActivation(server, this);
+ IOCriticalErrorListener ioCriticalErrorListener) {
+ return new SharedStoreLiveActivation(server, this, ioCriticalErrorListener);
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java
index a4a0ed1..4a00ccf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.SharedStoreBackupActivation;
@@ -101,8 +102,8 @@ public class SharedStoreSlavePolicy extends BackupPolicy {
public Activation createActivation(ActiveMQServerImpl server,
boolean wasLive,
Map<String, Object> activationParams,
- ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
- return new SharedStoreBackupActivation(server, this);
+ IOCriticalErrorListener ioCriticalErrorListener) {
+ return new SharedStoreBackupActivation(server, this, ioCriticalErrorListener);
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index c538569..0616362 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -329,7 +329,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private final Map<String, Object> activationParams = new HashMap<>();
- protected final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new ShutdownOnCriticalErrorListener();
+ protected final IOCriticalErrorListener ioCriticalErrorListener = new DefaultCriticalErrorListener();
private final ActiveMQServer parentServer;
@@ -522,7 +522,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw new IllegalArgumentException("replicatingBackup is not supported yet while using JDBC persistence");
}
final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
- manager = JdbcNodeManager.with(dbConf, scheduledPool, executorFactory, shutdownOnCriticalIO);
+ manager = JdbcNodeManager.with(dbConf, scheduledPool, executorFactory);
} else if (haType == null || haType == HAPolicyConfiguration.TYPE.LIVE_ONLY) {
if (logger.isDebugEnabled()) {
logger.debug("Detected no Shared Store HA options on JDBC store");
@@ -610,7 +610,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean wasLive = !haPolicy.isBackup();
if (!haPolicy.isBackup()) {
- activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO);
+ activation = haPolicy.createActivation(this, false, activationParams, ioCriticalErrorListener);
if (afterActivationCreated != null) {
try {
@@ -636,9 +636,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// checking again here
if (haPolicy.isBackup()) {
if (haPolicy.isSharedStore()) {
- activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO);
+ activation = haPolicy.createActivation(this, false, activationParams, ioCriticalErrorListener);
} else {
- activation = haPolicy.createActivation(this, wasLive, activationParams, shutdownOnCriticalIO);
+ activation = haPolicy.createActivation(this, wasLive, activationParams, ioCriticalErrorListener);
}
if (afterActivationCreated != null) {
@@ -1117,12 +1117,23 @@ public class ActiveMQServerImpl implements ActiveMQServer {
this.stop(failoverOnServerShutdown, criticalIOError, restarting, false);
}
+ private void stop(boolean failoverOnServerShutdown,
+ final boolean criticalIOError,
+ boolean restarting,
+ boolean isShutdown) {
+ stop(failoverOnServerShutdown, criticalIOError, isShutdown || criticalIOError, restarting, isShutdown);
+ }
+
/**
* Stops the server
*
* @param criticalIOError whether we have encountered an IO error with the journal etc
*/
- void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting, boolean isShutdown) {
+ private void stop(boolean failoverOnServerShutdown,
+ final boolean criticalIOError,
+ boolean shutdownExternalComponents,
+ boolean restarting,
+ boolean isShutdown) {
if (logger.isDebugEnabled()) {
logger.debug("Stopping server " + this);
@@ -1344,7 +1355,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
connectedClientIds.clear();
- stopExternalComponents(isShutdown || criticalIOError);
+ stopExternalComponents(shutdownExternalComponents);
try {
this.analyzer.clear();
@@ -2794,9 +2805,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
protected PagingStoreFactory getPagingStoreFactory() throws Exception {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
- return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO, configuration.isReadWholePage());
+ return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, false, ioCriticalErrorListener, configuration.isReadWholePage());
}
- return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO, configuration.isReadWholePage());
+ return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), ioCriticalErrorListener, configuration.isReadWholePage());
}
/**
@@ -2805,12 +2816,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
protected StorageManager createStorageManager() {
if (configuration.isPersistenceEnabled()) {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
- JDBCJournalStorageManager journal = new JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO);
+ JDBCJournalStorageManager journal = new JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), getScheduledPool(), executorFactory, ioExecutorFactory, ioCriticalErrorListener);
this.getCriticalAnalyzer().add(journal);
return journal;
} else {
// Default to File Based Storage Manager, (Legacy default configuration).
- JournalStorageManager journal = new JournalStorageManager(configuration, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO);
+ JournalStorageManager journal = new JournalStorageManager(configuration, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener);
this.getCriticalAnalyzer().add(journal);
return journal;
}
@@ -3136,7 +3147,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (configuration.getMaxDiskUsage() != -1) {
try {
- injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, shutdownOnCriticalIO));
+ injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, ioCriticalErrorListener));
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToInjectMonitor(e);
}
@@ -3929,23 +3940,23 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// Inner classes
// --------------------------------------------------------------------------------
- public final class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener {
+ public final class DefaultCriticalErrorListener implements IOCriticalErrorListener {
- boolean failedAlready = false;
+ private final AtomicBoolean failedAlready = new AtomicBoolean();
@Override
public synchronized void onIOException(Throwable cause, String message, SequentialFile file) {
- if (!failedAlready) {
- failedAlready = true;
-
- if (file == null) {
- ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause);
- } else {
- ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
- }
+ if (!failedAlready.compareAndSet(false, true)) {
+ return;
+ }
- stopTheServer(true);
+ if (file == null) {
+ ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause);
+ } else {
+ ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
}
+
+ stopTheServer(true);
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java
similarity index 63%
copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
copy to artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java
index 4b5c3cb..cfbcb47 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.artemis.core.server;
+package org.apache.activemq.artemis.core.server.impl;
import java.io.File;
import java.io.IOException;
@@ -22,116 +22,22 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
-import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
-public abstract class NodeManager implements ActiveMQComponent {
+public abstract class FileBasedNodeManager extends NodeManager {
protected static final byte FIRST_TIME_START = '0';
public static final String SERVER_LOCK_NAME = "server.lock";
private static final String ACCESS_MODE = "rw";
-
- protected final boolean replicatedBackup;
private final File directory;
- private final Object nodeIDGuard = new Object();
- private SimpleString nodeID;
- private UUID uuid;
- private boolean isStarted = false;
-
protected FileChannel channel;
- public NodeManager(final boolean replicatedBackup, final File directory) {
+ public FileBasedNodeManager(boolean replicatedBackup, File directory) {
+ super(replicatedBackup);
this.directory = directory;
- this.replicatedBackup = replicatedBackup;
- }
-
- // --------------------------------------------------------------------
-
- public abstract void awaitLiveNode() throws Exception;
-
- public abstract void awaitLiveStatus() throws Exception;
-
- public abstract void startBackup() throws Exception;
-
- public abstract ActivateCallback startLiveNode() throws Exception;
-
- public abstract void pauseLiveServer() throws Exception;
-
- public abstract void crashLiveServer() throws Exception;
-
- public abstract void releaseBackup() throws Exception;
-
- // --------------------------------------------------------------------
-
- @Override
- public synchronized void start() throws Exception {
- isStarted = true;
- }
-
- @Override
- public boolean isStarted() {
- return isStarted;
- }
-
- public SimpleString getNodeId() {
- synchronized (nodeIDGuard) {
- return nodeID;
- }
- }
-
- public abstract SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException;
-
- public UUID getUUID() {
- synchronized (nodeIDGuard) {
- return uuid;
- }
- }
-
- /**
- * Sets the nodeID.
- * <p>
- * Only used by replicating backups.
- *
- * @param nodeID
- */
- public void setNodeID(String nodeID) {
- synchronized (nodeIDGuard) {
- this.nodeID = new SimpleString(nodeID);
- this.uuid = new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(nodeID));
- }
- }
-
- /**
- * @param generateUUID
- */
- protected void setUUID(UUID generateUUID) {
- synchronized (nodeIDGuard) {
- uuid = generateUUID;
- nodeID = new SimpleString(uuid.toString());
- }
- }
-
- public abstract boolean isAwaitingFailback() throws Exception;
-
- public abstract boolean isBackupLive() throws Exception;
-
- public abstract void interrupt();
-
- @Override
- public synchronized void stop() throws Exception {
- FileChannel channelCopy = channel;
- if (channelCopy != null)
- channelCopy.close();
- isStarted = false;
- }
-
- public void stopBackup() throws Exception {
- if (replicatedBackup && getNodeId() != null) {
- setUpServerLockFile();
- }
- releaseBackup();
}
/**
@@ -160,8 +66,8 @@ public abstract class NodeManager implements ActiveMQComponent {
throw e;
} catch (IOException e) {
/*
- * on some OS's this may fail weirdly even tho the parent dir exists, retrying will work, some weird timing issue i think
- * */
+ * on some OS's this may fail weirdly even tho the parent dir exists, retrying will work, some weird timing issue i think
+ * */
if (count < 5) {
try {
Thread.sleep(100);
@@ -228,4 +134,23 @@ public abstract class NodeManager implements ActiveMQComponent {
}
}
+ @Override
+ public synchronized void stop() throws Exception {
+ FileChannel channelCopy = channel;
+ if (channelCopy != null)
+ channelCopy.close();
+ super.stop();
+ }
+
+ @Override
+ public void stopBackup() throws NodeManagerException {
+ if (replicatedBackup && getNodeId() != null) {
+ try {
+ setUpServerLockFile();
+ } catch (IOException e) {
+ throw new NodeManagerException(e);
+ }
+ }
+ super.stopBackup();
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java
index ddd59ff..3b3da1c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java
@@ -22,23 +22,18 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.utils.UUID;
import org.jboss.logging.Logger;
-public class FileLockNodeManager extends NodeManager {
+public class FileLockNodeManager extends FileBasedNodeManager {
private static final Logger logger = Logger.getLogger(FileLockNodeManager.class);
@@ -58,9 +53,9 @@ public class FileLockNodeManager extends NodeManager {
private static final byte NOT_STARTED = 'N';
- private static final long LOCK_ACCESS_FAILURE_WAIT_TIME = 2000;
+ private static final long LOCK_ACCESS_FAILURE_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(2);
- private static final int LOCK_MONITOR_TIMEOUT_MILLIES = 2000;
+ private static final long LOCK_MONITOR_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(2);
private volatile FileLock liveLock;
@@ -68,28 +63,31 @@ public class FileLockNodeManager extends NodeManager {
private final FileChannel[] lockChannels = new FileChannel[3];
- protected long lockAcquisitionTimeout = -1;
+ private final long lockAcquisitionTimeoutNanos;
protected boolean interrupted = false;
- private ScheduledExecutorService scheduledPool;
+ private final ScheduledExecutorService scheduledPool;
public FileLockNodeManager(final File directory, boolean replicatedBackup, ScheduledExecutorService scheduledPool) {
super(replicatedBackup, directory);
this.scheduledPool = scheduledPool;
+ this.lockAcquisitionTimeoutNanos = -1;
}
public FileLockNodeManager(final File directory, boolean replicatedBackup) {
super(replicatedBackup, directory);
this.scheduledPool = null;
+ this.lockAcquisitionTimeoutNanos = -1;
}
- public FileLockNodeManager(final File directory, boolean replicatedBackup, long lockAcquisitionTimeout,
- ScheduledExecutorService scheduledPool) {
+ public FileLockNodeManager(final File directory,
+ boolean replicatedBackup,
+ long lockAcquisitionTimeout,
+ ScheduledExecutorService scheduledPool) {
super(replicatedBackup, directory);
-
this.scheduledPool = scheduledPool;
- this.lockAcquisitionTimeout = lockAcquisitionTimeout;
+ this.lockAcquisitionTimeoutNanos = lockAcquisitionTimeout == -1 ? -1 : TimeUnit.MILLISECONDS.toNanos(lockAcquisitionTimeout);
}
@Override
@@ -141,19 +139,23 @@ public class FileLockNodeManager extends NodeManager {
}
@Override
- public boolean isAwaitingFailback() throws Exception {
+ public boolean isAwaitingFailback() throws NodeManagerException {
return getState() == FileLockNodeManager.FAILINGBACK;
}
@Override
- public boolean isBackupLive() throws Exception {
- FileLock liveAttemptLock;
- liveAttemptLock = tryLock(FileLockNodeManager.LIVE_LOCK_POS);
- if (liveAttemptLock == null) {
- return true;
- } else {
- liveAttemptLock.release();
- return false;
+ public boolean isBackupLive() throws NodeManagerException {
+ try {
+ FileLock liveAttemptLock;
+ liveAttemptLock = tryLock(FileLockNodeManager.LIVE_LOCK_POS);
+ if (liveAttemptLock == null) {
+ return true;
+ } else {
+ liveAttemptLock.release();
+ return false;
+ }
+ } catch (IOException e) {
+ throw new NodeManagerException(e);
}
}
@@ -167,182 +169,218 @@ public class FileLockNodeManager extends NodeManager {
}
@Override
- public final void releaseBackup() throws Exception {
- if (backupLock != null) {
- backupLock.release();
- backupLock = null;
+ public final void releaseBackup() throws NodeManagerException {
+ try {
+ if (backupLock != null) {
+ backupLock.release();
+ backupLock = null;
+ }
+ } catch (IOException e) {
+ throw new NodeManagerException(e);
}
}
@Override
- public void awaitLiveNode() throws Exception {
- logger.debug("awaiting live node...");
- do {
- byte state = getState();
- while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) {
- logger.debug("awaiting live node startup state='" + state + "'");
- Thread.sleep(2000);
- state = getState();
- }
+ public void awaitLiveNode() throws NodeManagerException, InterruptedException {
+ try {
+ logger.debug("awaiting live node...");
+ do {
+ byte state = getState();
+ while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) {
+ logger.debug("awaiting live node startup state='" + state + "'");
+ Thread.sleep(2000);
+ state = getState();
+ }
- liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS);
- if (interrupted) {
- interrupted = false;
- throw new InterruptedException("Lock was interrupted");
- }
- state = getState();
- if (state == FileLockNodeManager.PAUSED) {
- liveLock.release();
- logger.debug("awaiting live node restarting");
- Thread.sleep(2000);
- } else if (state == FileLockNodeManager.FAILINGBACK) {
- liveLock.release();
- logger.debug("awaiting live node failing back");
- Thread.sleep(2000);
- } else if (state == FileLockNodeManager.LIVE) {
- logger.debug("acquired live node lock state = " + (char) state);
- break;
+ liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS);
+ if (interrupted) {
+ interrupted = false;
+ throw new InterruptedException("Lock was interrupted");
+ }
+ state = getState();
+ if (state == FileLockNodeManager.PAUSED) {
+ liveLock.release();
+ logger.debug("awaiting live node restarting");
+ Thread.sleep(2000);
+ } else if (state == FileLockNodeManager.FAILINGBACK) {
+ liveLock.release();
+ logger.debug("awaiting live node failing back");
+ Thread.sleep(2000);
+ } else if (state == FileLockNodeManager.LIVE) {
+ logger.debug("acquired live node lock state = " + (char) state);
+ break;
+ }
}
+ while (true);
+ } catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
+ throw new NodeManagerException(e);
}
- while (true);
}
@Override
- public void startBackup() throws Exception {
+ public void startBackup() throws NodeManagerException {
assert !replicatedBackup; // should not be called if this is a replicating backup
ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
-
- backupLock = lock(FileLockNodeManager.BACKUP_LOCK_POS);
+ try {
+ backupLock = lock(FileLockNodeManager.BACKUP_LOCK_POS);
+ } catch (ActiveMQLockAcquisitionTimeoutException e) {
+ throw new NodeManagerException(e);
+ }
ActiveMQServerLogger.LOGGER.gotBackupLock();
if (getUUID() == null)
readNodeId();
}
@Override
- public ActivateCallback startLiveNode() throws Exception {
- setFailingBack();
+ public ActivateCallback startLiveNode() throws NodeManagerException {
+ try {
+ setFailingBack();
- String timeoutMessage = lockAcquisitionTimeout == -1 ? "indefinitely" : lockAcquisitionTimeout + " milliseconds";
+ String timeoutMessage = lockAcquisitionTimeoutNanos == -1 ? "indefinitely" : TimeUnit.NANOSECONDS.toMillis(lockAcquisitionTimeoutNanos) + " milliseconds";
- ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
+ ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
- liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS);
+ liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS);
- ActiveMQServerLogger.LOGGER.obtainedLiveLock();
+ ActiveMQServerLogger.LOGGER.obtainedLiveLock();
- return new CleaningActivateCallback() {
- @Override
- public void activationComplete() {
- try {
- setLive();
- startLockMonitoring();
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ return new CleaningActivateCallback() {
+ @Override
+ public void activationComplete() {
+ try {
+ setLive();
+ startLockMonitoring();
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ // rethrowing allows the broker to be restarted/stopped if needed
+ throw e;
+ }
}
- }
- };
+ };
+ } catch (ActiveMQLockAcquisitionTimeoutException e) {
+ throw new NodeManagerException(e);
+ }
}
@Override
- public void pauseLiveServer() throws Exception {
+ public void pauseLiveServer() throws NodeManagerException {
stopLockMonitoring();
setPaused();
- if (liveLock != null) {
- liveLock.release();
+ try {
+ if (liveLock != null) {
+ liveLock.release();
+ }
+ } catch (IOException e) {
+ throw new NodeManagerException(e);
}
}
@Override
- public void crashLiveServer() throws Exception {
+ public void crashLiveServer() throws NodeManagerException {
stopLockMonitoring();
if (liveLock != null) {
- liveLock.release();
- liveLock = null;
+ try {
+ liveLock.release();
+ } catch (IOException e) {
+ throw new NodeManagerException(e);
+ } finally {
+ liveLock = null;
+ }
}
}
@Override
- public void awaitLiveStatus() throws Exception {
+ public void awaitLiveStatus() throws NodeManagerException, InterruptedException {
while (getState() != LIVE) {
Thread.sleep(2000);
}
}
- private void setLive() throws Exception {
+ private void setLive() throws NodeManagerException {
writeFileLockStatus(FileLockNodeManager.LIVE);
}
- private void setFailingBack() throws Exception {
+ private void setFailingBack() throws NodeManagerException {
writeFileLockStatus(FAILINGBACK);
}
- private void setPaused() throws Exception {
+ private void setPaused() throws NodeManagerException {
writeFileLockStatus(PAUSED);
}
/**
* @param status
- * @throws IOException
+ * @throws NodeManagerException wrapping any IOException or ActiveMQLockAcquisitionTimeoutException raised while writing the status
*/
- private void writeFileLockStatus(byte status) throws Exception {
+ private void writeFileLockStatus(byte status) throws NodeManagerException {
if (replicatedBackup && channel == null)
return;
logger.debug("writing status: " + status);
ByteBuffer bb = ByteBuffer.allocateDirect(1);
bb.put(status);
bb.position(0);
- if (!channel.isOpen()) {
- setUpServerLockFile();
- }
- FileLock lock = null;
try {
- lock = lock(STATE_LOCK_POS);
- channel.write(bb, 0);
- channel.force(true);
- } finally {
- if (lock != null) {
- lock.release();
+ if (!channel.isOpen()) {
+ setUpServerLockFile();
}
+ FileLock lock = null;
+ try {
+ lock = lock(STATE_LOCK_POS);
+ channel.write(bb, 0);
+ channel.force(true);
+ } finally {
+ if (lock != null) {
+ lock.release();
+ }
+ }
+ } catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
+ throw new NodeManagerException(e);
}
}
- private byte getState() throws Exception {
- byte result;
- logger.debug("getting state...");
- ByteBuffer bb = ByteBuffer.allocateDirect(1);
- int read;
- FileLock lock = null;
+ private byte getState() throws NodeManagerException {
try {
- lock = lock(STATE_LOCK_POS);
- read = channel.read(bb, 0);
- if (read <= 0) {
- result = FileLockNodeManager.NOT_STARTED;
- } else {
- result = bb.get(0);
- }
- } finally {
- if (lock != null) {
- lock.release();
+ byte result;
+ logger.debug("getting state...");
+ ByteBuffer bb = ByteBuffer.allocateDirect(1);
+ int read;
+ FileLock lock = null;
+ try {
+ lock = lock(STATE_LOCK_POS);
+ read = channel.read(bb, 0);
+ if (read <= 0) {
+ result = FileLockNodeManager.NOT_STARTED;
+ } else {
+ result = bb.get(0);
+ }
+ } finally {
+ if (lock != null) {
+ lock.release();
+ }
}
+ logger.debug("state: " + result);
+ return result;
+ } catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
+ throw new NodeManagerException(e);
}
-
- logger.debug("state: " + result);
-
- return result;
}
@Override
- public final SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException {
- ByteBuffer id = ByteBuffer.allocateDirect(16);
- int read = channel.read(id, 3);
- if (read != 16) {
- throw new ActiveMQIllegalStateException("live server did not write id to file");
+ public final SimpleString readNodeId() throws NodeManagerException {
+ try {
+ ByteBuffer id = ByteBuffer.allocateDirect(16);
+ int read = channel.read(id, 3);
+ if (read != 16) {
+ throw new IOException("live server did not write id to file");
+ }
+ byte[] bytes = new byte[16];
+ id.position(0);
+ id.get(bytes);
+ setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes));
+ return getNodeId();
+ } catch (IOException e) {
+ throw new NodeManagerException(e);
}
- byte[] bytes = new byte[16];
- id.position(0);
- id.get(bytes);
- setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes));
- return getNodeId();
}
protected FileLock tryLock(final int lockPos) throws IOException {
@@ -361,8 +399,8 @@ public class FileLockNodeManager extends NodeManager {
}
}
- protected FileLock lock(final int lockPosition) throws Exception {
- long start = System.currentTimeMillis();
+ protected FileLock lock(final int lockPosition) throws ActiveMQLockAcquisitionTimeoutException {
+ long start = System.nanoTime();
boolean isRecurringFailure = false;
while (!interrupted) {
@@ -377,7 +415,7 @@ public class FileLockNodeManager extends NodeManager {
return null;
}
- if (lockAcquisitionTimeout != -1 && (System.currentTimeMillis() - start) > lockAcquisitionTimeout) {
+ if (this.lockAcquisitionTimeoutNanos != -1 && (System.nanoTime() - start) > this.lockAcquisitionTimeoutNanos) {
throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
}
} else {
@@ -390,9 +428,9 @@ public class FileLockNodeManager extends NodeManager {
"Failure when accessing a lock file", e);
isRecurringFailure = true;
- long waitTime = LOCK_ACCESS_FAILURE_WAIT_TIME;
- if (lockAcquisitionTimeout != -1) {
- final long remainingTime = lockAcquisitionTimeout - (System.currentTimeMillis() - start);
+ long waitTime = LOCK_ACCESS_FAILURE_WAIT_TIME_NANOS;
+ if (this.lockAcquisitionTimeoutNanos != -1) {
+ final long remainingTime = this.lockAcquisitionTimeoutNanos - (System.nanoTime() - start);
if (remainingTime <= 0) {
throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
}
@@ -400,7 +438,7 @@ public class FileLockNodeManager extends NodeManager {
}
try {
- Thread.sleep(waitTime);
+ TimeUnit.NANOSECONDS.sleep(waitTime);
} catch (InterruptedException interrupt) {
return null;
}
@@ -414,7 +452,7 @@ public class FileLockNodeManager extends NodeManager {
private synchronized void startLockMonitoring() {
logger.debug("Starting the lock monitor");
if (monitorLock == null) {
- monitorLock = new MonitorLock(scheduledPool, LOCK_MONITOR_TIMEOUT_MILLIES, LOCK_MONITOR_TIMEOUT_MILLIES, TimeUnit.MILLISECONDS, false);
+ monitorLock = new MonitorLock(scheduledPool, LOCK_MONITOR_TIMEOUT_NANOS, LOCK_MONITOR_TIMEOUT_NANOS, TimeUnit.NANOSECONDS, false);
monitorLock.start();
} else {
logger.debug("Lock monitor was already started");
@@ -431,49 +469,22 @@ public class FileLockNodeManager extends NodeManager {
}
}
- private void notifyLostLock() {
- // Additional check we are not initializing or have no locking object anymore
- // because of a shutdown
- if (lockListeners != null && liveLock != null) {
- Set<LockListener> lockListenersSnapshot = null;
-
- // Snapshot of the set because I'm not sure if we can trigger concurrent
- // modification exception here if we don't
- synchronized (lockListeners) {
- lockListenersSnapshot = new HashSet<>(lockListeners);
- }
-
- lockListenersSnapshot.forEach(lockListener -> {
- try {
- lockListener.lostLock();
- } catch (Exception e) {
- // Need to notify everyone so ignore any exception
- }
- });
+ @Override
+ protected synchronized void notifyLostLock() {
+ if (liveLock != null) {
+ super.notifyLostLock();
}
}
- public void registerLockListener(LockListener lockListener) {
- lockListeners.add(lockListener);
+ // This has been introduced to help ByteMan test testLockMonitorInvalid on JDK 11: sun.nio.ch.FileLockImpl::isValid
+ // can affect setLive, causing an infinite loop due to java.nio.channels.OverlappingFileLockException on tryLock
+ private boolean isLiveLockLost() {
+ final FileLock lock = this.liveLock;
+ return (lock != null && !lock.isValid()) || lock == null;
}
- public void unregisterLockListener(LockListener lockListener) {
- lockListeners.remove(lockListener);
- }
-
- protected final Set<LockListener> lockListeners = Collections.synchronizedSet(new HashSet<LockListener>());
-
private MonitorLock monitorLock;
- public abstract class LockListener {
- protected abstract void lostLock() throws Exception;
-
- protected void unregisterListener() {
- lockListeners.remove(this);
- }
- }
-
-
public class MonitorLock extends ActiveMQScheduledComponent {
public MonitorLock(ScheduledExecutorService scheduledExecutorService,
long initialDelay,
@@ -492,7 +503,7 @@ public class FileLockNodeManager extends NodeManager {
if (liveLock == null) {
logger.debug("Livelock is null");
}
- lostLock = (liveLock != null && !liveLock.isValid()) || liveLock == null;
+ lostLock = isLiveLockLost();
if (!lostLock) {
logger.debug("Server still has the lock, double check status is live");
// Java always thinks the lock is still valid even when there is no filesystem
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java
index 7af25de..bbe4191 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java
@@ -17,14 +17,10 @@
package org.apache.activemq.artemis.core.server.impl;
import java.io.File;
-import java.io.IOException;
import java.util.concurrent.Semaphore;
-import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActivateCallback;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import static org.apache.activemq.artemis.core.server.impl.InVMNodeManager.State.FAILING_BACK;
@@ -39,7 +35,7 @@ import static org.apache.activemq.artemis.core.server.impl.InVMNodeManager.State
* multiple servers are run inside the same VM and File Locks can not be shared in the
* same VM (it would cause a shared lock violation).
*/
-public final class InVMNodeManager extends NodeManager {
+public final class InVMNodeManager extends FileBasedNodeManager {
private final Semaphore liveLock;
@@ -67,7 +63,7 @@ public final class InVMNodeManager extends NodeManager {
}
@Override
- public void awaitLiveNode() throws Exception {
+ public void awaitLiveNode() throws InterruptedException {
do {
while (state == NOT_STARTED) {
Thread.sleep(10);
@@ -92,51 +88,47 @@ public final class InVMNodeManager extends NodeManager {
}
@Override
- public void awaitLiveStatus() throws Exception {
+ public void awaitLiveStatus() throws InterruptedException {
while (state != LIVE) {
Thread.sleep(10);
}
}
@Override
- public void startBackup() throws Exception {
+ public void startBackup() throws InterruptedException {
backupLock.acquire();
}
@Override
- public ActivateCallback startLiveNode() throws Exception {
+ public ActivateCallback startLiveNode() throws InterruptedException {
state = FAILING_BACK;
liveLock.acquire();
return new CleaningActivateCallback() {
@Override
public void activationComplete() {
- try {
- state = LIVE;
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
- }
+ state = LIVE;
}
};
}
@Override
- public void pauseLiveServer() throws Exception {
+ public void pauseLiveServer() {
state = PAUSED;
liveLock.release();
}
@Override
- public void crashLiveServer() throws Exception {
+ public void crashLiveServer() {
liveLock.release();
}
@Override
- public boolean isAwaitingFailback() throws Exception {
+ public boolean isAwaitingFailback() {
return state == FAILING_BACK;
}
@Override
- public boolean isBackupLive() throws Exception {
+ public boolean isBackupLive() {
return liveLock.availablePermits() == 0;
}
@@ -151,7 +143,7 @@ public final class InVMNodeManager extends NodeManager {
}
@Override
- public SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException {
+ public SimpleString readNodeId() {
return getNodeId();
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
index 38483d0..002414a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
@@ -67,7 +68,7 @@ public final class SharedNothingBackupActivation extends Activation {
private SharedNothingBackupQuorum backupQuorum;
private final boolean attemptFailBack;
private final Map<String, Object> activationParams;
- private final ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO;
+ private final IOCriticalErrorListener ioCriticalErrorListener;
private String nodeID;
ClusterControl clusterControl;
private boolean closed;
@@ -79,13 +80,13 @@ public final class SharedNothingBackupActivation extends Activation {
public SharedNothingBackupActivation(ActiveMQServerImpl activeMQServer,
boolean attemptFailBack,
Map<String, Object> activationParams,
- ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO,
+ IOCriticalErrorListener ioCriticalErrorListener,
ReplicaPolicy replicaPolicy,
NetworkHealthCheck networkHealthCheck) {
this.activeMQServer = activeMQServer;
this.attemptFailBack = attemptFailBack;
this.activationParams = activationParams;
- this.shutdownOnCriticalIO = shutdownOnCriticalIO;
+ this.ioCriticalErrorListener = ioCriticalErrorListener;
this.replicaPolicy = replicaPolicy;
backupSyncLatch.setCount(1);
this.networkHealthCheck = networkHealthCheck;
@@ -95,7 +96,7 @@ public final class SharedNothingBackupActivation extends Activation {
assert replicationEndpoint == null;
activeMQServer.resetNodeManager();
backupUpToDate = false;
- replicationEndpoint = new ReplicationEndpoint(activeMQServer, shutdownOnCriticalIO, attemptFailBack, this);
+ replicationEndpoint = new ReplicationEndpoint(activeMQServer, ioCriticalErrorListener, attemptFailBack, this);
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
index 7178cd8..9de4be0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
@@ -317,7 +317,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
SimpleString nodeId0;
try {
nodeId0 = activeMQServer.getNodeManager().readNodeId();
- } catch (ActiveMQIllegalStateException e) {
+ } catch (NodeManager.NodeManagerException e) {
nodeId0 = null;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java
index 463d97c..0b21445 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java
@@ -16,18 +16,24 @@
*/
package org.apache.activemq.artemis.core.server.impl;
+import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
+import org.apache.activemq.artemis.core.server.NodeManager.NodeManagerException;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
@@ -40,17 +46,24 @@ public final class SharedStoreBackupActivation extends Activation {
private static final Logger logger = Logger.getLogger(SharedStoreBackupActivation.class);
//this is how we act as a backup
- private SharedStoreSlavePolicy sharedStoreSlavePolicy;
+ private final SharedStoreSlavePolicy sharedStoreSlavePolicy;
- private ActiveMQServerImpl activeMQServer;
+ private final ActiveMQServerImpl activeMQServer;
private final Object failbackCheckerGuard = new Object();
private boolean cancelFailBackChecker;
- public SharedStoreBackupActivation(ActiveMQServerImpl server, SharedStoreSlavePolicy sharedStoreSlavePolicy) {
+ private LockListener activeLockListener;
+
+ private final IOCriticalErrorListener ioCriticalErrorListener;
+
+ public SharedStoreBackupActivation(ActiveMQServerImpl server,
+ SharedStoreSlavePolicy sharedStoreSlavePolicy,
+ IOCriticalErrorListener ioCriticalErrorListener) {
this.activeMQServer = server;
this.sharedStoreSlavePolicy = sharedStoreSlavePolicy;
+ this.ioCriticalErrorListener = ioCriticalErrorListener;
synchronized (failbackCheckerGuard) {
cancelFailBackChecker = false;
}
@@ -59,6 +72,8 @@ public final class SharedStoreBackupActivation extends Activation {
@Override
public void run() {
try {
+ registerActiveLockListener(activeMQServer.getNodeManager());
+
activeMQServer.getNodeManager().startBackup();
ScaleDownPolicy scaleDownPolicy = sharedStoreSlavePolicy.getScaleDownPolicy();
@@ -92,6 +107,11 @@ public final class SharedStoreBackupActivation extends Activation {
activeMQServer.completeActivation(false);
if (scalingDown) {
+ if (!restarting.compareAndSet(false, true)) {
+ return;
+ }
+ unregisterActiveLockListener(activeMQServer.getNodeManager());
+
ActiveMQServerLogger.LOGGER.backupServerScaledDown();
Thread t = new Thread(new Runnable() {
@Override
@@ -117,6 +137,17 @@ public final class SharedStoreBackupActivation extends Activation {
if (sharedStoreSlavePolicy.isAllowAutoFailBack() && ActiveMQServerImpl.SERVER_STATE.STOPPING != activeMQServer.getState() && ActiveMQServerImpl.SERVER_STATE.STOPPED != activeMQServer.getState()) {
startFailbackChecker();
}
+ } catch (NodeManagerException nodeManagerException) {
+ if (nodeManagerException.getCause() instanceof ClosedChannelException) {
+ // this is ok, we are being stopped
+ return;
+ }
+ if (nodeManagerException.getCause() instanceof ActiveMQLockAcquisitionTimeoutException) {
+ ActiveMQServerLogger.LOGGER.initializationError(nodeManagerException.getCause());
+ return;
+ }
+ unregisterActiveLockListener(activeMQServer.getNodeManager());
+ ioCriticalErrorListener.onIOException(nodeManagerException, nodeManagerException.getMessage(), null);
} catch (ClosedChannelException | InterruptedException e) {
// these are ok, we are being stopped
} catch (Exception e) {
@@ -144,16 +175,25 @@ public final class SharedStoreBackupActivation extends Activation {
activeMQServer.interruptActivationThread(nodeManagerInUse);
if (nodeManagerInUse != null) {
+ unregisterActiveLockListener(nodeManagerInUse);
nodeManagerInUse.stopBackup();
}
} else {
if (nodeManagerInUse != null) {
+ unregisterActiveLockListener(nodeManagerInUse);
// if we are now live, behave as live
// We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
// started before the live
if (sharedStoreSlavePolicy.isFailoverOnServerShutdown() || permanently) {
- nodeManagerInUse.crashLiveServer();
+ try {
+ nodeManagerInUse.crashLiveServer();
+ } catch (Throwable t) {
+ if (!permanently) {
+ throw t;
+ }
+ logger.warn("Errored while closing activation: can be ignored because of permanent close", t);
+ }
} else {
nodeManagerInUse.pauseLiveServer();
}
@@ -161,6 +201,27 @@ public final class SharedStoreBackupActivation extends Activation {
}
}
+ private void registerActiveLockListener(NodeManager nodeManager) {
+ LockListener lockListener = () -> {
+ if (!restarting.compareAndSet(false, true)) {
+ logger.warn("Restarting already happening on lost lock");
+ return;
+ }
+ unregisterActiveLockListener(nodeManager);
+ ioCriticalErrorListener.onIOException(new IOException("lost lock"), "Lost NodeManager lock", null);
+ };
+ activeLockListener = lockListener;
+ nodeManager.registerLockListener(lockListener);
+ }
+
+ private void unregisterActiveLockListener(NodeManager nodeManager) {
+ LockListener activeLockListener = this.activeLockListener;
+ if (activeLockListener != null) {
+ nodeManager.unregisterLockListener(activeLockListener);
+ this.activeLockListener = null;
+ }
+ }
+
@Override
public JournalLoader createJournalLoader(PostOffice postOffice,
PagingManager pagingManager,
@@ -178,6 +239,8 @@ public final class SharedStoreBackupActivation extends Activation {
}
}
+ private final AtomicBoolean restarting = new AtomicBoolean(false);
+
/**
* To be called by backup trying to fail back the server
*/
@@ -195,47 +258,50 @@ public final class SharedStoreBackupActivation extends Activation {
activeMQServer.getClusterManager().getDefaultConnection(null).addClusterTopologyListener(backupListener);
}
- private boolean restarting = false;
-
@Override
public void run() {
try {
- if (!restarting && activeMQServer.getNodeManager().isAwaitingFailback()) {
- if (backupListener.waitForBackup()) {
- ActiveMQServerLogger.LOGGER.awaitFailBack();
- restarting = true;
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
+ if (!restarting.get() && activeMQServer.getNodeManager().isAwaitingFailback() && backupListener.waitForBackup()) {
+ if (!restarting.compareAndSet(false, true)) {
+ return;
+ }
+ ActiveMQServerLogger.LOGGER.awaitFailBack();
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ logger.debug(activeMQServer + "::Stopping live node in favor of failback");
+
+ NodeManager nodeManager = activeMQServer.getNodeManager();
+ activeMQServer.stop(true, false, true);
+
+ // ensure that the server to which we are failing back actually starts fully before we restart
+ nodeManager.start();
try {
- logger.debug(activeMQServer + "::Stopping live node in favor of failback");
-
- NodeManager nodeManager = activeMQServer.getNodeManager();
- activeMQServer.stop(true, false, true);
+ nodeManager.awaitLiveStatus();
+ } finally {
+ nodeManager.stop();
+ }
- // ensure that the server to which we are failing back actually starts fully before we restart
- nodeManager.start();
- try {
- nodeManager.awaitLiveStatus();
- } finally {
- nodeManager.stop();
- }
+ synchronized (failbackCheckerGuard) {
+ if (cancelFailBackChecker || !sharedStoreSlavePolicy.isRestartBackup())
+ return;
- synchronized (failbackCheckerGuard) {
- if (cancelFailBackChecker || !sharedStoreSlavePolicy.isRestartBackup())
- return;
+ activeMQServer.setHAPolicy(sharedStoreSlavePolicy);
+ logger.debug(activeMQServer + "::Starting backup node now after failback");
+ activeMQServer.start();
- activeMQServer.setHAPolicy(sharedStoreSlavePolicy);
- logger.debug(activeMQServer + "::Starting backup node now after failback");
- activeMQServer.start();
+ LockListener lockListener = activeLockListener;
+ if (lockListener != null) {
+ activeMQServer.getNodeManager().registerLockListener(lockListener);
}
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.serverRestartWarning(e);
}
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.serverRestartWarning(e);
}
- });
- t.start();
- }
+ }
+ });
+ t.start();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.serverRestartWarning(e);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java
index 6745ed9..1733f34 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java
@@ -16,11 +16,17 @@
*/
package org.apache.activemq.artemis.core.server.impl;
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActivateCallback;
+import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
+import org.apache.activemq.artemis.core.server.NodeManager.NodeManagerException;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
-import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager.LockListener;
import org.jboss.logging.Logger;
public final class SharedStoreLiveActivation extends LiveActivation {
@@ -28,17 +34,22 @@ public final class SharedStoreLiveActivation extends LiveActivation {
private static final Logger logger = Logger.getLogger(SharedStoreLiveActivation.class);
// this is how we act when we initially start as live
- private SharedStoreMasterPolicy sharedStoreMasterPolicy;
+ private final SharedStoreMasterPolicy sharedStoreMasterPolicy;
- private ActiveMQServerImpl activeMQServer;
+ private final ActiveMQServerImpl activeMQServer;
- private volatile FileLockNodeManager.LockListener activeLockListener;
+ private volatile LockListener activeLockListener;
private volatile ActivateCallback nodeManagerActivateCallback;
- public SharedStoreLiveActivation(ActiveMQServerImpl server, SharedStoreMasterPolicy sharedStoreMasterPolicy) {
+ private final IOCriticalErrorListener ioCriticalErrorListener;
+
+ public SharedStoreLiveActivation(ActiveMQServerImpl server,
+ SharedStoreMasterPolicy sharedStoreMasterPolicy,
+ IOCriticalErrorListener ioCriticalErrorListener) { // listener invoked on a lost lock or an unexpected NodeManagerException
this.activeMQServer = server;
this.sharedStoreMasterPolicy = sharedStoreMasterPolicy;
+ this.ioCriticalErrorListener = ioCriticalErrorListener;
}
@Override
@@ -71,9 +82,9 @@ public final class SharedStoreLiveActivation extends LiveActivation {
activeMQServer.getBackupManager().announceBackup();
}
+ registerActiveLockListener(activeMQServer.getNodeManager());
nodeManagerActivateCallback = activeMQServer.getNodeManager().startLiveNode();
activeMQServer.registerActivateCallback(nodeManagerActivateCallback);
- addLockListener(activeMQServer, activeMQServer.getNodeManager());
if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED
|| activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPING) {
@@ -85,59 +96,40 @@ public final class SharedStoreLiveActivation extends LiveActivation {
activeMQServer.completeActivation(false);
ActiveMQServerLogger.LOGGER.serverIsLive();
+ } catch (NodeManagerException nodeManagerException) {
+ if (nodeManagerException.getCause() instanceof ClosedChannelException) {
+ // this is ok, we are being stopped
+ return;
+ }
+ if (nodeManagerException.getCause() instanceof ActiveMQLockAcquisitionTimeoutException) {
+ onActivationFailure((ActiveMQLockAcquisitionTimeoutException) nodeManagerException.getCause());
+ return;
+ }
+ unregisterActiveLockListener(activeMQServer.getNodeManager());
+ ioCriticalErrorListener.onIOException(nodeManagerException, nodeManagerException.getMessage(), null);
} catch (Exception e) {
- ActiveMQServerLogger.LOGGER.initializationError(e);
- activeMQServer.callActivationFailureListeners(e);
+ onActivationFailure(e);
}
}
- private void addLockListener(ActiveMQServerImpl activeMQServer, NodeManager nodeManager) {
- if (nodeManager instanceof FileLockNodeManager) {
- FileLockNodeManager fileNodeManager = (FileLockNodeManager) nodeManager;
-
- activeLockListener = fileNodeManager.new LockListener() {
-
- @Override
- public void lostLock() {
- stopStartServerInSeperateThread(activeMQServer);
- }
-
- };
- fileNodeManager.registerLockListener(activeLockListener);
- } // else no business registering a listener
+ private void onActivationFailure(Exception e) { // shared failure path: unregister the lock listener, log, then notify the activation failure listeners
+ unregisterActiveLockListener(activeMQServer.getNodeManager());
+ ActiveMQServerLogger.LOGGER.initializationError(e);
+ activeMQServer.callActivationFailureListeners(e);
}
- /**
- * We need to do this in a new thread because this takes to long to finish in
- * the scheduled thread Also this is not the responsibility of the scheduled
- * thread
- * @param activeMQServer
- */
- private void stopStartServerInSeperateThread(ActiveMQServerImpl activeMQServer) {
- try {
-
- Runnable startServerRunnable = new Runnable() {
-
- @Override
- public void run() {
- try {
- activeMQServer.stop(true, false);
- } catch (Exception e) {
- logger.warn("Failed to stop artemis server after loosing the lock", e);
- }
-
- try {
- activeMQServer.start();
- } catch (Exception e) {
- logger.error("Failed to start artemis server after recovering from loosing the lock", e);
- }
- }
+ private void registerActiveLockListener(NodeManager nodeManager) { // installs a listener that reports a lost lock as an I/O critical error
+ LockListener lockListener = () ->
+ ioCriticalErrorListener.onIOException(new IOException("lost lock"), "Lost NodeManager lock", null);
+ activeLockListener = lockListener; // kept in the volatile field so it can be unregistered later
+ nodeManager.registerLockListener(lockListener);
+ }
- };
- Thread startServer = new Thread(startServerRunnable);
- startServer.start();
- } catch (Exception e) {
- logger.error(e.getMessage());
+ private void unregisterActiveLockListener(NodeManager nodeManager) { // removes the currently registered lost-lock listener, if any
+ LockListener activeLockListener = this.activeLockListener; // single read of the volatile field: the null check and the call use the same reference
+ if (activeLockListener != null) {
+ nodeManager.unregisterLockListener(activeLockListener);
+ this.activeLockListener = null; // a later call finds null and skips the unregister
}
}
@@ -147,16 +139,20 @@ public final class SharedStoreLiveActivation extends LiveActivation {
NodeManager nodeManagerInUse = activeMQServer.getNodeManager();
if (nodeManagerInUse != null) {
- LockListener closeLockListener = activeLockListener;
- if (closeLockListener != null) {
- closeLockListener.unregisterListener();
- }
+ unregisterActiveLockListener(nodeManagerInUse);
ActivateCallback activateCallback = nodeManagerActivateCallback;
if (activateCallback != null) {
activeMQServer.unregisterActivateCallback(activateCallback);
}
if (sharedStoreMasterPolicy.isFailoverOnServerShutdown() || permanently) {
- nodeManagerInUse.crashLiveServer();
+ try {
+ nodeManagerInUse.crashLiveServer();
+ } catch (Throwable t) {
+ if (!permanently) {
+ throw t;
+ }
+ logger.warn("Errored while closing activation: can be ignored because of permanent close", t);
+ }
} else {
nodeManagerInUse.pauseLiveServer();
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
index ce91d70..d1382cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
@@ -20,13 +20,13 @@ package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.jboss.logging.Logger;
/**
- * Default implementation of a {@link ScheduledLeaseLock}: see {@link ScheduledLeaseLock#of(ScheduledExecutorService, ArtemisExecutor, String, LeaseLock, long, IOCriticalErrorListener)}.
+ * Default implementation of a {@link ScheduledLeaseLock}: see {@link ScheduledLeaseLock#of(ScheduledExecutorService, ArtemisExecutor, String, LeaseLock, long, LockListener)}.
*/
final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implements ScheduledLeaseLock {
@@ -36,14 +36,14 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
private final LeaseLock lock;
private long lastLockRenewStart;
private final long renewPeriodMillis;
- private final IOCriticalErrorListener ioCriticalErrorListener;
+ private final LockListener lockListener;
ActiveMQScheduledLeaseLock(ScheduledExecutorService scheduledExecutorService,
ArtemisExecutor executor,
String lockName,
LeaseLock lock,
long renewPeriodMillis,
- IOCriticalErrorListener ioCriticalErrorListener) {
+ LockListener lockListener) {
super(scheduledExecutorService, executor, 0, renewPeriodMillis, TimeUnit.MILLISECONDS, false);
if (renewPeriodMillis >= lock.expirationMillis()) {
throw new IllegalArgumentException("renewPeriodMillis must be < lock's expirationMillis");
@@ -51,9 +51,14 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
this.lockName = lockName;
this.lock = lock;
this.renewPeriodMillis = renewPeriodMillis;
- //already expired start time
+ // already expired start time
this.lastLockRenewStart = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(lock.expirationMillis());
- this.ioCriticalErrorListener = ioCriticalErrorListener;
+ this.lockListener = lockListener;
+ }
+
+ @Override
+ public String lockName() { // the name passed to the constructor; the same value is used in this class's log messages
+ return lockName;
}
@Override
@@ -84,37 +89,55 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
}
@Override
- public void run() {
+ public synchronized void run() {
+ if (!isStarted()) {
+ return;
+ }
final long lastRenewStart = this.lastLockRenewStart;
final long renewStart = System.nanoTime();
+ boolean lockLost = true;
try {
- if (!this.lock.renew()) {
- ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null);
- }
+ lockLost = !this.lock.renew();
} catch (Throwable t) {
- ioCriticalErrorListener.onIOException(t, "Critical error while on " + lockName + " renew", null);
- throw t;
+ LOGGER.warnf(t, "%s lock renew has failed", lockName);
+ if (lock.localExpirationTime() > 0) {
+ final long millisBeforeExpiration = (lock.localExpirationTime() - System.currentTimeMillis());
+ // is there enough time left to retry the renew?
+ if (millisBeforeExpiration >= this.renewPeriodMillis) {
+ lockLost = false;
+ }
+ }
+ }
+ // a failed attempt to renew is treated as a lost lock
+ if (lockLost) {
+ try {
+ lockListener.lostLock();
+ } catch (Throwable t) {
+ LOGGER.warnf(t, "Errored while notifying %s lock listener", lockName);
+ }
}
//logic to detect slowness of DB and/or the scheduled executor service
- detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis());
+ detectAndReportRenewSlowness(lockName, lockLost, lastRenewStart,
+ renewStart, renewPeriodMillis, lock.expirationMillis());
this.lastLockRenewStart = renewStart;
}
private static void detectAndReportRenewSlowness(String lockName,
+ boolean lostLock,
long lastRenewStart,
long renewStart,
long expectedRenewPeriodMillis,
long expirationMillis) {
final long elapsedMillisToRenew = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - renewStart);
if (elapsedMillisToRenew > expectedRenewPeriodMillis) {
- LOGGER.error(lockName + " lock renew tooks " + elapsedMillisToRenew + " ms, while is supposed to take <" + expectedRenewPeriodMillis + " ms");
+ LOGGER.errorf("%s lock %s renew tooks %d ms, while is supposed to take <%d ms", lockName, lostLock ? "failed" : "successful", elapsedMillisToRenew, expectedRenewPeriodMillis);
}
final long measuredRenewPeriodNanos = renewStart - lastRenewStart;
final long measuredRenewPeriodMillis = TimeUnit.NANOSECONDS.toMillis(measuredRenewPeriodNanos);
if (measuredRenewPeriodMillis - expirationMillis > 100) {
- LOGGER.error(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
+ LOGGER.errorf("%s lock %s renew period lasted %d ms instead of %d ms", lockName, lostLock ? "failed" : "successful", measuredRenewPeriodMillis, expectedRenewPeriodMillis);
} else if (measuredRenewPeriodMillis - expectedRenewPeriodMillis > 100) {
- LOGGER.warn(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
+ LOGGER.warnf("%s lock %s renew period lasted %d ms instead of %d ms", lockName, lostLock ? "failed" : "successful", measuredRenewPeriodMillis, expectedRenewPeriodMillis);
}
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
index 3b7124e..f1c789c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
@@ -44,8 +44,10 @@ final class JdbcLeaseLock implements LeaseLock {
private final String isLocked;
private final String currentDateTime;
private final long expirationMillis;
+ private final int queryTimeout;
private boolean maybeAcquired;
private final String lockName;
+ private long localExpirationTime;
/**
* The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection},
@@ -59,6 +61,7 @@ final class JdbcLeaseLock implements LeaseLock {
String isLocked,
String currentDateTime,
long expirationMIllis,
+ long queryTimeoutMillis,
String lockName) {
if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH);
@@ -73,12 +76,30 @@ final class JdbcLeaseLock implements LeaseLock {
this.maybeAcquired = false;
this.connectionProvider = connectionProvider;
this.lockName = lockName;
+ this.localExpirationTime = -1;
+ int expectedTimeout = (int) (queryTimeoutMillis > 0 ? TimeUnit.MILLISECONDS.toSeconds(queryTimeoutMillis) : -1); // whole seconds: sub-second values truncate to 0
+ if (queryTimeoutMillis >= 0 && expectedTimeout <= 0) { // "too low" only applies to a configured value below 1 second: larger values must be kept
+ LOGGER.warn("queryTimeoutMillis is too low: it's suggested to configure a multi-seconds value. Disabling it because too low.");
+ expectedTimeout = -1; // negative means disabled: no query timeout is set on the statements
+ }
+ this.queryTimeout = expectedTimeout;
+
}
public String holderId() {
return holderId;
}
+ /**
+ * Given that many DBMS won't support standard SQL queries to collect CURRENT_TIMESTAMP at millisecond granularity,
+ * this value is stripped of the milliseconds part, making it less optimistic than reality, if >= 0.<p>
+ * It's commonly used as a hard deadline for JDBC operations, hence it is fine for it not to have high precision.
+ */
+ @Override
+ public long localExpirationTime() {
+ return localExpirationTime;
+ }
+
@Override
public long expirationMillis() {
return expirationMillis;
@@ -115,17 +136,24 @@ final class JdbcLeaseLock implements LeaseLock {
}
private long dbCurrentTimeMillis(Connection connection) throws SQLException {
- final long start = System.nanoTime();
try (PreparedStatement currentDateTime = connection.prepareStatement(this.currentDateTime)) {
+ if (queryTimeout >= 0) {
+ currentDateTime.setQueryTimeout(queryTimeout);
+ }
+ final long startTime = stripMilliseconds(System.currentTimeMillis());
try (ResultSet resultSet = currentDateTime.executeQuery()) {
resultSet.next();
+ final long endTime = stripMilliseconds(System.currentTimeMillis());
final Timestamp currentTimestamp = resultSet.getTimestamp(1);
- final long elapsedTime = System.nanoTime() - start;
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms",
- lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime));
+ final long currentTime = currentTimestamp.getTime();
+ final long currentTimeMillis = stripMilliseconds(currentTime);
+ if (currentTimeMillis < startTime) {
+ LOGGER.warnf("[%s] %s query currentTimestamp = %s on database should happen AFTER %s on broker", lockName, holderId, currentTimestamp, new Timestamp(startTime));
}
- return currentTimestamp.getTime();
+ if (currentTimeMillis > endTime) {
+ LOGGER.warnf("[%s] %s query currentTimestamp = %s on database should happen BEFORE %s on broker", lockName, holderId, currentTimestamp, new Timestamp(endTime));
+ }
+ return currentTime;
}
}
}
@@ -138,7 +166,8 @@ final class JdbcLeaseLock implements LeaseLock {
connection.setAutoCommit(false);
try (PreparedStatement preparedStatement = connection.prepareStatement(this.renewLock)) {
final long now = dbCurrentTimeMillis(connection);
- final Timestamp expirationTime = new Timestamp(now + expirationMillis);
+ final long localExpirationTime = now + expirationMillis;
+ final Timestamp expirationTime = new Timestamp(localExpirationTime);
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s",
lockName, holderId, expirationTime);
@@ -151,11 +180,13 @@ final class JdbcLeaseLock implements LeaseLock {
final boolean renewed = updatedRows == 1;
connection.commit();
if (!renewed) {
+ this.localExpirationTime = -1;
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }",
lockName, holderId, readableLockStatus());
}
} else {
+ this.localExpirationTime = stripMilliseconds(localExpirationTime);
LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId);
}
return renewed;
@@ -170,6 +201,10 @@ final class JdbcLeaseLock implements LeaseLock {
}
}
+ private static long stripMilliseconds(long time) { // truncates a millisecond value to whole seconds: the integer division drops the remainder
+ return (time / 1000) * 1000;
+ }
+
@Override
public boolean tryAcquire() {
try (Connection connection = connectionProvider.getConnection()) {
@@ -179,7 +214,8 @@ final class JdbcLeaseLock implements LeaseLock {
try (PreparedStatement preparedStatement = connection.prepareStatement(this.tryAcquireLock)) {
final long now = dbCurrentTimeMillis(connection);
preparedStatement.setString(1, holderId);
- final Timestamp expirationTime = new Timestamp(now + expirationMillis);
+ final long localExpirationTime = now + expirationMillis;
+ final Timestamp expirationTime = new Timestamp(localExpirationTime);
preparedStatement.setTimestamp(2, expirationTime);
preparedStatement.setTimestamp(3, expirationTime);
LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s",
@@ -188,6 +224,7 @@ final class JdbcLeaseLock implements LeaseLock {
connection.commit();
if (acquired) {
this.maybeAcquired = true;
+ this.localExpirationTime = stripMilliseconds(localExpirationTime);
LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId);
} else {
if (LOGGER.isDebugEnabled()) {
@@ -272,6 +309,7 @@ final class JdbcLeaseLock implements LeaseLock {
preparedStatement.setString(1, holderId);
final boolean released = preparedStatement.executeUpdate() == 1;
//consider it as released to avoid on finalize to be reclaimed
+ this.localExpirationTime = -1;
this.maybeAcquired = false;
connection.commit();
if (!released) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
index 212e6e1..f357925 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
@@ -23,8 +23,8 @@ import java.util.function.Supplier;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
-import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActivateCallback;
+import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
@@ -36,6 +36,8 @@ import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
+import static org.apache.activemq.artemis.core.server.impl.jdbc.LeaseLock.AcquireResult.Timeout;
+
/**
* JDBC implementation of {@link NodeManager}.
*/
@@ -53,12 +55,10 @@ public final class JdbcNodeManager extends NodeManager {
private final long lockAcquisitionTimeoutMillis;
private volatile boolean interrupted = false;
private final LeaseLock.Pauser pauser;
- private final IOCriticalErrorListener ioCriticalErrorListener;
public static JdbcNodeManager with(DatabaseStorageConfiguration configuration,
ScheduledExecutorService scheduledExecutorService,
- ExecutorFactory executorFactory,
- IOCriticalErrorListener ioCriticalErrorListener) {
+ ExecutorFactory executorFactory) {
validateTimeoutConfiguration(configuration);
final SQLProvider.Factory sqlProviderFactory;
if (configuration.getSqlProviderFactory() != null) {
@@ -74,8 +74,7 @@ public final class JdbcNodeManager extends NodeManager {
configuration.getConnectionProvider(),
sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER),
scheduledExecutorService,
- executorFactory,
- ioCriticalErrorListener);
+ executorFactory);
}
private static JdbcNodeManager usingConnectionProvider(String brokerId,
@@ -85,18 +84,16 @@ public final class JdbcNodeManager extends NodeManager {
JDBCConnectionProvider connectionProvider,
SQLProvider provider,
ScheduledExecutorService scheduledExecutorService,
- ExecutorFactory executorFactory,
- IOCriticalErrorListener ioCriticalErrorListener) {
- return new JdbcNodeManager(
- () -> JdbcSharedStateManager.usingConnectionProvider(brokerId,
- lockExpirationMillis,
- connectionProvider,
- provider),
+ ExecutorFactory executorFactory) {
+ return new JdbcNodeManager(() -> JdbcSharedStateManager.usingConnectionProvider(brokerId, lockExpirationMillis,
+ lockRenewPeriodMillis,
+ connectionProvider,
+ provider),
+ lockExpirationMillis,
lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis,
scheduledExecutorService,
- executorFactory,
- ioCriticalErrorListener);
+ executorFactory);
}
private static void validateTimeoutConfiguration(DatabaseStorageConfiguration configuration) {
@@ -122,12 +119,12 @@ public final class JdbcNodeManager extends NodeManager {
}
private JdbcNodeManager(Supplier<? extends SharedStateManager> sharedStateManagerFactory,
+ long lockExpirationMillis,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
ScheduledExecutorService scheduledExecutorService,
- ExecutorFactory executorFactory,
- IOCriticalErrorListener ioCriticalErrorListener) {
- super(false, null);
+ ExecutorFactory executorFactory) {
+ super(false);
this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis;
this.pauser = LeaseLock.Pauser.sleep(Math.min(lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
this.sharedStateManagerFactory = sharedStateManagerFactory;
@@ -137,7 +134,7 @@ public final class JdbcNodeManager extends NodeManager {
"live",
this.sharedStateManager.liveLock(),
lockRenewPeriodMillis,
- ioCriticalErrorListener);
+ this::notifyLostLock);
this.scheduledBackupLockFactory = () -> ScheduledLeaseLock.of(
scheduledExecutorService,
executorFactory != null ?
@@ -145,35 +142,47 @@ public final class JdbcNodeManager extends NodeManager {
"backup",
this.sharedStateManager.backupLock(),
lockRenewPeriodMillis,
- ioCriticalErrorListener);
- this.ioCriticalErrorListener = ioCriticalErrorListener;
+ this::notifyLostLock);
this.sharedStateManager = null;
this.scheduledLiveLock = null;
this.scheduledBackupLock = null;
}
@Override
- public void start() throws Exception {
+ protected synchronized void notifyLostLock() {
try {
- synchronized (this) {
- if (isStarted()) {
- return;
- }
- this.sharedStateManager = sharedStateManagerFactory.get();
- LOGGER.debug("setup sharedStateManager on start");
- final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
- setUUID(nodeId);
- this.scheduledLiveLock = scheduledLiveLockFactory.get();
- this.scheduledBackupLock = scheduledBackupLockFactory.get();
- super.start();
+ super.notifyLostLock();
+ } finally {
+ // if any of the notified listener has stopped the node manager or
+ // the node manager was already stopped
+ if (!isStarted()) {
+ return;
+ }
+ try {
+ stop();
+ } catch (Exception ex) {
+ LOGGER.warn("Stopping node manager has errored on lost lock notification", ex);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void start() throws Exception {
+ try {
+ if (isStarted()) {
+ return;
}
+ this.sharedStateManager = sharedStateManagerFactory.get();
+ LOGGER.debug("setup sharedStateManager on start");
+ final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
+ setUUID(nodeId);
+ this.scheduledLiveLock = scheduledLiveLockFactory.get();
+ this.scheduledBackupLock = scheduledBackupLockFactory.get();
+ super.start();
} catch (IllegalStateException e) {
this.sharedStateManager = null;
this.scheduledLiveLock = null;
this.scheduledBackupLock = null;
- if (this.ioCriticalErrorListener != null) {
- this.ioCriticalErrorListener.onIOException(e, "Failed to setup the JdbcNodeManager", null);
- }
throw e;
}
}
@@ -200,43 +209,34 @@ public final class JdbcNodeManager extends NodeManager {
}
@Override
- public boolean isAwaitingFailback() throws Exception {
+ public boolean isAwaitingFailback() throws NodeManagerException {
+ checkStarted();
LOGGER.debug("ENTER isAwaitingFailback");
try {
return readSharedState() == SharedStateManager.State.FAILING_BACK;
+ } catch (IllegalStateException e) {
+ LOGGER.warn("cannot retrieve the live state: assume it's not yet failed back", e);
+ return false;
} finally {
LOGGER.debug("EXIT isAwaitingFailback");
}
}
@Override
- public boolean isBackupLive() throws Exception {
+ public boolean isBackupLive() throws NodeManagerException {
+ checkStarted();
LOGGER.debug("ENTER isBackupLive");
try {
//is anyone holding the live lock?
return this.scheduledLiveLock.lock().isHeld();
+ } catch (IllegalStateException e) {
+ throw new NodeManagerException(e);
} finally {
LOGGER.debug("EXIT isBackupLive");
}
}
@Override
- public void stopBackup() throws Exception {
- LOGGER.debug("ENTER stopBackup");
- try {
- if (this.scheduledBackupLock.isStarted()) {
- LOGGER.debug("scheduledBackupLock is running: stop it and release backup lock");
- this.scheduledBackupLock.stop();
- this.scheduledBackupLock.lock().release();
- } else {
- LOGGER.debug("scheduledBackupLock is not running");
- }
- } finally {
- LOGGER.debug("EXIT stopBackup");
- }
- }
-
- @Override
public void interrupt() {
LOGGER.debug("ENTER interrupted");
//need to be volatile: must be called concurrently to work as expected
@@ -245,7 +245,8 @@ public final class JdbcNodeManager extends NodeManager {
}
@Override
- public void releaseBackup() throws Exception {
+ public void releaseBackup() throws NodeManagerException {
+ checkStarted();
LOGGER.debug("ENTER releaseBackup");
try {
if (this.scheduledBackupLock.isStarted()) {
@@ -255,19 +256,47 @@ public final class JdbcNodeManager extends NodeManager {
} else {
LOGGER.debug("scheduledBackupLock is not running");
}
+ } catch (IllegalStateException e) {
+ throw new NodeManagerException(e);
} finally {
LOGGER.debug("EXIT releaseBackup");
}
}
/**
- * Try to acquire a lock, failing with an exception otherwise.
+ * Try to acquire a lock
*/
- private void lock(LeaseLock lock) throws Exception {
- final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(this.lockAcquisitionTimeoutMillis, this.pauser, () -> !this.interrupted);
+ private void lock(LeaseLock lock) throws ActiveMQLockAcquisitionTimeoutException, InterruptedException {
+ final long lockAcquisitionTimeoutNanos = lockAcquisitionTimeoutMillis >= 0 ?
+ TimeUnit.MILLISECONDS.toNanos(lockAcquisitionTimeoutMillis) : -1;
+ LeaseLock.AcquireResult acquireResult = null;
+ final long start = System.nanoTime();
+ while (acquireResult == null) {
+ checkStarted();
+ // measure distance from the timeout
+ final long remainingNanos = remainingNanos(start, lockAcquisitionTimeoutNanos);
+ if (remainingNanos == 0) {
+ acquireResult = Timeout;
+ continue;
+ }
+ final long remainingMillis = remainingNanos > 0 ? TimeUnit.NANOSECONDS.toMillis(remainingNanos) : -1;
+ try {
+ acquireResult = lock.tryAcquire(remainingMillis, this.pauser, () -> !this.interrupted);
+ } catch (IllegalStateException e) {
+ LOGGER.warn("Errored while trying to acquire lock", e);
+ if (remainingNanos(start, lockAcquisitionTimeoutNanos) == 0) {
+ acquireResult = Timeout;
+ continue;
+ }
+ // that's not precise, but it's ok: it can trigger the timeout right after the pause,
+ // depending on the pause length. The sole purpose of the pause is to avoid
+ // hammering the DBMS with requests if the connection is down
+ this.pauser.idle();
+ }
+ }
switch (acquireResult) {
case Timeout:
- throw new Exception("timed out waiting for lock");
+ throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
case Exit:
this.interrupted = false;
throw new InterruptedException("LeaseLock was interrupted");
@@ -279,6 +308,20 @@ public final class JdbcNodeManager extends NodeManager {
}
+ private static long remainingNanos(long start, long timeoutNanos) { // nanos left before the timeout: 0 = expired, -1 = no timeout
+ if (timeoutNanos >= 0) { // a zero timeout is an already-expired budget, not "wait forever"
+ final long elapsedNanos = (System.nanoTime() - start);
+ if (elapsedNanos < timeoutNanos) {
+ return timeoutNanos - elapsedNanos;
+ } else {
+ return 0;
+ }
+ } else {
+ assert timeoutNanos == -1;
+ return -1; // negative timeout: the caller waits indefinitely
+ }
+ }
+
private void checkInterrupted(Supplier<String> message) throws InterruptedException {
if (this.interrupted) {
interrupted = false;
@@ -286,52 +329,77 @@ public final class JdbcNodeManager extends NodeManager {
}
}
- private void renewLiveLockIfNeeded(final long acquiredOn) {
- final long acquiredMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - acquiredOn);
- if (acquiredMillis > this.scheduledLiveLock.renewPeriodMillis()) {
- if (!this.scheduledLiveLock.lock().renew()) {
- final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
- ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null);
- throw e;
+ private void renewLock(ScheduledLeaseLock lock) { // renews the given lock once; on failure calls notifyLostLock() and throws
+ boolean lostLock = true;
+ IllegalStateException renewEx = null;
+ try {
+ lostLock = !lock.lock().renew(); // renew the lock passed in, the same one named in the error below
+ } catch (IllegalStateException e) {
+ renewEx = e; // lostLock stays true: a failed renew attempt is handled like a lost lock
+ }
+ if (lostLock) {
+ notifyLostLock();
+ if (renewEx == null) {
+ renewEx = new IllegalStateException(lock.lockName() + " lock isn't renewed");
}
+ throw renewEx;
}
}
/**
* Lock live node and check for a live state, taking care to renew it (if needed) or releasing it otherwise
*/
- private boolean lockLiveAndCheckLiveState() throws Exception {
- lock(this.scheduledLiveLock.lock());
- final long acquiredOn = System.nanoTime();
- boolean liveWhileLocked = false;
- //check if the state is live
- final SharedStateManager.State stateWhileLocked;
+ private boolean lockLiveAndCheckLiveState() throws ActiveMQLockAcquisitionTimeoutException, InterruptedException {
try {
- stateWhileLocked = readSharedState();
- } catch (Throwable t) {
- LOGGER.error("error while holding the live node lock and tried to read the shared state", t);
- this.scheduledLiveLock.lock().release();
- throw t;
- }
- if (stateWhileLocked == SharedStateManager.State.LIVE) {
- renewLiveLockIfNeeded(acquiredOn);
- liveWhileLocked = true;
- } else {
- LOGGER.debugf("state is %s while holding the live lock: releasing live lock", stateWhileLocked);
- //state is not live: can (try to) release the lock
- this.scheduledLiveLock.lock().release();
+ lock(this.scheduledLiveLock.lock());
+ //check if the state is live
+ while (true) {
+ try {
+ final SharedStateManager.State stateWhileLocked = readSharedState();
+ final long localExpirationTime = this.scheduledLiveLock.lock().localExpirationTime();
+ if (System.currentTimeMillis() > localExpirationTime) {
+ // the lock can be assumed to be expired,
+ // so the state isn't worthy to be considered
+ return false;
+ }
+ if (stateWhileLocked == SharedStateManager.State.LIVE) {
+ // TODO need some tolerance//renew here?
+ return true;
+ } else {
+ // state is not live: can (try to) release the lock
+ this.scheduledLiveLock.lock().release();
+ return false;
+ }
+ } catch (IllegalStateException e) {
+ LOGGER.error("error while holding the live node lock and tried to read the shared state or to release the lock", e);
+ checkStarted();
+ checkInterrupted(() -> "interrupt on error while checking live state");
+ pauser.idle();
+ final long localExpirationTime = this.scheduledLiveLock.lock().localExpirationTime();
+ if (System.currentTimeMillis() > localExpirationTime) {
+ return false;
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ throw e;
}
- return liveWhileLocked;
}
@Override
- public void awaitLiveNode() throws Exception {
+ public void awaitLiveNode() throws NodeManagerException, InterruptedException {
+ checkStarted();
LOGGER.debug("ENTER awaitLiveNode");
try {
boolean liveWhileLocked = false;
while (!liveWhileLocked) {
//check first without holding any lock
- final SharedStateManager.State state = readSharedState();
+ SharedStateManager.State state = null;
+ try {
+ state = readSharedState();
+ } catch (IllegalStateException e) {
+ LOGGER.warn("Errored while reading shared state", e);
+ }
if (state == SharedStateManager.State.LIVE) {
//verify if the state is live while holding the live node lock too
liveWhileLocked = lockLiveAndCheckLiveState();
@@ -339,6 +407,7 @@ public final class JdbcNodeManager extends NodeManager {
LOGGER.debugf("state while awaiting live node: %s", state);
}
if (!liveWhileLocked) {
+ checkStarted();
checkInterrupted(() -> "awaitLiveNode got interrupted!");
pauser.idle();
}
@@ -346,32 +415,51 @@ public final class JdbcNodeManager extends NodeManager {
//state is LIVE and live lock is acquired and valid
LOGGER.debugf("acquired live node lock while state is %s: starting scheduledLiveLock", SharedStateManager.State.LIVE);
this.scheduledLiveLock.start();
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (ActiveMQLockAcquisitionTimeoutException | IllegalStateException e) {
+ throw new NodeManagerException(e);
} finally {
LOGGER.debug("EXIT awaitLiveNode");
}
}
@Override
- public void startBackup() throws Exception {
+ public void startBackup() throws NodeManagerException, InterruptedException {
+ checkStarted();
LOGGER.debug("ENTER startBackup");
try {
ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
-
lock(scheduledBackupLock.lock());
scheduledBackupLock.start();
ActiveMQServerLogger.LOGGER.gotBackupLock();
if (getUUID() == null)
readNodeId();
+ } catch (InterruptedException ie) {
+ throw ie;
+ } catch (ActiveMQLockAcquisitionTimeoutException | IllegalStateException e) {
+ throw new NodeManagerException(e);
} finally {
LOGGER.debug("EXIT startBackup");
}
}
@Override
- public ActivateCallback startLiveNode() throws Exception {
+ public ActivateCallback startLiveNode() throws NodeManagerException, InterruptedException {
+ checkStarted();
LOGGER.debug("ENTER startLiveNode");
try {
- setFailingBack();
+ boolean done = false;
+ while (!done) {
+ try {
+ setFailingBack();
+ done = true;
+ } catch (IllegalStateException e) {
+ LOGGER.warn("cannot set failing back state, retry", e);
+ pauser.idle();
+ checkInterrupted(() -> "interrupt while trying to set failing back state");
+ }
+ }
final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds";
@@ -389,21 +477,42 @@ public final class JdbcNodeManager extends NodeManager {
LOGGER.debug("ENTER activationComplete");
try {
//state can be written only if the live renew task is running
- setLive();
- } catch (Exception e) {
+ boolean done = false;
+ while (!done) {
+ try {
+ setLive();
+ done = true;
+ } catch (IllegalStateException e) {
+ LOGGER.warn("Errored while trying to setLive", e);
+ checkStarted();
+ pauser.idle();
+ final long localExpirationTime = scheduledLiveLock.lock().localExpirationTime();
+ // optimistic: is just to set a deadline while retrying
+ if (System.currentTimeMillis() > localExpirationTime) {
+ throw new IllegalStateException("live lock is probably expired: failed to setLive");
+ }
+ }
+ }
+ } catch (IllegalStateException e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ throw new NodeManagerException(e);
} finally {
LOGGER.debug("EXIT activationComplete");
}
}
};
+ } catch (InterruptedException ie) {
+ throw ie;
+ } catch (ActiveMQLockAcquisitionTimeoutException | IllegalStateException e) {
+ throw new NodeManagerException(e);
} finally {
LOGGER.debug("EXIT startLiveNode");
}
}
@Override
- public void pauseLiveServer() throws Exception {
+ public void pauseLiveServer() throws NodeManagerException {
+ checkStarted();
LOGGER.debug("ENTER pauseLiveServer");
try {
if (scheduledLiveLock.isStarted()) {
@@ -413,23 +522,21 @@ public final class JdbcNodeManager extends NodeManager {
scheduledLiveLock.lock().release();
} else {
LOGGER.debug("scheduledLiveLock is not running: try renew live lock");
- if (scheduledLiveLock.lock().renew()) {
- LOGGER.debug("live lock renewed: set paused shared state and release live lock");
- setPaused();
- scheduledLiveLock.lock().release();
- } else {
- final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
- ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null);
- throw e;
- }
+ renewLock(scheduledLiveLock);
+ LOGGER.debug("live lock renewed: set paused shared state and release live lock");
+ setPaused();
+ scheduledLiveLock.lock().release();
}
+ } catch (IllegalStateException e) {
+ throw new NodeManagerException(e);
} finally {
LOGGER.debug("EXIT pauseLiveServer");
}
}
@Override
- public void crashLiveServer() throws Exception {
+ public void crashLiveServer() throws NodeManagerException {
+ checkStarted();
LOGGER.debug("ENTER crashLiveServer");
try {
if (this.scheduledLiveLock.isStarted()) {
@@ -446,10 +553,18 @@ public final class JdbcNodeManager extends NodeManager {
@Override
public void awaitLiveStatus() {
+ checkStarted();
LOGGER.debug("ENTER awaitLiveStatus");
try {
- while (readSharedState() != SharedStateManager.State.LIVE) {
+ SharedStateManager.State state = null;
+ while (state != SharedStateManager.State.LIVE) {
+ try {
+ state = readSharedState();
+ } catch (IllegalStateException e) {
+ LOGGER.warn("Errored while trying to read shared state", e);
+ }
pauser.idle();
+ checkStarted();
}
} finally {
LOGGER.debug("EXIT awaitLiveStatus");
@@ -481,6 +596,7 @@ public final class JdbcNodeManager extends NodeManager {
@Override
public SimpleString readNodeId() {
+ checkStarted();
final UUID nodeId = this.sharedStateManager.readNodeId();
LOGGER.debugf("readNodeId nodeId = %s", nodeId);
setUUID(nodeId);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
index 06f7e2a..df43c6b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
@@ -39,6 +39,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
private static final int MAX_SETUP_ATTEMPTS = 20;
private final String holderId;
private final long lockExpirationMillis;
+ private final long queryTimeoutMillis;
private JdbcLeaseLock liveLock;
private JdbcLeaseLock backupLock;
private String readNodeId;
@@ -48,10 +49,19 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
private String writeState;
public static JdbcSharedStateManager usingConnectionProvider(String holderId,
- long locksExpirationMillis,
- JDBCConnectionProvider connectionProvider,
- SQLProvider provider) {
- final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
+ long locksExpirationMillis,
+ JDBCConnectionProvider connectionProvider,
+ SQLProvider provider) {
+ return usingConnectionProvider(holderId, locksExpirationMillis, -1, connectionProvider, provider);
+ }
+
+ public static JdbcSharedStateManager usingConnectionProvider(String holderId,
+ long locksExpirationMillis,
+ long queryTimeoutMillis,
+ JDBCConnectionProvider connectionProvider,
+ SQLProvider provider) {
+ final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis,
+ queryTimeoutMillis);
sharedStateManager.setJdbcConnectionProvider(connectionProvider);
sharedStateManager.setSqlProvider(provider);
try {
@@ -76,20 +86,35 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
JDBCConnectionProvider connectionProvider,
SQLProvider sqlProvider,
long expirationMillis) {
- return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireLiveLockSQL(), sqlProvider.tryReleaseLiveLockSQL(), sqlProvider.renewLiveLockSQL(), sqlProvider.isLiveLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "LIVE");
+ return createLiveLock(holderId, connectionProvider, sqlProvider, expirationMillis, -1);
+ }
+
+ static JdbcLeaseLock createLiveLock(String holderId,
+ JDBCConnectionProvider connectionProvider,
+ SQLProvider sqlProvider,
+ long expirationMillis,
+ long queryTimeoutMillis) {
+ return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireLiveLockSQL(),
+ sqlProvider.tryReleaseLiveLockSQL(), sqlProvider.renewLiveLockSQL(),
+ sqlProvider.isLiveLockedSQL(), sqlProvider.currentTimestampSQL(),
+ expirationMillis, queryTimeoutMillis, "LIVE");
}
static JdbcLeaseLock createBackupLock(String holderId,
JDBCConnectionProvider connectionProvider,
SQLProvider sqlProvider,
- long expirationMillis) {
- return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireBackupLockSQL(), sqlProvider.tryReleaseBackupLockSQL(), sqlProvider.renewBackupLockSQL(), sqlProvider.isBackupLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "BACKUP");
+ long expirationMillis,
+ long queryTimeoutMillis) {
+ return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireBackupLockSQL(),
+ sqlProvider.tryReleaseBackupLockSQL(), sqlProvider.renewBackupLockSQL(),
+ sqlProvider.isBackupLockedSQL(), sqlProvider.currentTimestampSQL(),
+ expirationMillis, queryTimeoutMillis, "BACKUP");
}
@Override
protected void prepareStatements() {
- this.liveLock = createLiveLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis);
- this.backupLock = createBackupLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis);
+ this.liveLock = createLiveLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis, queryTimeoutMillis);
+ this.backupLock = createBackupLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis, queryTimeoutMillis);
this.readNodeId = sqlProvider.readNodeIdSQL();
this.writeNodeId = sqlProvider.writeNodeIdSQL();
this.initializeNodeId = sqlProvider.initializeNodeIdSQL();
@@ -97,9 +122,10 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
this.readState = sqlProvider.readStateSQL();
}
- private JdbcSharedStateManager(String holderId, long lockExpirationMillis) {
+ private JdbcSharedStateManager(String holderId, long lockExpirationMillis, long queryTimeoutMillis) {
this.holderId = holderId;
this.lockExpirationMillis = lockExpirationMillis;
+ this.queryTimeoutMillis = queryTimeoutMillis;
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java
index 8deda12..8f2de11 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java
@@ -41,6 +41,8 @@ interface LeaseLock extends AutoCloseable {
boolean keepRunning();
}
+ long localExpirationTime();
+
interface Pauser {
void idle();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java
index 43751f8..8590fc6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java
@@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.concurrent.ScheduledExecutorService;
-import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
/**
@@ -28,17 +28,25 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
*/
interface ScheduledLeaseLock extends ActiveMQComponent {
+ @Override
+ void start();
+
+ @Override
+ void stop();
+
LeaseLock lock();
long renewPeriodMillis();
+ String lockName();
+
static ScheduledLeaseLock of(ScheduledExecutorService scheduledExecutorService,
ArtemisExecutor executor,
String lockName,
LeaseLock lock,
long renewPeriodMillis,
- IOCriticalErrorListener ioCriticalErrorListener) {
- return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, ioCriticalErrorListener);
+ LockListener lockListener) {
+ return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, lockListener);
}
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
index 77f488e..03febc2 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
@@ -21,15 +21,23 @@ import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
+import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -38,6 +46,9 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.core.Is.is;
+
@RunWith(Parameterized.class)
public class JdbcLeaseLockTest extends ActiveMQTestBase {
@@ -183,7 +194,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
@Test
public void shouldAcquireExpiredLock() throws InterruptedException {
- final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
+ final LeaseLock lock = lock(10);
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
@@ -197,13 +208,13 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
@Test
public void shouldOtherAcquireExpiredLock() throws InterruptedException {
- final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
+ final LeaseLock lock = lock(10);
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
Assert.assertFalse("lock is already expired", lock.isHeld());
- final LeaseLock otherLock = lock(TimeUnit.SECONDS.toMillis(10));
+ final LeaseLock otherLock = lock(10);
try {
Assert.assertTrue("lock is already expired", otherLock.tryAcquire());
} finally {
@@ -237,7 +248,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
@Test
public void shouldRenewExpiredLockNotAcquiredByOthers() throws InterruptedException {
- final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
+ final LeaseLock lock = lock(10);
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
@@ -251,7 +262,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
@Test
public void shouldNotRenewLockAcquiredByOthers() throws InterruptedException {
- final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
+ final LeaseLock lock = lock(10);
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
@@ -268,5 +279,97 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
lock.release();
}
}
+
+ @Test
+ public void shouldNotNotifyLostLock() throws Exception {
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ final OrderedExecutorFactory factory = new OrderedExecutorFactory(executorService);
+ final ArtemisExecutor artemisExecutor = factory.getExecutor();
+ final AtomicLong lostLock = new AtomicLong();
+ final LockListener lockListener = () -> {
+ lostLock.incrementAndGet();
+ };
+ final ScheduledLeaseLock scheduledLeaseLock = ScheduledLeaseLock
+ .of(scheduledExecutorService, artemisExecutor,
+ "test", lock(), dbConf.getJdbcLockRenewPeriodMillis(), lockListener);
+
+ Assert.assertTrue(scheduledLeaseLock.lock().tryAcquire());
+ scheduledLeaseLock.start();
+ Assert.assertEquals(0, lostLock.get());
+ scheduledLeaseLock.stop();
+ Assert.assertEquals(0, lostLock.get());
+ executorService.shutdown();
+ scheduledExecutorService.shutdown();
+ scheduledLeaseLock.lock().release();
+ }
+
+
+ @Test
+ public void shouldNotifyManyTimesLostLock() throws Exception {
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ final OrderedExecutorFactory factory = new OrderedExecutorFactory(executorService);
+ final ArtemisExecutor artemisExecutor = factory.getExecutor();
+ final AtomicLong lostLock = new AtomicLong();
+ final LockListener lockListener = () -> {
+ lostLock.incrementAndGet();
+ };
+ final ScheduledLeaseLock scheduledLeaseLock = ScheduledLeaseLock
+ .of(scheduledExecutorService, artemisExecutor,
+ "test", lock(TimeUnit.SECONDS.toMillis(1)), 100, lockListener);
+
+ Assert.assertTrue(scheduledLeaseLock.lock().tryAcquire());
+ scheduledLeaseLock.start();
+ // should let the renew happen at least once, excluding the time to start a scheduled task
+ TimeUnit.MILLISECONDS.sleep(2 * scheduledLeaseLock.renewPeriodMillis());
+ Assert.assertTrue(scheduledLeaseLock.lock().isHeldByCaller());
+ Assert.assertEquals(0, lostLock.get());
+ scheduledLeaseLock.lock().release();
+ Assert.assertFalse(scheduledLeaseLock.lock().isHeldByCaller());
+ TimeUnit.MILLISECONDS.sleep(2 * scheduledLeaseLock.renewPeriodMillis());
+ Assert.assertThat(lostLock.get(), is(greaterThanOrEqualTo(2L)));
+ scheduledLeaseLock.stop();
+ executorService.shutdown();
+ scheduledExecutorService.shutdown();
+ }
+
+ @Test
+ public void shouldNotifyOnceLostLockIfStopped() throws Exception {
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ final OrderedExecutorFactory factory = new OrderedExecutorFactory(executorService);
+ final ArtemisExecutor artemisExecutor = factory.getExecutor();
+ final AtomicLong lostLock = new AtomicLong();
+ final AtomicReference<ScheduledLeaseLock> lock = new AtomicReference<>();
+ final AtomicReference<Throwable> stopErrors = new AtomicReference<>();
+ final LockListener lockListener = () -> {
+ lostLock.incrementAndGet();
+ try {
+ lock.get().stop();
+ } catch (Throwable e) {
+ stopErrors.set(e);
+ }
+ };
+ final ScheduledLeaseLock scheduledLeaseLock = ScheduledLeaseLock
+ .of(scheduledExecutorService, artemisExecutor, "test", lock(TimeUnit.SECONDS.toMillis(1)),
+ 100, lockListener);
+ lock.set(scheduledLeaseLock);
+ Assert.assertTrue(scheduledLeaseLock.lock().tryAcquire());
+ lostLock.set(0);
+ scheduledLeaseLock.start();
+ Assert.assertTrue(scheduledLeaseLock.lock().isHeldByCaller());
+ scheduledLeaseLock.lock().release();
+ Assert.assertFalse(scheduledLeaseLock.lock().isHeldByCaller());
+ Wait.assertTrue(() -> lostLock.get() > 0);
+ Assert.assertFalse(scheduledLeaseLock.isStarted());
+ // wait long enough to see if it gets triggered again
+ TimeUnit.MILLISECONDS.sleep(scheduledLeaseLock.renewPeriodMillis());
+ Assert.assertEquals(1, lostLock.getAndSet(0));
+ Assert.assertNull(stopErrors.getAndSet(null));
+ scheduledLeaseLock.stop();
+ executorService.shutdown();
+ scheduledExecutorService.shutdown();
+ }
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManagerTest.java
index 762e051..47e7660 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManagerTest.java
@@ -23,7 +23,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@@ -105,17 +104,9 @@ public class JdbcNodeManagerTest extends ActiveMQTestBase {
@Test
public void shouldStartAndStopGracefullyTest() throws Exception {
- final AtomicReference<String> criticalError = new AtomicReference<>();
- final JdbcNodeManager nodeManager = JdbcNodeManager.with(dbConf, leaseLockExecutor, null, (code, message, file) -> criticalError.lazySet(message));
- try {
- nodeManager.start();
- } finally {
- nodeManager.stop();
- final String error = criticalError.get();
- if (error != null) {
- Assert.fail(error);
- }
- }
+ final JdbcNodeManager nodeManager = JdbcNodeManager.with(dbConf, leaseLockExecutor, null);
+ nodeManager.start();
+ nodeManager.stop();
}
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index be6d242..3f5436a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -522,6 +522,7 @@ public abstract class ActiveMQTestBase extends Assert {
dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis());
dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis());
dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis());
+ dbStorageConfiguration.setJdbcNetworkTimeout(-1);
return dbStorageConfiguration;
}
@@ -530,11 +531,11 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected long getJdbcLockExpirationMillis() {
- return Long.getLong("jdbc.lock.expiration", ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis());
+ return Long.getLong("jdbc.lock.expiration", 4_000);
}
protected long getJdbcLockRenewPeriodMillis() {
- return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis());
+ return Long.getLong("jdbc.lock.renew", 200);
}
public void destroyTables(List<String> tableNames) throws Exception {
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockMonitorTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockMonitorTest.java
index 548ec1d..2aad98c 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockMonitorTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockMonitorTest.java
@@ -1,120 +1,124 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.tests.extras.byteman;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import org.apache.activemq.artemis.core.server.ActivateCallback;
-import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
-import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager.LockListener;
-import org.apache.activemq.artemis.utils.Wait;
-import org.jboss.byteman.contrib.bmunit.BMRule;
-import org.jboss.byteman.contrib.bmunit.BMRules;
-import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-@RunWith(BMUnitRunner.class)
-public class FileLockMonitorTest {
-
- private File sharedDir;
- private volatile boolean lostLock = false;
- private volatile FileLockNodeManager nodeManager;
- private ScheduledThreadPoolExecutor executor;
-
- @Before
- public void handleLockFile() throws IOException {
- sharedDir = File.createTempFile("shared-dir", "");
- sharedDir.delete();
- Assert.assertTrue(sharedDir.mkdir());
- lostLock = false;
- }
-
- @Test
- @BMRules(rules = {
- @BMRule(name = "lock is invalid", targetClass = "sun.nio.ch.FileLockImpl", targetMethod = "isValid", action = "return false;") })
- public void testLockMonitorInvalid() throws Exception {
- lostLock = false;
- startServer();
- Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100);
- nodeManager.isStarted();
- nodeManager.crashLiveServer();
- executor.shutdown();
- }
-
- @Test
- @BMRules(rules = {
- @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "throw new java.io.IOException(\"EFS is disconnected\");") })
- public void testLockMonitorIOException() throws Exception {
- lostLock = false;
- startServer();
- Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100);
- nodeManager.crashLiveServer();
- executor.shutdown();
- }
-
- @Test
- public void testLockMonitorHasCorrectLockAndState() throws Exception {
- lostLock = false;
- startServer();
- Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100));
- nodeManager.crashLiveServer();
- executor.shutdown();
- }
-
- @Test
- @BMRules(rules = {
- @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "return 70;") })
- public void testLockMonitorHasLockWrongState() throws Exception {
- lostLock = false;
- startServer();
- Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100));
- nodeManager.crashLiveServer();
- executor.shutdown();
- }
-
- public LockListener startServer() throws Exception {
- executor = new ScheduledThreadPoolExecutor(2);
- nodeManager = new FileLockNodeManager(sharedDir, false, executor);
- LockListener listener = nodeManager.new LockListener() {
-
- @Override
- protected void lostLock() throws Exception {
- lostLock = true;
- nodeManager.crashLiveServer();
- }
-
- };
- nodeManager.registerLockListener(listener);
-
- try {
- nodeManager.start();
- ActivateCallback startLiveNode = nodeManager.startLiveNode();
- startLiveNode.activationComplete();
-
- } catch (Exception exception) {
- exception.printStackTrace();
- }
-
- return listener;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import java.io.File;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import org.apache.activemq.artemis.core.server.ActivateCallback;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
+import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
+import org.apache.activemq.artemis.utils.Wait;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class FileLockMonitorTest {
+
+ private File sharedDir;
+ private volatile boolean lostLock = false;
+ private volatile FileLockNodeManager nodeManager;
+ private ScheduledThreadPoolExecutor executor;
+
+ @Before
+ public void handleLockFile() throws Exception {
+ sharedDir = File.createTempFile("shared-dir", "");
+ sharedDir.delete();
+ Assert.assertTrue(sharedDir.mkdir());
+ }
+
+ @Test
+ @BMRules(rules = {
+ @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "isLiveLockLost", action = "return true;") })
+ public void testLockMonitorInvalid() throws Exception {
+ lostLock = false;
+ startServer();
+ Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 20_000, 100);
+ nodeManager.isStarted();
+ nodeManager.crashLiveServer();
+ executor.shutdown();
+ }
+
+ public static void throwNodeManagerException(String msg) {
+ throw new NodeManager.NodeManagerException(msg);
+ }
+
+ @Test
+ @BMRules(rules = {
+ @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager",
+ targetMethod = "getState",
+ action = "org.apache.activemq.artemis.tests.extras.byteman.FileLockMonitorTest.throwNodeManagerException(\"EFS is disconnected\");") })
+ public void testLockMonitorIOException() throws Exception {
+ lostLock = false;
+ startServer();
+ Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100);
+ nodeManager.crashLiveServer();
+ executor.shutdown();
+ }
+
+ @Test
+ public void testLockMonitorHasCorrectLockAndState() throws Exception {
+ lostLock = false;
+ startServer();
+ Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100));
+ nodeManager.crashLiveServer();
+ executor.shutdown();
+ }
+
+ @Test
+ @BMRules(rules = {
+ @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "return 70;") })
+ public void testLockMonitorHasLockWrongState() throws Exception {
+ lostLock = false;
+ startServer();
+ Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100));
+ nodeManager.crashLiveServer();
+ executor.shutdown();
+ }
+
+ public LockListener startServer() throws Exception {
+ executor = new ScheduledThreadPoolExecutor(2);
+ nodeManager = new FileLockNodeManager(sharedDir, false, executor);
+ LockListener listener = () -> {
+ lostLock = true;
+ try {
+ nodeManager.crashLiveServer();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ };
+ nodeManager.registerLockListener(listener);
+
+ try {
+ nodeManager.start();
+ ActivateCallback startLiveNode = nodeManager.startLiveNode();
+ startLiveNode.activationComplete();
+
+ } catch (Exception exception) {
+ exception.printStackTrace();
+ }
+
+ return listener;
+ }
+}
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockNodeManagerTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockNodeManagerTest.java
index 519cb63..148755f 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockNodeManagerTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockNodeManagerTest.java
@@ -78,7 +78,7 @@ public class FileLockNodeManagerTest {
manager.awaitLiveNode();
} catch (Exception e) {
long stop = System.currentTimeMillis();
- if (!"timed out waiting for lock".equals(e.getMessage())) {
+ if (!"timed out waiting for lock".equals(e.getCause().getMessage())) {
throw e;
}
return stop - start;
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/SharedStoreBackupActivationTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/SharedStoreBackupActivationTest.java
deleted file mode 100644
index ae763c1..0000000
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/SharedStoreBackupActivationTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.extras.byteman;
-
-import java.io.File;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
-import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
-import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
-import org.apache.activemq.artemis.core.server.NodeManager;
-import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
-import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase;
-import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
-import org.jboss.byteman.contrib.bmunit.BMRule;
-import org.jboss.byteman.contrib.bmunit.BMRules;
-import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
-import org.jboss.logging.Logger;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-@RunWith(BMUnitRunner.class)
-public class SharedStoreBackupActivationTest extends FailoverTestBase {
-
- private static Logger logger = Logger.getLogger(SharedStoreBackupActivationTest.class);
-
- private static volatile boolean throwException = false;
- private static CountDownLatch exceptionThrownLatch;
-
- public static synchronized boolean isThrowException() {
- logger.debugf("Throwing IOException during FileLockNodeManager.tryLock(): %s", throwException);
- if (exceptionThrownLatch != null) {
- exceptionThrownLatch.countDown();
- }
- return throwException;
- }
-
- /**
- * Waits for the backup server to call FileLockNodeManager.tryLock().
- */
- public static void awaitTryLock(boolean throwException) throws InterruptedException {
- synchronized (SharedStoreBackupActivationTest.class) {
- SharedStoreBackupActivationTest.throwException = throwException;
- exceptionThrownLatch = new CountDownLatch(1);
- }
- logger.debugf("Awaiting backup to perform FileLockNodeManager.tryLock()");
- boolean ret = exceptionThrownLatch.await(10, TimeUnit.SECONDS);
- SharedStoreBackupActivationTest.throwException = false;
-
- Assert.assertTrue("FileLockNodeManager.tryLock() was not called during specified timeout", ret);
- logger.debugf("Awaiting FileLockNodeManager.tryLock() done");
- }
-
- @Test
- @BMRules(
- rules = {@BMRule(
- name = "throw IOException during activation",
- targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager",
- targetMethod = "tryLock",
- targetLocation = "AT ENTRY",
- condition = "org.apache.activemq.artemis.tests.extras.byteman.SharedStoreBackupActivationTest.isThrowException()",
- action = "THROW new IOException(\"IO Error\");")
- })
- public void testFailOverAfterTryLockException() throws Exception {
- Assert.assertTrue(liveServer.isActive());
- Assert.assertFalse(backupServer.isActive());
-
- // wait for backup to try to acquire lock, once without exception (acquiring will not succeed because live is
- // still active)
- awaitTryLock(false);
-
- // wait for backup to try to acquire lock, this time throw an IOException
- logger.debug("Causing exception");
- awaitTryLock(true);
-
- // stop live server
- logger.debugf("Stopping live server");
- liveServer.stop();
- waitForServerToStop(liveServer.getServer());
- logger.debugf("Live server stopped, waiting for backup activation");
- backupServer.getServer().waitForActivation(10, TimeUnit.SECONDS);
-
- // backup should be activated by now
- Assert.assertFalse(liveServer.isActive());
- Assert.assertTrue("Backup server didn't activate", backupServer.isActive());
- }
-
- @Override
- protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
- return TransportConfigurationUtils.getInVMAcceptor(live);
- }
-
- @Override
- protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
- return TransportConfigurationUtils.getInVMConnector(live);
- }
-
- @Override
- protected void createConfigs() throws Exception {
- File sharedDir = File.createTempFile("shared-dir", "");
- sharedDir.delete();
- Assert.assertTrue(sharedDir.mkdir());
- logger.debugf("Created shared store directory %s", sharedDir.getCanonicalPath());
-
- TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
- TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
-
- // nodes must use separate FileLockNodeManager instances!
- NodeManager liveNodeManager = new FileLockNodeManager(sharedDir, false, new ScheduledThreadPoolExecutor(1));
- NodeManager backupNodeManager = new FileLockNodeManager(sharedDir, false, new ScheduledThreadPoolExecutor(1));
-
- backupConfig = super.createDefaultConfig(false)
- .clearAcceptorConfigurations()
- .addAcceptorConfiguration(getAcceptorTransportConfiguration(false))
- .setHAPolicyConfiguration(
- new SharedStoreSlavePolicyConfiguration()
- .setScaleDownConfiguration(new ScaleDownConfiguration().setEnabled(false))
- .setRestartBackup(false))
- .addConnectorConfiguration(liveConnector.getName(), liveConnector)
- .addConnectorConfiguration(backupConnector.getName(), backupConnector)
- .addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
- backupServer = createTestableServer(backupConfig, backupNodeManager);
-
- liveConfig = super.createDefaultConfig(false)
- .clearAcceptorConfigurations()
- .addAcceptorConfiguration(getAcceptorTransportConfiguration(true))
- .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailoverOnServerShutdown(true))
- .addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName()))
- .addConnectorConfiguration(liveConnector.getName(), liveConnector);
- liveServer = createTestableServer(liveConfig, liveNodeManager);
- }
-
-}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/RealNodeManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/FileLockNodeManagerTest.java
similarity index 66%
rename from tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/RealNodeManagerTest.java
rename to tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/FileLockNodeManagerTest.java
index 1dfe48b..be2dc7d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/RealNodeManagerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/FileLockNodeManagerTest.java
@@ -18,34 +18,23 @@ package org.apache.activemq.artemis.tests.integration.cluster;
import java.io.File;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
-import org.apache.activemq.artemis.utils.UUID;
import org.junit.Assert;
-import org.junit.Test;
-public class RealNodeManagerTest extends NodeManagerTest {
-
- @Test
- public void testId() throws Exception {
- NodeManager nodeManager = new FileLockNodeManager(new File(getTemporaryDir()), false);
- nodeManager.start();
- UUID id1 = nodeManager.getUUID();
- nodeManager.stop();
- nodeManager.start();
- ActiveMQTestBase.assertEqualsByteArrays(id1.asBytes(), nodeManager.getUUID().asBytes());
- nodeManager.stop();
- }
+public class FileLockNodeManagerTest extends NodeManagerTest {
@Override
public void performWork(NodeManagerAction... actions) throws Exception {
List<Process> processes = new ArrayList<>();
for (NodeManagerAction action : actions) {
- Process p = SpawnedVMSupport.spawnVM(NodeManagerAction.class.getName(), "-Xms512m", "-Xmx512m", new String[0], true, true, true, action.getWork());
+ final String[] args = new String[action.works() + 1];
+ args[0] = getTemporaryDir();
+ action.getWork(args, 1);
+ Process p = SpawnedVMSupport.spawnVM(this.getClass().getName(), "-Xms50m", "-Xmx512m", new String[0], true, true, true, args);
processes.add(p);
}
for (Process process : processes) {
@@ -58,4 +47,8 @@ public class RealNodeManagerTest extends NodeManagerTest {
}
}
+
+ public static void main(String[] args) throws Exception {
+ NodeManagerAction.execute(Arrays.copyOfRange(args, 1, args.length), new FileLockNodeManager(new File(args[0]), false));
+ }
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/JdbcNodeManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/JdbcNodeManagerTest.java
new file mode 100644
index 0000000..8c0ea8e
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/JdbcNodeManagerTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.junit.Assert;
+
+public class JdbcNodeManagerTest extends NodeManagerTest {
+
+ @Override
+ public void performWork(NodeManagerAction... actions) throws Exception {
+ List<NodeRunner> nodeRunners = new ArrayList<>();
+ final ThreadFactory daemonThreadFactory = t -> {
+ final Thread th = new Thread(t);
+ th.setDaemon(true);
+ return th;
+ };
+ Thread[] threads = new Thread[actions.length];
+ List<ExecutorService> executors = new ArrayList<>(actions.length);
+ List<NodeManager> nodeManagers = new ArrayList<>(actions.length * 2);
+ AtomicBoolean failedRenew = new AtomicBoolean(false);
+ for (NodeManagerAction action : actions) {
+ final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
+ final ExecutorService executor = Executors.newFixedThreadPool(2, daemonThreadFactory);
+ final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
+ final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
+ JdbcNodeManager nodeManager = JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory);
+ nodeManager.start();
+ NodeRunner nodeRunner = new NodeRunner(nodeManager, action);
+ nodeRunners.add(nodeRunner);
+ nodeManagers.add(nodeManager);
+ executors.add(scheduledExecutorService);
+ executors.add(executor);
+ }
+ for (int i = 0, nodeRunnersSize = nodeRunners.size(); i < nodeRunnersSize; i++) {
+ NodeRunner nodeRunner = nodeRunners.get(i);
+ threads[i] = new Thread(nodeRunner);
+ threads[i].start();
+ }
+ boolean isDebug = isDebug();
+ for (Thread thread : threads) {
+ try {
+ if (isDebug) {
+ thread.join();
+ } else {
+ thread.join(60_000);
+ }
+ } catch (InterruptedException e) {
+ //
+ }
+ if (thread.isAlive()) {
+ thread.interrupt();
+ fail("thread still running");
+ }
+ }
+ // forcibly stop node managers
+ nodeManagers.forEach(nodeManager -> {
+ try {
+ nodeManager.stop();
+ } catch (Exception e) {
+ // won't prevent the test to complete
+ e.printStackTrace();
+ }
+ });
+
+ // stop executors
+ executors.forEach(ExecutorService::shutdownNow);
+
+ for (NodeRunner nodeRunner : nodeRunners) {
+ if (nodeRunner.e != null) {
+ nodeRunner.e.printStackTrace();
+ fail(nodeRunner.e.getMessage());
+ }
+ }
+ Assert.assertFalse("Some of the lease locks has failed to renew the locks", failedRenew.get());
+ }
+
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java
index 7d56321..6685aa5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java
@@ -16,10 +16,10 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster;
-import java.io.File;
+import java.util.Arrays;
import org.apache.activemq.artemis.core.server.NodeManager;
-import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
+import org.apache.activemq.artemis.utils.UUID;
public class NodeManagerAction {
@@ -35,6 +35,7 @@ public class NodeManagerAction {
public static final int HAS_BACKUP = 11;
public static final int DOESNT_HAVE_LIVE = 12;
public static final int DOESNT_HAVE_BACKUP = 13;
+ public static final int CHECK_ID = 14;
private final int[] work;
@@ -82,7 +83,6 @@ public class NodeManagerAction {
}
break;
case HAS_BACKUP:
-
if (!hasBackupLock) {
throw new IllegalStateException("backup lock not held");
}
@@ -93,37 +93,50 @@ public class NodeManagerAction {
}
break;
case DOESNT_HAVE_BACKUP:
-
if (hasBackupLock) {
throw new IllegalStateException("backup lock held");
}
break;
+ case CHECK_ID:
+ nodeManager.start();
+ UUID id1 = nodeManager.getUUID();
+ nodeManager.stop();
+ nodeManager.start();
+ if (!Arrays.equals(id1.asBytes(), nodeManager.getUUID().asBytes())) {
+ throw new IllegalStateException("getUUID should be the same on restart");
+ }
+ break;
}
}
}
- public String[] getWork() {
- String[] strings = new String[work.length];
- for (int i = 0, stringsLength = strings.length; i < stringsLength; i++) {
- strings[i] = "" + work[i];
+ public int works() {
+ return work.length;
+ }
+
+ public int getWork(String[] works, int start) {
+ final int workLength = work.length;
+ for (int i = 0; i < workLength; i++) {
+ works[i + start] = Integer.toString(work[i]);
}
- return strings;
+ return workLength;
}
- public static void main(String[] args) throws Exception {
+ public static void execute(String[] args, NodeManager nodeManager) throws Exception {
int[] work1 = new int[args.length];
for (int i = 0; i < args.length; i++) {
work1[i] = Integer.parseInt(args[i]);
}
NodeManagerAction nodeManagerAction = new NodeManagerAction(work1);
- FileLockNodeManager nodeManager = new FileLockNodeManager(new File("."), false);
nodeManager.start();
try {
nodeManagerAction.performWork(nodeManager);
} catch (Exception e) {
e.printStackTrace();
System.exit(9);
+ } finally {
+ nodeManager.stop();
}
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerTest.java
index 4c6f6ca..c484bf1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerTest.java
@@ -24,7 +24,9 @@ import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.tests.util.SpawnedTestBase;
import org.junit.Test;
+import static java.lang.management.ManagementFactory.getRuntimeMXBean;
import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.AWAIT_LIVE;
+import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.CHECK_ID;
import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.CRASH_LIVE;
import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.DOESNT_HAVE_BACKUP;
import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.DOESNT_HAVE_LIVE;
@@ -39,6 +41,12 @@ import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerA
public class NodeManagerTest extends SpawnedTestBase {
@Test
+ public void testID() throws Exception {
+ NodeManagerAction live1 = new NodeManagerAction(CHECK_ID);
+ performWork(live1);
+ }
+
+ @Test
public void testLive() throws Exception {
NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
performWork(live1);
@@ -155,6 +163,10 @@ public class NodeManagerTest extends SpawnedTestBase {
}
}
+ protected static boolean isDebug() {
+ return getRuntimeMXBean().getInputArguments().toString().contains("jdwp");
+ }
+
static class NodeRunner implements Runnable {
private NodeManagerAction action;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FileLockNodeManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FileLockNodeManagerTest.java
index 3019381..fc1b3b9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FileLockNodeManagerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FileLockNodeManagerTest.java
@@ -147,10 +147,7 @@ public class FileLockNodeManagerTest extends FailoverTestBase {
executors.add(executor);
final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
- return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> {
- code.printStackTrace();
- Assert.fail(message);
- });
+ return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory);
case File:
final Configuration config = createDefaultInVMConfig();
if (useSeparateLockFolder) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
index d32e46a..1438d46 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java
@@ -116,10 +116,7 @@ public class NettyFailoverTest extends FailoverTest {
executors.add(executor);
final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
- return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> {
- code.printStackTrace();
- Assert.fail(message);
- });
+ return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory);
default:
throw new AssertionError("enum type not supported!");
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java
index a76922e..979954c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java
@@ -100,7 +100,7 @@ public class CriticalCrashTest extends SpawnedTestBase {
@Override
protected StorageManager createStorageManager() {
- JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO) {
+ JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
@Override
public void readLock() {
super.readLock();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java
index 28d8db6..2c0a517 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java
@@ -91,7 +91,7 @@ public class ShutdownOnCriticalIOErrorMoveNextTest extends ActiveMQTestBase {
@Override
protected StorageManager createStorageManager() {
- JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO) {
+ JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
@Override
protected Journal createMessageJournal(Configuration config,
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java
index 3182bdb..8504309 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java
@@ -175,38 +175,38 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
protected final class FakeNodeManager extends NodeManager {
public FakeNodeManager(String nodeID) {
- super(false, null);
+ super(false);
this.setNodeID(nodeID);
}
@Override
- public void awaitLiveNode() throws Exception {
+ public void awaitLiveNode() {
}
@Override
- public void awaitLiveStatus() throws Exception {
+ public void awaitLiveStatus() {
}
@Override
- public void startBackup() throws Exception {
+ public void startBackup() {
}
@Override
- public ActivateCallback startLiveNode() throws Exception {
+ public ActivateCallback startLiveNode() {
return new CleaningActivateCallback() {
};
}
@Override
- public void pauseLiveServer() throws Exception {
+ public void pauseLiveServer() {
}
@Override
- public void crashLiveServer() throws Exception {
+ public void crashLiveServer() {
}
@Override
- public void releaseBackup() throws Exception {
+ public void releaseBackup() {
}
@Override
@@ -215,12 +215,12 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
}
@Override
- public boolean isAwaitingFailback() throws Exception {
+ public boolean isAwaitingFailback() {
return false;
}
@Override
- public boolean isBackupLive() throws Exception {
+ public boolean isBackupLive() {
return false;
}