You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/03/28 15:54:46 UTC
[08/16] activemq-artemis git commit: ARTEMIS-1447 JDBC NodeManager to
support JDBC HA Shared Store
ARTEMIS-1447 JDBC NodeManager to support JDBC HA Shared Store
(cherry picked from commit 7944a25269d939791bfbc2637e3c649a9137ad45)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/565b8175
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/565b8175
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/565b8175
Branch: refs/heads/1.x
Commit: 565b817592573b508a08197a6da1c2649a678be8
Parents: b267e8a
Author: Francesco Nigro <ni...@gmail.com>
Authored: Sat Sep 9 18:41:30 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Mar 28 11:54:15 2018 -0400
----------------------------------------------------------------------
.../config/ActiveMQDefaultConfiguration.java | 25 ++
.../jdbc/store/sql/GenericSQLProvider.java | 182 ++++++++-
.../artemis/jdbc/store/sql/SQLProvider.java | 39 +-
artemis-server/pom.xml | 7 +-
.../storage/DatabaseStorageConfiguration.java | 40 ++
.../deployers/impl/FileConfigurationParser.java | 3 +
.../artemis/core/server/NodeManager.java | 2 +-
.../core/server/impl/ActiveMQServerImpl.java | 7 +
.../impl/jdbc/ActiveMQScheduledLeaseLock.java | 115 ++++++
.../core/server/impl/jdbc/JdbcLeaseLock.java | 277 ++++++++++++++
.../core/server/impl/jdbc/JdbcNodeManager.java | 380 +++++++++++++++++++
.../impl/jdbc/JdbcSharedStateManager.java | 302 +++++++++++++++
.../core/server/impl/jdbc/LeaseLock.java | 151 ++++++++
.../server/impl/jdbc/ScheduledLeaseLock.java | 44 +++
.../server/impl/jdbc/SharedStateManager.java | 60 +++
.../resources/schema/artemis-configuration.xsd | 21 +
.../server/impl/jdbc/JdbcLeaseLockTest.java | 231 +++++++++++
docs/user-manual/en/persistence.md | 15 +
18 files changed, 1890 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 6188dbb..c9f775e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -437,9 +437,18 @@ public final class ActiveMQDefaultConfiguration {
// Default Page Store table name, used with Database storage type
private static final String DEFAULT_PAGE_STORE_TABLE_NAME = "PAGE_STORE";
+ // Default node manager store table name, used with Database storage type
+ private static final String DEFAULT_NODE_MANAGER_STORE_TABLE_NAME = "NODE_MANAGER_STORE";
+
private static final int DEFAULT_JDBC_NETWORK_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
+ private static final long DEFAULT_JDBC_LOCK_RENEW_PERIOD_MILLIS = TimeUnit.SECONDS.toMillis(4);
+
+ private static final long DEFAULT_JDBC_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(20);
+
+ private static final long DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
+
// Default JMS Bingings table name, used with Database storage type
private static final String DEFAULT_JMS_BINDINGS_TABLE_NAME = "JMS_BINDINGS";
@@ -1197,10 +1206,26 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_PAGE_STORE_TABLE_NAME;
}
+ public static String getDefaultNodeManagerStoreTableName() {
+ return DEFAULT_NODE_MANAGER_STORE_TABLE_NAME;
+ }
+
public static int getDefaultJdbcNetworkTimeout() {
return DEFAULT_JDBC_NETWORK_TIMEOUT;
}
+ public static long getDefaultJdbcLockRenewPeriodMillis() {
+ return DEFAULT_JDBC_LOCK_RENEW_PERIOD_MILLIS;
+ }
+
+ public static long getDefaultJdbcLockExpirationMillis() {
+ return DEFAULT_JDBC_LOCK_EXPIRATION_MILLIS;
+ }
+
+ public static long getDefaultJdbcLockAcquisitionTimeoutMillis() {
+ return DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS;
+ }
+
public static String getDefaultJMSBindingsTableName() {
return DEFAULT_JMS_BINDINGS_TABLE_NAME;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
index 9232001..ac793d3 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
@@ -18,6 +18,14 @@ package org.apache.activemq.artemis.jdbc.store.sql;
public class GenericSQLProvider implements SQLProvider {
+ /**
+ * The JDBC Node Manager shared state is contained in these 4 rows: each one is used exclusively for a specific purpose.
+ */
+ private static final int STATE_ROW_ID = 0;
+ private static final int LIVE_LOCK_ROW_ID = 1;
+ private static final int BACKUP_LOCK_ROW_ID = 2;
+ private static final int NODE_ID_ROW_ID = 3;
+
// Default to lowest (MYSQL = 64k)
private static final long MAX_BLOB_SIZE = 64512;
@@ -57,6 +65,42 @@ public class GenericSQLProvider implements SQLProvider {
private final String countJournalRecordsSQL;
+ private final String createNodeManagerStoreTableSQL;
+
+ private final String createStateSQL;
+
+ private final String createNodeIdSQL;
+
+ private final String createLiveLockSQL;
+
+ private final String createBackupLockSQL;
+
+ private final String tryAcquireLiveLockSQL;
+
+ private final String tryAcquireBackupLockSQL;
+
+ private final String tryReleaseLiveLockSQL;
+
+ private final String tryReleaseBackupLockSQL;
+
+ private final String isLiveLockedSQL;
+
+ private final String isBackupLockedSQL;
+
+ private final String renewLiveLockSQL;
+
+ private final String renewBackupLockSQL;
+
+ private final String currentTimestampSQL;
+
+ private final String writeStateSQL;
+
+ private final String readStateSQL;
+
+ private final String writeNodeIdSQL;
+
+ private final String readNodeIdSQL;
+
protected final DatabaseStoreType databaseStoreType;
protected GenericSQLProvider(String tableName, DatabaseStoreType databaseStoreType) {
@@ -64,8 +108,7 @@ public class GenericSQLProvider implements SQLProvider {
this.databaseStoreType = databaseStoreType;
- createFileTableSQL = "CREATE TABLE " + tableName +
- "(ID BIGINT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
+ createFileTableSQL = "CREATE TABLE " + tableName + "(ID BIGINT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
insertFileSQL = "INSERT INTO " + tableName + " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)";
@@ -81,17 +124,13 @@ public class GenericSQLProvider implements SQLProvider {
updateFileNameByIdSQL = "UPDATE " + tableName + " SET FILENAME=? WHERE ID=?";
- cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " +
- "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
+ cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " + "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?";
dropFileTableSQL = "DROP TABLE " + tableName;
- createJournalTableSQL = new String[] {
- "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT NOT NULL, PRIMARY KEY(seq))",
- "CREATE INDEX " + tableName + "_IDX ON " + tableName + " (id)"
- };
+ createJournalTableSQL = new String[]{"CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT NOT NULL, PRIMARY KEY(seq))", "CREATE INDEX " + tableName + "_IDX ON " + tableName + " (id)"};
insertJournalRecordsSQL = "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
@@ -102,6 +141,43 @@ public class GenericSQLProvider implements SQLProvider {
deleteJournalTxRecordsSQL = "DELETE FROM " + tableName + " WHERE txId=?";
countJournalRecordsSQL = "SELECT COUNT(*) FROM " + tableName;
+
+ createNodeManagerStoreTableSQL = "CREATE TABLE " + tableName + " ( ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME TIMESTAMP, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID))";
+
+ createStateSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + STATE_ROW_ID + ")";
+
+ createNodeIdSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + NODE_ID_ROW_ID + ")";
+
+ createLiveLockSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + LIVE_LOCK_ROW_ID + ")";
+
+ createBackupLockSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + BACKUP_LOCK_ROW_ID + ")";
+
+ tryAcquireLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = " + LIVE_LOCK_ROW_ID;
+
+ tryAcquireBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = " + BACKUP_LOCK_ROW_ID;
+
+ tryReleaseLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = " + LIVE_LOCK_ROW_ID;
+
+ tryReleaseBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = " + BACKUP_LOCK_ROW_ID;
+
+ isLiveLockedSQL = "SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM " + tableName + " WHERE ID = " + LIVE_LOCK_ROW_ID;
+
+ isBackupLockedSQL = "SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM " + tableName + " WHERE ID = " + BACKUP_LOCK_ROW_ID;
+
+ renewLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = " + LIVE_LOCK_ROW_ID;
+
+ renewBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = " + BACKUP_LOCK_ROW_ID;
+
+ currentTimestampSQL = "SELECT CURRENT_TIMESTAMP FROM " + tableName;
+
+ writeStateSQL = "UPDATE " + tableName + " SET STATE = ? WHERE ID = " + STATE_ROW_ID;
+
+ readStateSQL = "SELECT STATE FROM " + tableName + " WHERE ID = " + STATE_ROW_ID;
+
+ writeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE ID = " + NODE_ID_ROW_ID;
+
+ readNodeIdSQL = "SELECT NODE_ID FROM " + tableName + " WHERE ID = " + NODE_ID_ROW_ID;
+
}
@Override
@@ -202,6 +278,96 @@ public class GenericSQLProvider implements SQLProvider {
}
@Override
+ public String createNodeManagerStoreTableSQL() {
+ return createNodeManagerStoreTableSQL;
+ }
+
+ @Override
+ public String createStateSQL() {
+ return createStateSQL;
+ }
+
+ @Override
+ public String createNodeIdSQL() {
+ return createNodeIdSQL;
+ }
+
+ @Override
+ public String createLiveLockSQL() {
+ return createLiveLockSQL;
+ }
+
+ @Override
+ public String createBackupLockSQL() {
+ return createBackupLockSQL;
+ }
+
+ @Override
+ public String tryAcquireLiveLockSQL() {
+ return tryAcquireLiveLockSQL;
+ }
+
+ @Override
+ public String tryAcquireBackupLockSQL() {
+ return tryAcquireBackupLockSQL;
+ }
+
+ @Override
+ public String tryReleaseLiveLockSQL() {
+ return tryReleaseLiveLockSQL;
+ }
+
+ @Override
+ public String tryReleaseBackupLockSQL() {
+ return tryReleaseBackupLockSQL;
+ }
+
+ @Override
+ public String isLiveLockedSQL() {
+ return isLiveLockedSQL;
+ }
+
+ @Override
+ public String isBackupLockedSQL() {
+ return isBackupLockedSQL;
+ }
+
+ @Override
+ public String renewLiveLockSQL() {
+ return renewLiveLockSQL;
+ }
+
+ @Override
+ public String renewBackupLockSQL() {
+ return renewBackupLockSQL;
+ }
+
+ @Override
+ public String currentTimestampSQL() {
+ return currentTimestampSQL;
+ }
+
+ @Override
+ public String writeStateSQL() {
+ return writeStateSQL;
+ }
+
+ @Override
+ public String readStateSQL() {
+ return readStateSQL;
+ }
+
+ @Override
+ public String writeNodeIdSQL() {
+ return writeNodeIdSQL;
+ }
+
+ @Override
+ public String readNodeIdSQL() {
+ return readNodeIdSQL;
+ }
+
+ @Override
public boolean closeConnectionOnShutdown() {
return true;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
index 1663179..b4b55d5 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.jdbc.store.sql;
public interface SQLProvider {
enum DatabaseStoreType {
- PAGE, MESSAGE_JOURNAL, BINDINGS_JOURNAL, LARGE_MESSAGE
+ PAGE, MESSAGE_JOURNAL, BINDINGS_JOURNAL, LARGE_MESSAGE, NODE_MANAGER
}
long getMaxBlobSize();
@@ -62,7 +62,44 @@ public interface SQLProvider {
boolean closeConnectionOnShutdown();
+ String createNodeManagerStoreTableSQL();
+
+ String createStateSQL();
+
+ String createNodeIdSQL();
+
+ String createLiveLockSQL();
+
+ String createBackupLockSQL();
+
+ String tryAcquireLiveLockSQL();
+
+ String tryAcquireBackupLockSQL();
+
+ String tryReleaseLiveLockSQL();
+
+ String tryReleaseBackupLockSQL();
+
+ String isLiveLockedSQL();
+
+ String isBackupLockedSQL();
+
+ String renewLiveLockSQL();
+
+ String renewBackupLockSQL();
+
+ String currentTimestampSQL();
+
+ String writeStateSQL();
+
+ String readStateSQL();
+
+ String writeNodeIdSQL();
+
+ String readNodeIdSQL();
+
interface Factory {
+
SQLProvider create(String tableName, DatabaseStoreType dbStoreType);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml
index 767cfaa..1f68af2 100644
--- a/artemis-server/pom.xml
+++ b/artemis-server/pom.xml
@@ -124,7 +124,12 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
-
+ <!-- db test -->
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
index 285d1cf..5429f0c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
@@ -34,6 +34,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private String jmsBindingsTableName = ActiveMQDefaultConfiguration.getDefaultJMSBindingsTableName();
+ private String nodeManagerStoreTableName = ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName();
+
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
@@ -44,6 +46,12 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private int jdbcNetworkTimeout = ActiveMQDefaultConfiguration.getDefaultJdbcNetworkTimeout();
+ private long jdbcLockRenewPeriodMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis();
+
+ private long jdbcLockExpirationMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
+
+ private long jdbcLockAcquisitionTimeoutMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
+
@Override
public StoreType getStoreType() {
return StoreType.DATABASE;
@@ -77,6 +85,14 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
return pageStoreTableName;
}
+ public void setNodeManagerStoreTableName(String nodeManagerStoreTableName) {
+ this.nodeManagerStoreTableName = nodeManagerStoreTableName;
+ }
+
+ public String getNodeManagerStoreTableName() {
+ return nodeManagerStoreTableName;
+ }
+
public void setPageStoreTableName(String pageStoreTableName) {
this.pageStoreTableName = pageStoreTableName;
}
@@ -145,4 +161,28 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
public void setJdbcNetworkTimeout(int jdbcNetworkTimeout) {
this.jdbcNetworkTimeout = jdbcNetworkTimeout;
}
+
+ public long getJdbcLockRenewPeriodMillis() {
+ return jdbcLockRenewPeriodMillis;
+ }
+
+ public void setJdbcLockRenewPeriodMillis(long jdbcLockRenewPeriodMillis) {
+ this.jdbcLockRenewPeriodMillis = jdbcLockRenewPeriodMillis;
+ }
+
+ public long getJdbcLockExpirationMillis() {
+ return jdbcLockExpirationMillis;
+ }
+
+ public void setJdbcLockExpirationMillis(long jdbcLockExpirationMillis) {
+ this.jdbcLockExpirationMillis = jdbcLockExpirationMillis;
+ }
+
+ public long getJdbcLockAcquisitionTimeoutMillis() {
+ return jdbcLockAcquisitionTimeoutMillis;
+ }
+
+ public void setJdbcLockAcquisitionTimeoutMillis(long jdbcLockAcquisitionTimeoutMillis) {
+ this.jdbcLockAcquisitionTimeoutMillis = jdbcLockAcquisitionTimeoutMillis;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index cc30163..e69c486 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1164,6 +1164,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK));
conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK));
+ conf.setJdbcLockAcquisitionTimeoutMillis(getLong(storeNode, "jdbc-lock-acquisition-timeout", conf.getJdbcLockAcquisitionTimeoutMillis(), Validators.NO_CHECK));
+ conf.setJdbcLockRenewPeriodMillis(getLong(storeNode, "jdbc-lock-renew-period", conf.getJdbcLockRenewPeriodMillis(), Validators.NO_CHECK));
+ conf.setJdbcLockExpirationMillis(getLong(storeNode, "jdbc-lock-expiration", conf.getJdbcLockExpirationMillis(), Validators.NO_CHECK));
return conf;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
index 28f05b2..e963b22 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java
@@ -127,7 +127,7 @@ public abstract class NodeManager implements ActiveMQComponent {
isStarted = false;
}
- public final void stopBackup() throws Exception {
+ public void stopBackup() throws Exception {
if (replicatedBackup && getNodeId() != null) {
setUpServerLockFile();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index bb78608..ab47aa7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -138,6 +138,7 @@ import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
+import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
@@ -448,6 +449,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
NodeManager manager;
if (!configuration.isPersistenceEnabled()) {
manager = new InVMNodeManager(replicatingBackup);
+ } else if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
+ if (replicatingBackup) {
+ throw new IllegalArgumentException("replicatingBackup is not supported yet while using JDBC persistence");
+ }
+ final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
+ manager = JdbcNodeManager.with(dbConf, scheduledPool, executorFactory, shutdownOnCriticalIO);
} else {
manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
new file mode 100644
index 0000000..30db629
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.jboss.logging.Logger;
+
+/**
+ * Default implementation of a {@link ScheduledLeaseLock}: see {@link ScheduledLeaseLock#of(ScheduledExecutorService, ArtemisExecutor, String, LeaseLock, long, IOCriticalErrorListener)}.
+ */
+final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implements ScheduledLeaseLock {
+
+ private static final Logger LOGGER = Logger.getLogger(ActiveMQScheduledLeaseLock.class);
+
+ private final String lockName;
+ private final LeaseLock lock;
+ private long lastLockRenewStart;
+ private final long renewPeriodMillis;
+ private final IOCriticalErrorListener ioCriticalErrorListener;
+
+ ActiveMQScheduledLeaseLock(ScheduledExecutorService scheduledExecutorService,
+ ArtemisExecutor executor,
+ String lockName,
+ LeaseLock lock,
+ long renewPeriodMillis,
+ IOCriticalErrorListener ioCriticalErrorListener) {
+ super(scheduledExecutorService, executor, 0, renewPeriodMillis, TimeUnit.MILLISECONDS, false);
+ if (renewPeriodMillis >= lock.expirationMillis()) {
+ throw new IllegalArgumentException("renewPeriodMillis must be < lock's expirationMillis");
+ }
+ this.lockName = lockName;
+ this.lock = lock;
+ this.renewPeriodMillis = renewPeriodMillis;
+ //already expired start time
+ this.lastLockRenewStart = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(lock.expirationMillis());
+ this.ioCriticalErrorListener = ioCriticalErrorListener;
+ }
+
+ @Override
+ public long renewPeriodMillis() {
+ return renewPeriodMillis;
+ }
+
+ @Override
+ public LeaseLock lock() {
+ return lock;
+ }
+
+ @Override
+ public synchronized void start() {
+ if (isStarted()) {
+ return;
+ }
+ this.lastLockRenewStart = System.nanoTime();
+ super.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (!isStarted()) {
+ return;
+ }
+ super.stop();
+ }
+
+ @Override
+ public void run() {
+ final long lastRenewStart = this.lastLockRenewStart;
+ final long renewStart = System.nanoTime();
+ if (!this.lock.renew()) {
+ ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null);
+ }
+ //logic to detect slowness of DB and/or the scheduled executor service
+ detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis());
+ this.lastLockRenewStart = lastRenewStart;
+ }
+
+ private static void detectAndReportRenewSlowness(String lockName,
+ long lastRenewStart,
+ long renewStart,
+ long expectedRenewPeriodMillis,
+ long expirationMillis) {
+ final long elapsedMillisToRenew = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - renewStart);
+ if (elapsedMillisToRenew > expectedRenewPeriodMillis) {
+ LOGGER.error(lockName + " lock renew tooks " + elapsedMillisToRenew + " ms, while is supposed to take <" + expectedRenewPeriodMillis + " ms");
+ }
+ final long measuredRenewPeriodNanos = renewStart - lastRenewStart;
+ final long measuredRenewPeriodMillis = TimeUnit.NANOSECONDS.toMillis(measuredRenewPeriodNanos);
+ if (measuredRenewPeriodMillis > expirationMillis) {
+ LOGGER.error(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
+ } else if (measuredRenewPeriodMillis > expectedRenewPeriodMillis) {
+ LOGGER.warn(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
new file mode 100644
index 0000000..0656235
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+import org.jboss.logging.Logger;
+
+/**
+ * JDBC implementation of a {@link LeaseLock} with a {@code String} defined {@link #holderId()}.
+ */
+final class JdbcLeaseLock implements LeaseLock {
+
+ private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class);
+ private static final int MAX_HOLDER_ID_LENGTH = 128;
+ private final Connection connection;
+ private final long maxAllowableMillisDiffFromDBTime;
+ private long millisDiffFromCurrentTime;
+ private final String holderId;
+ private final PreparedStatement tryAcquireLock;
+ private final PreparedStatement tryReleaseLock;
+ private final PreparedStatement renewLock;
+ private final PreparedStatement isLocked;
+ private final PreparedStatement currentDateTime;
+ private final long expirationMillis;
+ private boolean maybeAcquired;
+
+ /**
+ * The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection},
+ * whose life cycle will be managed externally.
+ */
+ JdbcLeaseLock(String holderId,
+ Connection connection,
+ PreparedStatement tryAcquireLock,
+ PreparedStatement tryReleaseLock,
+ PreparedStatement renewLock,
+ PreparedStatement isLocked,
+ PreparedStatement currentDateTime,
+ long expirationMIllis,
+ long maxAllowableMillisDiffFromDBTime) {
+ if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
+ throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH);
+ }
+ this.holderId = holderId;
+ this.maxAllowableMillisDiffFromDBTime = maxAllowableMillisDiffFromDBTime;
+ this.millisDiffFromCurrentTime = Long.MAX_VALUE;
+ this.tryAcquireLock = tryAcquireLock;
+ this.tryReleaseLock = tryReleaseLock;
+ this.renewLock = renewLock;
+ this.isLocked = isLocked;
+ this.currentDateTime = currentDateTime;
+ this.expirationMillis = expirationMIllis;
+ this.maybeAcquired = false;
+ this.connection = connection;
+ }
+
+ public String holderId() {
+ return holderId;
+ }
+
+ @Override
+ public long expirationMillis() {
+ return expirationMillis;
+ }
+
+   /**
+    * Returns the cached adjustment (in milliseconds) to add to the local clock, computing it on first use.
+    * {@code Long.MAX_VALUE} is the marker for "not computed yet"; the adjustment is queried from the database only
+    * when {@code maxAllowableMillisDiffFromDBTime} is positive, otherwise it is {@code 0}.
+    */
+   private long timeDifference() throws SQLException {
+      if (millisDiffFromCurrentTime != Long.MAX_VALUE) {
+         //already computed
+         return millisDiffFromCurrentTime;
+      }
+      millisDiffFromCurrentTime = maxAllowableMillisDiffFromDBTime > 0 ? determineTimeDifference() : 0L;
+      return millisDiffFromCurrentTime;
+   }
+
+ private long determineTimeDifference() throws SQLException {
+ try (ResultSet resultSet = currentDateTime.executeQuery()) {
+ long result = 0L;
+ if (resultSet.next()) {
+ final Timestamp timestamp = resultSet.getTimestamp(1);
+ final long diff = System.currentTimeMillis() - timestamp.getTime();
+ if (Math.abs(diff) > maxAllowableMillisDiffFromDBTime) {
+ // off by more than maxAllowableMillisDiffFromDBTime so lets adjust
+ result = (-diff);
+ }
+ LOGGER.info(holderId() + " diff adjust from db: " + result + ", db time: " + timestamp);
+ }
+ return result;
+ }
+ }
+
+ @Override
+ public boolean renew() {
+ synchronized (connection) {
+ try {
+ final boolean result;
+ connection.setAutoCommit(false);
+ try {
+ final long timeDifference = timeDifference();
+ final PreparedStatement preparedStatement = this.renewLock;
+ final long now = System.currentTimeMillis() + timeDifference;
+ final Timestamp timestamp = new Timestamp(now + expirationMillis);
+ preparedStatement.setTimestamp(1, timestamp);
+ preparedStatement.setString(2, holderId);
+ result = preparedStatement.executeUpdate() == 1;
+ } catch (SQLException ie) {
+ connection.rollback();
+ connection.setAutoCommit(true);
+ throw new IllegalStateException(ie);
+ }
+ connection.commit();
+ connection.setAutoCommit(true);
+ return result;
+ } catch (SQLException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ @Override
+ public boolean tryAcquire() {
+ synchronized (connection) {
+ try {
+ final boolean acquired;
+ connection.setAutoCommit(false);
+ try {
+ final long timeDifference = timeDifference();
+ final PreparedStatement preparedStatement = tryAcquireLock;
+ final long now = System.currentTimeMillis() + timeDifference;
+ preparedStatement.setString(1, holderId);
+ final Timestamp timestamp = new Timestamp(now + expirationMillis);
+ preparedStatement.setTimestamp(2, timestamp);
+ acquired = preparedStatement.executeUpdate() == 1;
+ } catch (SQLException ie) {
+ connection.rollback();
+ connection.setAutoCommit(true);
+ throw new IllegalStateException(ie);
+ }
+ connection.commit();
+ connection.setAutoCommit(true);
+ if (acquired) {
+ this.maybeAcquired = true;
+ }
+ return acquired;
+ } catch (SQLException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ @Override
+ public boolean isHeld() {
+ return checkValidHolderId(Objects::nonNull);
+ }
+
+ @Override
+ public boolean isHeldByCaller() {
+ return checkValidHolderId(this.holderId::equals);
+ }
+
+   /**
+    * Reads the current lock row inside a single transaction and tests its holder id against {@code holderIdFilter}.
+    * A row whose timestamp (second column) is already in the past, according to the adjusted local time, is
+    * reported as not held and logged as a zombie lock.
+    *
+    * @param holderIdFilter predicate applied to the holder id read from the first column (it can be {@code null})
+    * @return {@code false} if there is no row, if the filter rejects the holder id or if the lock is expired
+    * @throws IllegalStateException if any JDBC operation fails; the original {@link SQLException} is the cause
+    */
+   private boolean checkValidHolderId(Predicate<? super String> holderIdFilter) {
+      synchronized (connection) {
+         try {
+            boolean result;
+            connection.setAutoCommit(false);
+            try {
+               final long timeDifference = timeDifference();
+               final PreparedStatement preparedStatement = this.isLocked;
+               try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                  if (!resultSet.next()) {
+                     result = false;
+                  } else {
+                     final String currentHolderId = resultSet.getString(1);
+                     result = holderIdFilter.test(currentHolderId);
+                     //warn about any zombie lock
+                     final Timestamp timestamp = resultSet.getTimestamp(2);
+                     if (timestamp != null) {
+                        final long lockExpirationTime = timestamp.getTime();
+                        final long now = System.currentTimeMillis() + timeDifference;
+                        final long expiredBy = now - lockExpirationTime;
+                        if (expiredBy > 0) {
+                           result = false;
+                           LOGGER.warn("found zombie lock with holderId: " + currentHolderId + " expired by: " + expiredBy + " ms");
+                        }
+                     }
+                  }
+               }
+            } catch (SQLException ie) {
+               //best-effort clean up: a failure here must not hide the original error nor skip the auto-commit restore
+               try {
+                  connection.rollback();
+               } catch (SQLException rollbackFailure) {
+                  ie.addSuppressed(rollbackFailure);
+               }
+               try {
+                  connection.setAutoCommit(true);
+               } catch (SQLException autoCommitFailure) {
+                  ie.addSuppressed(autoCommitFailure);
+               }
+               throw new IllegalStateException(ie);
+            }
+            connection.commit();
+            connection.setAutoCommit(true);
+            return result;
+         } catch (SQLException e) {
+            throw new IllegalStateException(e);
+         }
+      }
+   }
+
+ @Override
+ public void release() {
+ synchronized (connection) {
+ try {
+ connection.setAutoCommit(false);
+ try {
+ final PreparedStatement preparedStatement = this.tryReleaseLock;
+ preparedStatement.setString(1, holderId);
+ if (preparedStatement.executeUpdate() != 1) {
+ LOGGER.warn(holderId + " has failed to release a lock");
+ } else {
+ LOGGER.info(holderId + " has released a lock");
+ }
+ //consider it as released to avoid on finalize to be reclaimed
+ this.maybeAcquired = false;
+ } catch (SQLException ie) {
+ connection.rollback();
+ connection.setAutoCommit(true);
+ throw new IllegalStateException(ie);
+ }
+ connection.commit();
+ connection.setAutoCommit(true);
+ } catch (SQLException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws SQLException {
+ synchronized (connection) {
+ //to avoid being called if not needed
+ if (!this.tryReleaseLock.isClosed()) {
+ try {
+ if (this.maybeAcquired) {
+ release();
+ }
+ } finally {
+ this.tryReleaseLock.close();
+ this.tryAcquireLock.close();
+ this.renewLock.close();
+ this.isLocked.close();
+ this.currentDateTime.close();
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
new file mode 100644
index 0000000..f4baeea
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import javax.sql.DataSource;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.server.ActivateCallback;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.UUID;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.jboss.logging.Logger;
+
+/**
+ * JDBC implementation of {@link NodeManager}.
+ */
+public final class JdbcNodeManager extends NodeManager {
+
+ private static final Logger logger = Logger.getLogger(JdbcNodeManager.class);
+ private static final long MAX_PAUSE_MILLIS = 2000L;
+
+ private final SharedStateManager sharedStateManager;
+ private final ScheduledLeaseLock scheduledLiveLock;
+ private final ScheduledLeaseLock scheduledBackupLock;
+ private final long lockRenewPeriodMillis;
+ private final long lockAcquisitionTimeoutMillis;
+ private volatile boolean interrupted = false;
+ private final LeaseLock.Pauser pauser;
+ private final IOCriticalErrorListener ioCriticalErrorListener;
+
+   /**
+    * Creates a {@link JdbcNodeManager} from the given configuration, using a random UUID string as broker id.
+    * The configured {@link DataSource} is used when present, the JDBC connection URL and driver class otherwise.
+    */
+   public static JdbcNodeManager with(DatabaseStorageConfiguration configuration,
+                                      ScheduledExecutorService scheduledExecutorService,
+                                      ExecutorFactory executorFactory,
+                                      IOCriticalErrorListener ioCriticalErrorListener) {
+      final DataSource dataSource = configuration.getDataSource();
+      final String brokerId = java.util.UUID.randomUUID().toString();
+      final long expirationMillis = configuration.getJdbcLockExpirationMillis();
+      final long renewPeriodMillis = configuration.getJdbcLockRenewPeriodMillis();
+      final long acquisitionTimeoutMillis = configuration.getJdbcLockAcquisitionTimeoutMillis();
+      if (dataSource == null) {
+         return usingConnectionUrl(brokerId, expirationMillis, renewPeriodMillis, acquisitionTimeoutMillis, configuration.getJdbcConnectionUrl(), configuration.getJdbcDriverClassName(), configuration.getSqlProviderFactory().create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, executorFactory, ioCriticalErrorListener);
+      }
+      return usingDataSource(brokerId, expirationMillis, renewPeriodMillis, acquisitionTimeoutMillis, dataSource, configuration.getSqlProviderFactory().create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, executorFactory, ioCriticalErrorListener);
+   }
+
+ static JdbcNodeManager usingDataSource(String brokerId,
+ long lockExpirationMillis,
+ long lockRenewPeriodMillis,
+ long lockAcquisitionTimeoutMillis,
+ DataSource dataSource,
+ SQLProvider provider,
+ ScheduledExecutorService scheduledExecutorService,
+ ExecutorFactory executorFactory,
+ IOCriticalErrorListener ioCriticalErrorListener) {
+ return new JdbcNodeManager(JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
+ }
+
+ public static JdbcNodeManager usingConnectionUrl(String brokerId,
+ long lockExpirationMillis,
+ long lockRenewPeriodMillis,
+ long lockAcquisitionTimeoutMillis,
+ String jdbcUrl,
+ String driverClass,
+ SQLProvider provider,
+ ScheduledExecutorService scheduledExecutorService,
+ ExecutorFactory executorFactory,
+ IOCriticalErrorListener ioCriticalErrorListener) {
+ return new JdbcNodeManager(JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener);
+ }
+
+ private JdbcNodeManager(final SharedStateManager sharedStateManager,
+ boolean replicatedBackup,
+ long lockRenewPeriodMillis,
+ long lockAcquisitionTimeoutMillis,
+ ScheduledExecutorService scheduledExecutorService,
+ ExecutorFactory executorFactory,
+ IOCriticalErrorListener ioCriticalErrorListener) {
+ super(replicatedBackup, null);
+ this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis;
+ this.lockRenewPeriodMillis = lockRenewPeriodMillis;
+ this.pauser = LeaseLock.Pauser.sleep(Math.min(this.lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
+ this.sharedStateManager = sharedStateManager;
+ this.scheduledLiveLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory.getExecutor(), "live", this.sharedStateManager.liveLock(), lockRenewPeriodMillis, ioCriticalErrorListener);
+ this.scheduledBackupLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory.getExecutor(), "backup", this.sharedStateManager.backupLock(), lockRenewPeriodMillis, ioCriticalErrorListener);
+ this.ioCriticalErrorListener = ioCriticalErrorListener;
+ }
+
+ @Override
+ public synchronized void start() throws Exception {
+ if (isStarted()) {
+ return;
+ }
+ if (!replicatedBackup) {
+ final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
+ setUUID(nodeId);
+ }
+
+ super.start();
+ }
+
+   /**
+    * Stops both the scheduled locks, the node manager itself and then closes the shared state manager.
+    * It does nothing if the node manager is not started.
+    * <p>
+    * Each step is attempted even if a previous one fails, so that a failure while stopping a lock can't leave
+    * the other lock scheduled nor the shared state manager open.
+    */
+   @Override
+   public synchronized void stop() throws Exception {
+      if (!isStarted()) {
+         return;
+      }
+      try {
+         try {
+            this.scheduledLiveLock.stop();
+         } finally {
+            this.scheduledBackupLock.stop();
+         }
+      } finally {
+         try {
+            super.stop();
+         } finally {
+            this.sharedStateManager.close();
+         }
+      }
+   }
+
+ @Override
+ protected void finalize() throws Throwable {
+ stop();
+ }
+
+ @Override
+ public boolean isAwaitingFailback() throws Exception {
+ return readSharedState() == SharedStateManager.State.FAILING_BACK;
+ }
+
+ @Override
+ public boolean isBackupLive() throws Exception {
+ //is anyone holding the live lock?
+ return this.scheduledLiveLock.lock().isHeld();
+ }
+
+ @Override
+ public void stopBackup() throws Exception {
+ if (replicatedBackup) {
+ final UUID nodeId = getUUID();
+ sharedStateManager.writeNodeId(nodeId);
+ }
+ releaseBackup();
+ }
+
+ @Override
+ public void interrupt() {
+ //need to be volatile: must be called concurrently to work as expected
+ interrupted = true;
+ }
+
+ @Override
+ public void releaseBackup() throws Exception {
+ if (this.scheduledBackupLock.lock().isHeldByCaller()) {
+ this.scheduledBackupLock.stop();
+ this.scheduledBackupLock.lock().release();
+ }
+ }
+
+ /**
+ * Try to acquire a lock, failing with an exception otherwise.
+ */
+ private void lock(LeaseLock lock) throws Exception {
+ final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(this.lockAcquisitionTimeoutMillis, this.pauser, () -> !this.interrupted);
+ switch (acquireResult) {
+ case Timeout:
+ throw new Exception("timed out waiting for lock");
+ case Exit:
+ this.interrupted = false;
+ throw new InterruptedException("LeaseLock was interrupted");
+ case Done:
+ break;
+ default:
+ throw new AssertionError(acquireResult + " not managed");
+ }
+
+ }
+
+ private void checkInterrupted(Supplier<String> message) throws InterruptedException {
+ if (this.interrupted) {
+ interrupted = false;
+ throw new InterruptedException(message.get());
+ }
+ }
+
+   /**
+    * Renews the live lock if more than its renew period has elapsed since {@code acquiredOn}.
+    *
+    * @param acquiredOn {@link System#nanoTime()} value taken when the live lock has been acquired
+    * @throws IllegalStateException if the renew is needed but fails; the critical error listener is notified
+    *                               first and anything it throws is attached as a suppressed exception
+    */
+   private void renewLiveLockIfNeeded(final long acquiredOn) {
+      final long acquiredMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - acquiredOn);
+      if (acquiredMillis <= this.scheduledLiveLock.renewPeriodMillis()) {
+         //still within the renew period: nothing to do
+         return;
+      }
+      if (this.scheduledLiveLock.lock().renew()) {
+         return;
+      }
+      final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
+      try {
+         ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null);
+      } catch (Throwable listenerFailure) {
+         //the renew failure is always the one thrown, but a listener failure is not silently lost
+         e.addSuppressed(listenerFailure);
+      }
+      throw e;
+   }
+
+ /**
+ * Lock live node and check for a live state, taking care to renew it (if needed) or releasing it otherwise
+ */
+ private boolean lockLiveAndCheckLiveState() throws Exception {
+ lock(this.scheduledLiveLock.lock());
+ final long acquiredOn = System.nanoTime();
+ boolean liveWhileLocked = false;
+ //check if the state is live
+ final SharedStateManager.State stateWhileLocked;
+ try {
+ stateWhileLocked = readSharedState();
+ } catch (Throwable t) {
+ logger.error("error while holding the live node lock and tried to read the shared state", t);
+ this.scheduledLiveLock.lock().release();
+ throw t;
+ }
+ if (stateWhileLocked == SharedStateManager.State.LIVE) {
+ renewLiveLockIfNeeded(acquiredOn);
+ liveWhileLocked = true;
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("state is " + stateWhileLocked + " while holding the live lock");
+ }
+ //state is not live: can (try to) release the lock
+ this.scheduledLiveLock.lock().release();
+ }
+ return liveWhileLocked;
+ }
+
+ @Override
+ public void awaitLiveNode() throws Exception {
+ boolean liveWhileLocked = false;
+ while (!liveWhileLocked) {
+ //check first without holding any lock
+ final SharedStateManager.State state = readSharedState();
+ if (state == SharedStateManager.State.LIVE) {
+ //verify if the state is live while holding the live node lock too
+ liveWhileLocked = lockLiveAndCheckLiveState();
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("awaiting live node...state: " + state);
+ }
+ }
+ if (!liveWhileLocked) {
+ checkInterrupted(() -> "awaitLiveNode got interrupted!");
+ pauser.idle();
+ }
+ }
+ //state is LIVE and live lock is acquired and valid
+ logger.debug("acquired live node lock");
+ this.scheduledLiveLock.start();
+ }
+
+ @Override
+ public void startBackup() throws Exception {
+ assert !replicatedBackup; // should not be called if this is a replicating backup
+ ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
+
+ lock(scheduledBackupLock.lock());
+ scheduledBackupLock.start();
+ ActiveMQServerLogger.LOGGER.gotBackupLock();
+ if (getUUID() == null)
+ readNodeId();
+ }
+
+ @Override
+ public ActivateCallback startLiveNode() throws Exception {
+ setFailingBack();
+
+ final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds";
+
+ ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
+
+ lock(this.scheduledLiveLock.lock());
+
+ this.scheduledLiveLock.start();
+
+ ActiveMQServerLogger.LOGGER.obtainedLiveLock();
+
+ return new ActivateCallback() {
+ @Override
+ public void preActivate() {
+ }
+
+ @Override
+ public void activated() {
+ }
+
+ @Override
+ public void deActivate() {
+ }
+
+ @Override
+ public void activationComplete() {
+ try {
+ //state can be written only if the live renew task is running
+ setLive();
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ }
+ }
+ };
+ }
+
+   /**
+    * Writes the {@code PAUSED} state and releases the live lock.
+    * <p>
+    * If the scheduled renew of the live lock is running it is stopped before releasing the lock; otherwise the
+    * lock is renewed first, to be sure it is still valid while the state is written.
+    *
+    * @throws IllegalStateException if the live lock can't be renewed; the critical error listener is notified
+    *                               first and anything it throws is attached as a suppressed exception
+    */
+   @Override
+   public void pauseLiveServer() throws Exception {
+      if (scheduledLiveLock.isStarted()) {
+         setPaused();
+         scheduledLiveLock.stop();
+         scheduledLiveLock.lock().release();
+      } else if (scheduledLiveLock.lock().renew()) {
+         setPaused();
+         scheduledLiveLock.lock().release();
+      } else {
+         final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
+         try {
+            ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null);
+         } catch (Throwable listenerFailure) {
+            //the renew failure is always the one thrown, but a listener failure is not silently lost
+            e.addSuppressed(listenerFailure);
+         }
+         throw e;
+      }
+   }
+
+ @Override
+ public void crashLiveServer() throws Exception {
+ if (this.scheduledLiveLock.lock().isHeldByCaller()) {
+ scheduledLiveLock.stop();
+ this.scheduledLiveLock.lock().release();
+ }
+ }
+
+ @Override
+ public void awaitLiveStatus() {
+ while (readSharedState() != SharedStateManager.State.LIVE) {
+ pauser.idle();
+ }
+ }
+
+ private void setLive() {
+ writeSharedState(SharedStateManager.State.LIVE);
+ }
+
+ private void setFailingBack() {
+ writeSharedState(SharedStateManager.State.FAILING_BACK);
+ }
+
+ private void setPaused() {
+ writeSharedState(SharedStateManager.State.PAUSED);
+ }
+
+ private void writeSharedState(SharedStateManager.State state) {
+ assert !this.replicatedBackup : "the replicated backup can't write the shared state!";
+ this.sharedStateManager.writeState(state);
+ }
+
+ private SharedStateManager.State readSharedState() {
+ return this.sharedStateManager.readState();
+ }
+
+ @Override
+ public SimpleString readNodeId() {
+ final UUID nodeId = this.sharedStateManager.readNodeId();
+ setUUID(nodeId);
+ return getNodeId();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
new file mode 100644
index 0000000..dad1abc
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.utils.UUID;
+
+/**
+ * JDBC implementation of a {@link SharedStateManager}.
+ */
+@SuppressWarnings("SynchronizeOnNonFinalField")
+final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager {
+
+ private final String holderId;
+ private final long lockExpirationMillis;
+ private JdbcLeaseLock liveLock;
+ private JdbcLeaseLock backupLock;
+ private PreparedStatement readNodeId;
+ private PreparedStatement writeNodeId;
+ private PreparedStatement readState;
+ private PreparedStatement writeState;
+
+ public static JdbcSharedStateManager usingDataSource(String holderId,
+ long locksExpirationMillis,
+ DataSource dataSource,
+ SQLProvider provider) {
+ final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
+ sharedStateManager.setDataSource(dataSource);
+ sharedStateManager.setSqlProvider(provider);
+ try {
+ sharedStateManager.start();
+ return sharedStateManager;
+ } catch (SQLException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public static JdbcSharedStateManager usingConnectionUrl(String holderId,
+ long locksExpirationMillis,
+ String jdbcConnectionUrl,
+ String jdbcDriverClass,
+ SQLProvider provider) {
+ final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
+ sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl);
+ sharedStateManager.setJdbcDriverClass(jdbcDriverClass);
+ sharedStateManager.setSqlProvider(provider);
+ try {
+ sharedStateManager.start();
+ return sharedStateManager;
+ } catch (SQLException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ protected void createSchema() throws SQLException {
+ try {
+ createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL());
+ } catch (SQLException e) {
+ //no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized
+ }
+ }
+
+ static JdbcLeaseLock createLiveLock(String holderId,
+ Connection connection,
+ SQLProvider sqlProvider,
+ long expirationMillis,
+ long maxAllowableMillisDiffFromDBtime) throws SQLException {
+ return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, maxAllowableMillisDiffFromDBtime);
+ }
+
+ static JdbcLeaseLock createBackupLock(String holderId,
+ Connection connection,
+ SQLProvider sqlProvider,
+ long expirationMillis,
+ long maxAllowableMillisDiffFromDBtime) throws SQLException {
+ return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, maxAllowableMillisDiffFromDBtime);
+ }
+
+ @Override
+ protected void prepareStatements() throws SQLException {
+ this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
+ this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
+ this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL());
+ this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL());
+ this.writeState = connection.prepareStatement(sqlProvider.writeStateSQL());
+ this.readState = connection.prepareStatement(sqlProvider.readStateSQL());
+ }
+
+ private JdbcSharedStateManager(String holderId, long lockExpirationMillis) {
+ this.holderId = holderId;
+ this.lockExpirationMillis = lockExpirationMillis;
+ }
+
+ @Override
+ public LeaseLock liveLock() {
+ return this.liveLock;
+ }
+
+ @Override
+ public LeaseLock backupLock() {
+ return this.backupLock;
+ }
+
+ private UUID rawReadNodeId() throws SQLException {
+ final PreparedStatement preparedStatement = this.readNodeId;
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ if (!resultSet.next()) {
+ return null;
+ } else {
+ final String nodeId = resultSet.getString(1);
+ if (nodeId != null) {
+ return new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(nodeId));
+ } else {
+ return null;
+ }
+ }
+ }
+ }
+
+ @Override
+ public UUID readNodeId() {
+ synchronized (connection) {
+ try {
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ connection.setAutoCommit(true);
+ final UUID nodeId = rawReadNodeId();
+ return nodeId;
+ } catch (SQLException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ @Override
+ public void writeNodeId(UUID nodeId) {
+ synchronized (connection) {
+ try {
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ connection.setAutoCommit(true);
+ rawWriteNodeId(nodeId);
+ } catch (SQLException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ private void rawWriteNodeId(UUID nodeId) throws SQLException {
+ final PreparedStatement preparedStatement = this.writeNodeId;
+ preparedStatement.setString(1, nodeId.toString());
+ if (preparedStatement.executeUpdate() != 1) {
+ throw new IllegalStateException("can't write NODE_ID on the JDBC Node Manager Store!");
+ }
+ }
+
+ @Override
+ public UUID setup(Supplier<? extends UUID> nodeIdFactory) {
+ //uses a single transaction to make everything
+ synchronized (connection) {
+ try {
+ final UUID nodeId;
+ connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+ connection.setAutoCommit(false);
+ try {
+ UUID readNodeId = rawReadNodeId();
+ if (readNodeId == null) {
+ nodeId = nodeIdFactory.get();
+ rawWriteNodeId(nodeId);
+ } else {
+ nodeId = readNodeId;
+ }
+ } catch (SQLException e) {
+ connection.rollback();
+ connection.setAutoCommit(true);
+ throw e;
+ }
+ connection.commit();
+ connection.setAutoCommit(true);
+ return nodeId;
+ } catch (SQLException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+   /**
+    * Maps the single-letter encoding stored on the database to a {@link State}; {@code null} is treated as
+    * {@code NOT_STARTED}.
+    *
+    * @throws IllegalStateException if the encoding is not one of the known ones
+    */
+   private static State decodeState(String s) {
+      if (s == null || "N".equals(s)) {
+         return State.NOT_STARTED;
+      }
+      if ("L".equals(s)) {
+         return State.LIVE;
+      }
+      if ("F".equals(s)) {
+         return State.FAILING_BACK;
+      }
+      if ("P".equals(s)) {
+         return State.PAUSED;
+      }
+      throw new IllegalStateException("unknown state [" + s + "]");
+   }
+
+ private static String encodeState(State state) {
+ switch (state) {
+ case LIVE:
+ return "L";
+ case FAILING_BACK:
+ return "F";
+ case PAUSED:
+ return "P";
+ case NOT_STARTED:
+ return "N";
+ default:
+ throw new IllegalStateException("unknown state [" + state + "]");
+ }
+ }
+
+ @Override
+ public State readState() {
+ synchronized (connection) {
+ try {
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ connection.setAutoCommit(true);
+ final State state;
+ final PreparedStatement preparedStatement = this.readState;
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ if (!resultSet.next()) {
+ state = State.FIRST_TIME_START;
+ } else {
+ state = decodeState(resultSet.getString(1));
+ }
+ }
+ return state;
+ } catch (SQLException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ @Override
+ public void writeState(State state) {
+ final String encodedState = encodeState(state);
+ synchronized (connection) {
+ try {
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ connection.setAutoCommit(true);
+ final PreparedStatement preparedStatement = this.writeState;
+ preparedStatement.setString(1, encodedState);
+ if (preparedStatement.executeUpdate() != 1) {
+ throw new IllegalStateException("can't write STATE to the JDBC Node Manager Store!");
+ }
+ } catch (SQLException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ @Override
+ public void stop() throws SQLException {
+ //release all the managed resources inside the connection lock
+ if (sqlProvider.closeConnectionOnShutdown()) {
+ synchronized (connection) {
+ this.readNodeId.close();
+ this.writeNodeId.close();
+ this.readState.close();
+ this.writeState.close();
+ this.liveLock.close();
+ this.backupLock.close();
+ super.stop();
+ }
+ }
+ }
+
+ @Override
+ public void close() throws SQLException {
+ stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java
new file mode 100644
index 0000000..8deda12
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * It represents a lock that can't be held more than {@link #expirationMillis()} without being renewed.
+ *
+ * <p>
+ * An implementor must provide implicitly the caller identity to contextualize each operation (eg {@link JdbcLeaseLock}
+ * uses one caller per instance)
+ */
+interface LeaseLock extends AutoCloseable {
+
+ enum AcquireResult {
+ Timeout, Exit, Done
+ }
+
+ interface ExitCondition {
+
+ /**
+ * @return true as long as we should keep running
+ */
+ boolean keepRunning();
+ }
+
+ interface Pauser {
+
+ void idle();
+
+ static Pauser sleep(long idleTime, TimeUnit timeUnit) {
+ final long idleNanos = timeUnit.toNanos(idleTime);
+ //can fail spuriously but doesn't throw any InterruptedException
+ return () -> LockSupport.parkNanos(idleNanos);
+ }
+
+ static Pauser noWait() {
+ return () -> {
+ };
+ }
+ }
+
+ /**
+ * The expiration in milliseconds from the last valid acquisition/renew.
+ */
+ default long expirationMillis() {
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * It extends the lock expiration (if held) to {@link System#currentTimeMillis()} + {@link #expirationMillis()}.
+ *
+ * @return {@code true} if the expiration has been moved on, {@code false} otherwise
+ */
+ default boolean renew() {
+ return true;
+ }
+
+ /**
+ * Not reentrant lock acquisition operation.
+ * The lock can be acquired if is not held by anyone (including the caller) or has an expired ownership.
+ *
+ * @return {@code true} if has been acquired, {@code false} otherwise
+ */
+ boolean tryAcquire();
+
+ /**
+ * Not reentrant lock acquisition operation (ie {@link #tryAcquire()}).
+ * It tries to acquire the lock until will succeed (ie {@link AcquireResult#Done})or got interrupted (ie {@link AcquireResult#Exit}).
+ * After each failed attempt is performed a {@link Pauser#idle} call.
+ */
+ default AcquireResult tryAcquire(ExitCondition exitCondition, Pauser pauser) {
+ while (exitCondition.keepRunning()) {
+ if (tryAcquire()) {
+ return AcquireResult.Done;
+ } else {
+ pauser.idle();
+ }
+ }
+ return AcquireResult.Exit;
+ }
+
+ /**
+ * Not reentrant lock acquisition operation (ie {@link #tryAcquire()}).
+ * It tries to acquire the lock until will succeed (ie {@link AcquireResult#Done}), got interrupted (ie {@link AcquireResult#Exit})
+ * or exceed {@code tryAcquireTimeoutMillis}.
+ * After each failed attempt is performed a {@link Pauser#idle} call.
+ * If the specified timeout is <=0 then it behaves as {@link #tryAcquire(ExitCondition, Pauser)}.
+ */
+ default AcquireResult tryAcquire(long tryAcquireTimeoutMillis, Pauser pauser, ExitCondition exitCondition) {
+ if (tryAcquireTimeoutMillis < 0) {
+ return tryAcquire(exitCondition, pauser);
+ }
+ final long timeoutInNanosecond = TimeUnit.MILLISECONDS.toNanos(tryAcquireTimeoutMillis);
+ final long startAcquire = System.nanoTime();
+ while (exitCondition.keepRunning()) {
+ if (tryAcquire()) {
+ return AcquireResult.Done;
+ } else if (System.nanoTime() - startAcquire >= timeoutInNanosecond) {
+ return AcquireResult.Timeout;
+ } else {
+ pauser.idle();
+ //check before doing anything if time is expired
+ if (System.nanoTime() - startAcquire >= timeoutInNanosecond) {
+ return AcquireResult.Timeout;
+ }
+ }
+ }
+ return AcquireResult.Exit;
+ }
+
+ /**
+ * @return {@code true} if there is a valid (ie not expired) owner, {@code false} otherwise
+ */
+ boolean isHeld();
+
+ /**
+ * @return {@code true} if the caller is a valid (ie not expired) owner, {@code false} otherwise
+ */
+ boolean isHeldByCaller();
+
+ /**
+ * It release the lock itself and all the resources used by it (eg connections, file handlers)
+ */
+ @Override
+ default void close() throws Exception {
+ release();
+ }
+
+ /**
+ * Perform the release if this lock is held by the caller.
+ */
+ void release();
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java
new file mode 100644
index 0000000..43751f8
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+
+/**
+ * {@link LeaseLock} holder that allows to schedule a {@link LeaseLock#renew} task with a fixed {@link #renewPeriodMillis()} delay.
+ */
+interface ScheduledLeaseLock extends ActiveMQComponent {
+
+ LeaseLock lock();
+
+ long renewPeriodMillis();
+
+ static ScheduledLeaseLock of(ScheduledExecutorService scheduledExecutorService,
+ ArtemisExecutor executor,
+ String lockName,
+ LeaseLock lock,
+ long renewPeriodMillis,
+ IOCriticalErrorListener ioCriticalErrorListener) {
+ return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, ioCriticalErrorListener);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java
new file mode 100644
index 0000000..e26879c
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.utils.UUID;
+
+/**
+ * Facade to abstract the operations on the shared state (inter-process and/or inter-thread) necessary to coordinate broker nodes.
+ */
+interface SharedStateManager extends AutoCloseable {
+
+ enum State {
+ LIVE, PAUSED, FAILING_BACK, NOT_STARTED, FIRST_TIME_START
+ }
+
+ LeaseLock liveLock();
+
+ LeaseLock backupLock();
+
+ UUID readNodeId();
+
+ void writeNodeId(UUID nodeId);
+
+ /**
+ * Purpose of this method is to setup the environment to provide a shared state between live/backup servers.
+ * That means:
+ * - check if a shared state exist and create it/wait for it if not
+ * - check if a nodeId exists and create it if not
+ *
+ * @param nodeIdFactory used to create the nodeId if needed
+ * @return the newly created NodeId or the old one if already present
+ */
+ UUID setup(Supplier<? extends UUID> nodeIdFactory);
+
+ State readState();
+
+ void writeState(State state);
+
+ @Override
+ default void close() throws Exception {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index b00298a..81f13bf 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -1722,6 +1722,27 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
+ <!-- JDBC lock tuning: acquisition timeout, renew (keep alive) period and expiration, all expressed in milliseconds -->
+ <xsd:element name="jdbc-lock-acquisition-timeout" type="xsd:int" minOccurs="0" maxOccurs="1">
+ <xsd:annotation>
+ <xsd:documentation>
+ The max allowed time in milliseconds while trying to acquire a JDBC lock.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+ <xsd:element name="jdbc-lock-renew-period" type="xsd:int" minOccurs="0" maxOccurs="1">
+ <xsd:annotation>
+ <xsd:documentation>
+ The period in milliseconds of the keep alive service of a JDBC lock.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+ <xsd:element name="jdbc-lock-expiration" type="xsd:int" minOccurs="0" maxOccurs="1">
+ <xsd:annotation>
+ <xsd:documentation>
+ The time in milliseconds a JDBC lock is considered valid without keeping it alive.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
<xsd:element name="jms-bindings-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>