You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ni...@apache.org on 2018/04/19 12:56:36 UTC
activemq-artemis git commit: ARTEMIS-1784 JDBC NodeManager should
just use DMBS clock
Repository: activemq-artemis
Updated Branches:
refs/heads/1.x 9fe47f77d -> adb11b88a
ARTEMIS-1784 JDBC NodeManager should just use DMBS clock
(cherry picked from commit 6e9195224c163090e485e0d76c6b4418a583fb75)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/adb11b88
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/adb11b88
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/adb11b88
Branch: refs/heads/1.x
Commit: adb11b88adb054bbc3edb6b520a7eacda484255f
Parents: 9fe47f7
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Apr 4 18:37:12 2018 +0200
Committer: Francesco Nigro <ni...@gmail.com>
Committed: Thu Apr 19 14:32:10 2018 +0200
----------------------------------------------------------------------
.../config/ActiveMQDefaultConfiguration.java | 6 -
.../src/main/resources/journal-sql.properties | 6 +-
.../storage/DatabaseStorageConfiguration.java | 10 -
.../deployers/impl/FileConfigurationParser.java | 1 -
.../core/server/impl/jdbc/JdbcLeaseLock.java | 186 +++++++++----
.../core/server/impl/jdbc/JdbcNodeManager.java | 266 +++++++++++--------
.../impl/jdbc/JdbcSharedStateManager.java | 227 +++++++---------
.../resources/schema/artemis-configuration.xsd | 7 -
.../server/impl/jdbc/JdbcLeaseLockTest.java | 4 +-
.../impl/jdbc/JdbcSharedStateManagerTest.java | 1 -
.../artemis/tests/util/ActiveMQTestBase.java | 5 -
docs/user-manual/en/persistence.md | 4 -
12 files changed, 386 insertions(+), 337 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/adb11b88/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index f772763..1920fa8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -449,8 +449,6 @@ public final class ActiveMQDefaultConfiguration {
private static final long DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = -1;
- private static final long DEFAULT_JDBC_MAX_ALLOWED_MILLIS_FROM_DB_TIME = TimeUnit.SECONDS.toMillis(60);
-
// Default JMS Bingings table name, used with Database storage type
private static final String DEFAULT_JMS_BINDINGS_TABLE_NAME = "JMS_BINDINGS";
@@ -1228,10 +1226,6 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS;
}
- public static long getDefaultJdbcMaxAllowedMillisFromDbTime() {
- return DEFAULT_JDBC_MAX_ALLOWED_MILLIS_FROM_DB_TIME;
- }
-
public static String getDefaultJMSBindingsTableName() {
return DEFAULT_JMS_BINDINGS_TABLE_NAME;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/adb11b88/artemis-jdbc-store/src/main/resources/journal-sql.properties
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/resources/journal-sql.properties b/artemis-jdbc-store/src/main/resources/journal-sql.properties
index b526b97..e5c70ba 100644
--- a/artemis-jdbc-store/src/main/resources/journal-sql.properties
+++ b/artemis-jdbc-store/src/main/resources/journal-sql.properties
@@ -38,10 +38,10 @@ count-journal-record=SELECT COUNT(*) FROM %s
create-node-manager-store-table=CREATE TABLE %s (ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME TIMESTAMP, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID))
create-state=INSERT INTO %s (ID) VALUES (%s)
-try-acquire-lock=UPDATE %s SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = %s
+try-acquire-lock=UPDATE %s SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR (HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP AND ? > CURRENT_TIMESTAMP)) AND ID = %s
try-release-lock=UPDATE %s SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = %s
-is-locked=SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM %s WHERE ID = %s
-renew-lock=UPDATE %s SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = %s
+is-locked=SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME, CURRENT_TIMESTAMP FROM %s WHERE ID = %s
+renew-lock=UPDATE %s SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND HOLDER_EXPIRATION_TIME IS NOT NULL AND ? > HOLDER_EXPIRATION_TIME AND ? > CURRENT_TIMESTAMP AND ID = %s
current-timestamp=SELECT CURRENT_TIMESTAMP FROM %s
write-state=UPDATE %s SET STATE = ? WHERE ID = %s
read-state=SELECT STATE FROM %s WHERE ID = %s
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/adb11b88/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
index 59e12aa..699b3d5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
@@ -52,8 +52,6 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private long jdbcLockAcquisitionTimeoutMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
- private long jdbcMaxAllowedMillisFromDbTime = ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime();
-
@Override
public StoreType getStoreType() {
return StoreType.DATABASE;
@@ -187,12 +185,4 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
public void setJdbcLockAcquisitionTimeoutMillis(long jdbcLockAcquisitionTimeoutMillis) {
this.jdbcLockAcquisitionTimeoutMillis = jdbcLockAcquisitionTimeoutMillis;
}
-
- public long getJdbcMaxAllowedMillisFromDbTime() {
- return jdbcMaxAllowedMillisFromDbTime;
- }
-
- public void setJdbcMaxAllowedMillisFromDbTime(long jdbcMaxAllowedMillisFromDbTime) {
- this.jdbcMaxAllowedMillisFromDbTime = jdbcMaxAllowedMillisFromDbTime;
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/adb11b88/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 3adeeb9..a60f70f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1167,7 +1167,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK));
conf.setJdbcLockRenewPeriodMillis(getLong(storeNode, "jdbc-lock-renew-period", conf.getJdbcLockRenewPeriodMillis(), Validators.NO_CHECK));
conf.setJdbcLockExpirationMillis(getLong(storeNode, "jdbc-lock-expiration", conf.getJdbcLockExpirationMillis(), Validators.NO_CHECK));
- conf.setJdbcMaxAllowedMillisFromDbTime(getLong(storeNode, "jdbc-max-allowed-millis-from-db-time", conf.getJdbcMaxAllowedMillisFromDbTime(), Validators.NO_CHECK));
return conf;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/adb11b88/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
index 03f04ec..11c1aab 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
@@ -23,6 +23,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.jboss.logging.Logger;
@@ -35,14 +36,15 @@ final class JdbcLeaseLock implements LeaseLock {
private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class);
private static final int MAX_HOLDER_ID_LENGTH = 128;
private final Connection connection;
- private long millisDiffFromDbTime;
private final String holderId;
private final PreparedStatement tryAcquireLock;
private final PreparedStatement tryReleaseLock;
private final PreparedStatement renewLock;
private final PreparedStatement isLocked;
+ private final PreparedStatement currentDateTime;
private final long expirationMillis;
private boolean maybeAcquired;
+ private final String lockName;
/**
* The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection},
@@ -54,20 +56,22 @@ final class JdbcLeaseLock implements LeaseLock {
PreparedStatement tryReleaseLock,
PreparedStatement renewLock,
PreparedStatement isLocked,
+ PreparedStatement currentDateTime,
long expirationMIllis,
- long millisDiffFromDbTime) {
+ String lockName) {
if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH);
}
this.holderId = holderId;
- this.millisDiffFromDbTime = millisDiffFromDbTime;
this.tryAcquireLock = tryAcquireLock;
this.tryReleaseLock = tryReleaseLock;
this.renewLock = renewLock;
this.isLocked = isLocked;
+ this.currentDateTime = currentDateTime;
this.expirationMillis = expirationMIllis;
this.maybeAcquired = false;
this.connection = connection;
+ this.lockName = lockName;
}
public String holderId() {
@@ -79,32 +83,88 @@ final class JdbcLeaseLock implements LeaseLock {
return expirationMillis;
}
- private long timeDifference() {
- return millisDiffFromDbTime;
+ private String readableLockStatus() {
+ try {
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ final boolean autoCommit = connection.getAutoCommit();
+ connection.setAutoCommit(false);
+ try {
+ final String lockStatus;
+ final PreparedStatement preparedStatement = this.isLocked;
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ if (!resultSet.next()) {
+ lockStatus = null;
+ } else {
+ final String currentHolderId = resultSet.getString(1);
+ final Timestamp expirationTime = resultSet.getTimestamp(2);
+ final Timestamp currentTimestamp = resultSet.getTimestamp(3);
+ lockStatus = "holderId = " + currentHolderId + " expirationTime = " + expirationTime + " currentTimestamp = " + currentTimestamp;
+ }
+ }
+ connection.commit();
+ return lockStatus;
+ } catch (SQLException ie) {
+ connection.rollback();
+ return ie.getMessage();
+ } finally {
+ connection.setAutoCommit(autoCommit);
+ }
+ } catch (SQLException e) {
+ return e.getMessage();
+ }
+ }
+
+ private long dbCurrentTimeMillis() throws SQLException {
+ final long start = System.nanoTime();
+ try (ResultSet resultSet = currentDateTime.executeQuery()) {
+ resultSet.next();
+ final Timestamp currentTimestamp = resultSet.getTimestamp(1);
+ final long elapsedTime = System.nanoTime() - start;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms",
+ lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime));
+ }
+ return currentTimestamp.getTime();
+ }
}
@Override
public boolean renew() {
synchronized (connection) {
try {
- final boolean result;
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try {
- final long timeDifference = timeDifference();
final PreparedStatement preparedStatement = this.renewLock;
- final long now = System.currentTimeMillis() + timeDifference;
- final Timestamp timestamp = new Timestamp(now + expirationMillis);
- preparedStatement.setTimestamp(1, timestamp);
+ final long now = dbCurrentTimeMillis();
+ final Timestamp expirationTime = new Timestamp(now + expirationMillis);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s",
+ lockName, holderId, expirationTime);
+ }
+ preparedStatement.setTimestamp(1, expirationTime);
preparedStatement.setString(2, holderId);
- result = preparedStatement.executeUpdate() == 1;
+ preparedStatement.setTimestamp(3, expirationTime);
+ preparedStatement.setTimestamp(4, expirationTime);
+ final int updatedRows = preparedStatement.executeUpdate();
+ final boolean renewed = updatedRows == 1;
+ connection.commit();
+ if (!renewed) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }",
+ lockName, holderId, readableLockStatus());
+ }
+ } else {
+ LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId);
+ }
+ return renewed;
} catch (SQLException ie) {
connection.rollback();
- connection.setAutoCommit(true);
throw new IllegalStateException(ie);
+ } finally {
+ connection.setAutoCommit(autoCommit);
}
- connection.commit();
- connection.setAutoCommit(true);
- return result;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
@@ -115,30 +175,36 @@ final class JdbcLeaseLock implements LeaseLock {
public boolean tryAcquire() {
synchronized (connection) {
try {
- final boolean acquired;
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try {
- final long timeDifference = timeDifference();
final PreparedStatement preparedStatement = tryAcquireLock;
- final long now = System.currentTimeMillis() + timeDifference;
+ final long now = dbCurrentTimeMillis();
preparedStatement.setString(1, holderId);
- final Timestamp timestamp = new Timestamp(now + expirationMillis);
- preparedStatement.setTimestamp(2, timestamp);
- acquired = preparedStatement.executeUpdate() == 1;
+ final Timestamp expirationTime = new Timestamp(now + expirationMillis);
+ preparedStatement.setTimestamp(2, expirationTime);
+ preparedStatement.setTimestamp(3, expirationTime);
+ LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s",
+ lockName, holderId, expirationTime);
+ final boolean acquired = preparedStatement.executeUpdate() == 1;
+ connection.commit();
+ if (acquired) {
+ this.maybeAcquired = true;
+ LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId);
+ } else {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debugf("[%s] %s has failed to acquire lock: lock status = { %s }",
+ lockName, holderId, readableLockStatus());
+ }
+ }
+ return acquired;
} catch (SQLException ie) {
connection.rollback();
- connection.setAutoCommit(true);
throw new IllegalStateException(ie);
+ } finally {
+ connection.setAutoCommit(autoCommit);
}
- connection.commit();
- connection.setAutoCommit(true);
- if (acquired) {
- this.maybeAcquired = true;
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(holderId + " has acquired a lock");
- }
- }
- return acquired;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
@@ -158,10 +224,11 @@ final class JdbcLeaseLock implements LeaseLock {
private boolean checkValidHolderId(Predicate<? super String> holderIdFilter) {
synchronized (connection) {
try {
- boolean result;
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try {
- final long timeDifference = timeDifference();
+ boolean result;
final PreparedStatement preparedStatement = this.isLocked;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
@@ -169,29 +236,33 @@ final class JdbcLeaseLock implements LeaseLock {
} else {
final String currentHolderId = resultSet.getString(1);
result = holderIdFilter.test(currentHolderId);
- //warn about any zombie lock
- final Timestamp timestamp = resultSet.getTimestamp(2);
- if (timestamp != null) {
- final long lockExpirationTime = timestamp.getTime();
- final long now = System.currentTimeMillis() + timeDifference;
- final long expiredBy = now - lockExpirationTime;
+ final Timestamp expirationTime = resultSet.getTimestamp(2);
+ final Timestamp currentTimestamp = resultSet.getTimestamp(3);
+ final long currentTimestampMillis = currentTimestamp.getTime();
+ boolean zombie = false;
+ if (expirationTime != null) {
+ final long lockExpirationTime = expirationTime.getTime();
+ final long expiredBy = currentTimestampMillis - lockExpirationTime;
if (expiredBy > 0) {
result = false;
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("found zombie lock with holderId: " + currentHolderId + " expired by: " + expiredBy + " ms");
- }
+ zombie = true;
}
}
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debugf("[%s] %s has found %s with holderId = %s expirationTime = %s currentTimestamp = %s",
+ lockName, holderId, zombie ? "zombie lock" : "lock",
+ currentHolderId, expirationTime, currentTimestamp);
+ }
}
}
+ connection.commit();
+ return result;
} catch (SQLException ie) {
connection.rollback();
- connection.setAutoCommit(true);
throw new IllegalStateException(ie);
+ } finally {
+ connection.setAutoCommit(autoCommit);
}
- connection.commit();
- connection.setAutoCommit(true);
- return result;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
@@ -202,26 +273,30 @@ final class JdbcLeaseLock implements LeaseLock {
public void release() {
synchronized (connection) {
try {
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try {
final PreparedStatement preparedStatement = this.tryReleaseLock;
preparedStatement.setString(1, holderId);
- if (preparedStatement.executeUpdate() != 1) {
- LOGGER.warn(holderId + " has failed to release a lock");
- } else {
+ final boolean released = preparedStatement.executeUpdate() == 1;
+ //consider it as released to avoid on finalize to be reclaimed
+ this.maybeAcquired = false;
+ connection.commit();
+ if (!released) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(holderId + " has released a lock");
+ LOGGER.debugf("[%s] %s has failed to release lock: lock status = { %s }",
+ lockName, holderId, readableLockStatus());
}
+ } else {
+ LOGGER.debugf("[%s] %s has released lock", lockName, holderId);
}
- //consider it as released to avoid on finalize to be reclaimed
- this.maybeAcquired = false;
} catch (SQLException ie) {
connection.rollback();
- connection.setAutoCommit(true);
throw new IllegalStateException(ie);
+ } finally {
+ connection.setAutoCommit(autoCommit);
}
- connection.commit();
- connection.setAutoCommit(true);
} catch (SQLException e) {
throw new IllegalStateException(e);
}
@@ -242,6 +317,7 @@ final class JdbcLeaseLock implements LeaseLock {
this.tryAcquireLock.close();
this.renewLock.close();
this.isLocked.close();
+ this.currentDateTime.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/adb11b88/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
index 465680f..189d3fa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
@@ -41,7 +41,7 @@ import org.jboss.logging.Logger;
*/
public final class JdbcNodeManager extends NodeManager {
- private static final Logger logger = Logger.getLogger(JdbcNodeManager.class);
+ private static final Logger LOGGER = Logger.getLogger(JdbcNodeManager.class);
private static final long MAX_PAUSE_MILLIS = 2000L;
private final Supplier<? extends SharedStateManager> sharedStateManagerFactory;
@@ -50,7 +50,6 @@ public final class JdbcNodeManager extends NodeManager {
private SharedStateManager sharedStateManager;
private ScheduledLeaseLock scheduledLiveLock;
private ScheduledLeaseLock scheduledBackupLock;
- private final long lockRenewPeriodMillis;
private final long lockAcquisitionTimeoutMillis;
private volatile boolean interrupted = false;
private final LeaseLock.Pauser pauser;
@@ -74,7 +73,6 @@ public final class JdbcNodeManager extends NodeManager {
configuration.getJdbcLockExpirationMillis(),
configuration.getJdbcLockRenewPeriodMillis(),
configuration.getJdbcLockAcquisitionTimeoutMillis(),
- configuration.getJdbcMaxAllowedMillisFromDbTime(),
configuration.getDataSource(),
sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER),
scheduledExecutorService,
@@ -88,7 +86,6 @@ public final class JdbcNodeManager extends NodeManager {
configuration.getJdbcLockExpirationMillis(),
configuration.getJdbcLockRenewPeriodMillis(),
configuration.getJdbcLockAcquisitionTimeoutMillis(),
- configuration.getJdbcMaxAllowedMillisFromDbTime(),
configuration.getJdbcConnectionUrl(),
configuration.getJdbcDriverClassName(),
sqlProvider,
@@ -103,7 +100,6 @@ public final class JdbcNodeManager extends NodeManager {
long lockExpirationMillis,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
- long maxAllowedMillisFromDbTime,
DataSource dataSource,
SQLProvider provider,
ScheduledExecutorService scheduledExecutorService,
@@ -114,10 +110,8 @@ public final class JdbcNodeManager extends NodeManager {
networkTimeoutMillis,
executorFactory == null ? null : executorFactory.getExecutor(),
lockExpirationMillis,
- maxAllowedMillisFromDbTime,
dataSource,
provider),
- false,
lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis,
scheduledExecutorService,
@@ -130,7 +124,6 @@ public final class JdbcNodeManager extends NodeManager {
long lockExpirationMillis,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
- long maxAllowedMillisFromDbTime,
String jdbcUrl,
String driverClass,
SQLProvider provider,
@@ -142,11 +135,9 @@ public final class JdbcNodeManager extends NodeManager {
networkTimeoutMillis,
executorFactory == null ? null : executorFactory.getExecutor(),
lockExpirationMillis,
- maxAllowedMillisFromDbTime,
jdbcUrl,
driverClass,
provider),
- false,
lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis,
scheduledExecutorService,
@@ -169,24 +160,22 @@ public final class JdbcNodeManager extends NodeManager {
final int networkTimeout = configuration.getJdbcNetworkTimeout();
if (networkTimeout >= 0) {
if (networkTimeout > lockExpiration) {
- logger.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration");
+ LOGGER.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration");
}
} else {
- logger.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration");
+ LOGGER.warn("jdbc-network-timeout isn't properly configured: the recommended value is <= jdbc-lock-expiration");
}
}
private JdbcNodeManager(Supplier<? extends SharedStateManager> sharedStateManagerFactory,
- boolean replicatedBackup,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
- super(replicatedBackup, null);
+ super(false, null);
this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis;
- this.lockRenewPeriodMillis = lockRenewPeriodMillis;
- this.pauser = LeaseLock.Pauser.sleep(Math.min(this.lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
+ this.pauser = LeaseLock.Pauser.sleep(Math.min(lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
this.sharedStateManagerFactory = sharedStateManagerFactory;
this.scheduledLiveLockFactory = () -> ScheduledLeaseLock.of(
scheduledExecutorService,
@@ -217,10 +206,9 @@ public final class JdbcNodeManager extends NodeManager {
return;
}
this.sharedStateManager = sharedStateManagerFactory.get();
- if (!replicatedBackup) {
- final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
- setUUID(nodeId);
- }
+ LOGGER.debug("setup sharedStateManager on start");
+ final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
+ setUUID(nodeId);
this.scheduledLiveLock = scheduledLiveLockFactory.get();
this.scheduledBackupLock = scheduledBackupLockFactory.get();
super.start();
@@ -259,35 +247,62 @@ public final class JdbcNodeManager extends NodeManager {
@Override
public boolean isAwaitingFailback() throws Exception {
- return readSharedState() == SharedStateManager.State.FAILING_BACK;
+ LOGGER.debug("ENTER isAwaitingFailback");
+ try {
+ return readSharedState() == SharedStateManager.State.FAILING_BACK;
+ } finally {
+ LOGGER.debug("EXIT isAwaitingFailback");
+ }
}
@Override
public boolean isBackupLive() throws Exception {
- //is anyone holding the live lock?
- return this.scheduledLiveLock.lock().isHeld();
+ LOGGER.debug("ENTER isBackupLive");
+ try {
+ //is anyone holding the live lock?
+ return this.scheduledLiveLock.lock().isHeld();
+ } finally {
+ LOGGER.debug("EXIT isBackupLive");
+ }
}
@Override
public void stopBackup() throws Exception {
- if (replicatedBackup) {
- final UUID nodeId = getUUID();
- sharedStateManager.writeNodeId(nodeId);
+ LOGGER.debug("ENTER stopBackup");
+ try {
+ if (this.scheduledBackupLock.isStarted()) {
+ LOGGER.debug("scheduledBackupLock is running: stop it and release backup lock");
+ this.scheduledBackupLock.stop();
+ this.scheduledBackupLock.lock().release();
+ } else {
+ LOGGER.debug("scheduledBackupLock is not running");
+ }
+ } finally {
+ LOGGER.debug("EXIT stopBackup");
}
- releaseBackup();
}
@Override
public void interrupt() {
+ LOGGER.debug("ENTER interrupted");
//need to be volatile: must be called concurrently to work as expected
interrupted = true;
+ LOGGER.debug("EXIT interrupted");
}
@Override
public void releaseBackup() throws Exception {
- if (this.scheduledBackupLock.lock().isHeldByCaller()) {
- this.scheduledBackupLock.stop();
- this.scheduledBackupLock.lock().release();
+ LOGGER.debug("ENTER releaseBackup");
+ try {
+ if (this.scheduledBackupLock.isStarted()) {
+ LOGGER.debug("scheduledBackupLock is running: stop it and release backup lock");
+ this.scheduledBackupLock.stop();
+ this.scheduledBackupLock.lock().release();
+ } else {
+ LOGGER.debug("scheduledBackupLock is not running");
+ }
+ } finally {
+ LOGGER.debug("EXIT releaseBackup");
}
}
@@ -322,11 +337,8 @@ public final class JdbcNodeManager extends NodeManager {
if (acquiredMillis > this.scheduledLiveLock.renewPeriodMillis()) {
if (!this.scheduledLiveLock.lock().renew()) {
final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
- try {
- ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null);
- } finally {
- throw e;
- }
+ ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null);
+ throw e;
}
}
}
@@ -343,7 +355,7 @@ public final class JdbcNodeManager extends NodeManager {
try {
stateWhileLocked = readSharedState();
} catch (Throwable t) {
- logger.error("error while holding the live node lock and tried to read the shared state", t);
+ LOGGER.error("error while holding the live node lock and tried to read the shared state", t);
this.scheduledLiveLock.lock().release();
throw t;
}
@@ -351,9 +363,7 @@ public final class JdbcNodeManager extends NodeManager {
renewLiveLockIfNeeded(acquiredOn);
liveWhileLocked = true;
} else {
- if (logger.isDebugEnabled()) {
- logger.debug("state is " + stateWhileLocked + " while holding the live lock");
- }
+ LOGGER.debugf("state is %s while holding the live lock: releasing live lock", stateWhileLocked);
//state is not live: can (try to) release the lock
this.scheduledLiveLock.lock().release();
}
@@ -362,110 +372,145 @@ public final class JdbcNodeManager extends NodeManager {
@Override
public void awaitLiveNode() throws Exception {
- boolean liveWhileLocked = false;
- while (!liveWhileLocked) {
- //check first without holding any lock
- final SharedStateManager.State state = readSharedState();
- if (state == SharedStateManager.State.LIVE) {
- //verify if the state is live while holding the live node lock too
- liveWhileLocked = lockLiveAndCheckLiveState();
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("awaiting live node...state: " + state);
+ LOGGER.debug("ENTER awaitLiveNode");
+ try {
+ boolean liveWhileLocked = false;
+ while (!liveWhileLocked) {
+ //check first without holding any lock
+ final SharedStateManager.State state = readSharedState();
+ if (state == SharedStateManager.State.LIVE) {
+ //verify if the state is live while holding the live node lock too
+ liveWhileLocked = lockLiveAndCheckLiveState();
+ } else {
+ LOGGER.debugf("state while awaiting live node: %s", state);
+ }
+ if (!liveWhileLocked) {
+ checkInterrupted(() -> "awaitLiveNode got interrupted!");
+ pauser.idle();
}
}
- if (!liveWhileLocked) {
- checkInterrupted(() -> "awaitLiveNode got interrupted!");
- pauser.idle();
- }
+ //state is LIVE and live lock is acquired and valid
+ LOGGER.debugf("acquired live node lock while state is %s: starting scheduledLiveLock", SharedStateManager.State.LIVE);
+ this.scheduledLiveLock.start();
+ } finally {
+ LOGGER.debug("EXIT awaitLiveNode");
}
- //state is LIVE and live lock is acquired and valid
- logger.debug("acquired live node lock");
- this.scheduledLiveLock.start();
}
@Override
public void startBackup() throws Exception {
- assert !replicatedBackup; // should not be called if this is a replicating backup
- ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
-
- lock(scheduledBackupLock.lock());
- scheduledBackupLock.start();
- ActiveMQServerLogger.LOGGER.gotBackupLock();
- if (getUUID() == null)
- readNodeId();
+ LOGGER.debug("ENTER startBackup");
+ try {
+ ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
+
+ lock(scheduledBackupLock.lock());
+ scheduledBackupLock.start();
+ ActiveMQServerLogger.LOGGER.gotBackupLock();
+ if (getUUID() == null)
+ readNodeId();
+ } finally {
+ LOGGER.debug("EXIT startBackup");
+ }
}
@Override
public ActivateCallback startLiveNode() throws Exception {
- setFailingBack();
+ LOGGER.debug("ENTER startLiveNode");
+ try {
+ setFailingBack();
- final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds";
+ final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds";
- ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
+ ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
- lock(this.scheduledLiveLock.lock());
+ lock(this.scheduledLiveLock.lock());
- this.scheduledLiveLock.start();
+ this.scheduledLiveLock.start();
- ActiveMQServerLogger.LOGGER.obtainedLiveLock();
+ ActiveMQServerLogger.LOGGER.obtainedLiveLock();
- return new ActivateCallback() {
- @Override
- public void preActivate() {
- }
+ return new ActivateCallback() {
+ @Override
+ public void preActivate() {
+ }
- @Override
- public void activated() {
- }
+ @Override
+ public void activated() {
+ }
- @Override
- public void deActivate() {
- }
+ @Override
+ public void deActivate() {
+ }
- @Override
- public void activationComplete() {
- try {
- //state can be written only if the live renew task is running
- setLive();
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ @Override
+ public void activationComplete() {
+ LOGGER.debug("ENTER activationComplete");
+ try {
+ //state can be written only if the live renew task is running
+ setLive();
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ } finally {
+ LOGGER.debug("EXIT activationComplete");
+ }
}
- }
- };
+ };
+ } finally {
+ LOGGER.debug("EXIT startLiveNode");
+ }
}
@Override
public void pauseLiveServer() throws Exception {
- if (scheduledLiveLock.isStarted()) {
- setPaused();
- scheduledLiveLock.stop();
- scheduledLiveLock.lock().release();
- } else if (scheduledLiveLock.lock().renew()) {
- setPaused();
- scheduledLiveLock.lock().release();
- } else {
- final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
- try {
- ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null);
- } finally {
- throw e;
+ LOGGER.debug("ENTER pauseLiveServer");
+ try {
+ if (scheduledLiveLock.isStarted()) {
+ LOGGER.debug("scheduledLiveLock is running: set paused shared state, stop it and release live lock");
+ setPaused();
+ scheduledLiveLock.stop();
+ scheduledLiveLock.lock().release();
+ } else {
+ LOGGER.debug("scheduledLiveLock is not running: try renew live lock");
+ if (scheduledLiveLock.lock().renew()) {
+ LOGGER.debug("live lock renewed: set paused shared state and release live lock");
+ setPaused();
+ scheduledLiveLock.lock().release();
+ } else {
+ final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
+ ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null);
+ throw e;
+ }
}
+ } finally {
+ LOGGER.debug("EXIT pauseLiveServer");
}
}
@Override
public void crashLiveServer() throws Exception {
- if (this.scheduledLiveLock.lock().isHeldByCaller()) {
- scheduledLiveLock.stop();
- this.scheduledLiveLock.lock().release();
+ LOGGER.debug("ENTER crashLiveServer");
+ try {
+ if (this.scheduledLiveLock.isStarted()) {
+ LOGGER.debug("scheduledLiveLock is running: request stop it and release live lock");
+ this.scheduledLiveLock.stop();
+ this.scheduledLiveLock.lock().release();
+ } else {
+ LOGGER.debug("scheduledLiveLock is not running");
+ }
+ } finally {
+ LOGGER.debug("EXIT crashLiveServer");
}
}
@Override
public void awaitLiveStatus() {
- while (readSharedState() != SharedStateManager.State.LIVE) {
- pauser.idle();
+ LOGGER.debug("ENTER awaitLiveStatus");
+ try {
+ while (readSharedState() != SharedStateManager.State.LIVE) {
+ pauser.idle();
+ }
+ } finally {
+ LOGGER.debug("EXIT awaitLiveStatus");
}
}
@@ -482,17 +527,20 @@ public final class JdbcNodeManager extends NodeManager {
}
private void writeSharedState(SharedStateManager.State state) {
- assert !this.replicatedBackup : "the replicated backup can't write the shared state!";
+ LOGGER.debugf("writeSharedState state = %s", state);
this.sharedStateManager.writeState(state);
}
private SharedStateManager.State readSharedState() {
- return this.sharedStateManager.readState();
+ final SharedStateManager.State state = this.sharedStateManager.readState();
+ LOGGER.debugf("readSharedState state = %s", state);
+ return state;
}
@Override
public SimpleString readNodeId() {
final UUID nodeId = this.sharedStateManager.readNodeId();
+ LOGGER.debugf("readNodeId nodeId = %s", nodeId);
setUUID(nodeId);
return getNodeId();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/adb11b88/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
index a8b07e9..9357435 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
@@ -22,8 +22,6 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Timestamp;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
@@ -39,10 +37,9 @@ import org.jboss.logging.Logger;
final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager {
private static final Logger logger = Logger.getLogger(JdbcSharedStateManager.class);
- public static final int MAX_SETUP_ATTEMPTS = 20;
+ private static final int MAX_SETUP_ATTEMPTS = 20;
private final String holderId;
private final long lockExpirationMillis;
- private final long maxAllowedMillisFromDbTime;
private JdbcLeaseLock liveLock;
private JdbcLeaseLock backupLock;
private PreparedStatement readNodeId;
@@ -50,16 +47,14 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
private PreparedStatement initializeNodeId;
private PreparedStatement readState;
private PreparedStatement writeState;
- private long timeDifferenceMillisFromDb = 0;
public static JdbcSharedStateManager usingDataSource(String holderId,
int networkTimeout,
Executor networkTimeoutExecutor,
long locksExpirationMillis,
- long maxAllowedMillisFromDbTime,
DataSource dataSource,
SQLProvider provider) {
- final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis, maxAllowedMillisFromDbTime);
+ final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout);
sharedStateManager.setDataSource(dataSource);
sharedStateManager.setSqlProvider(provider);
@@ -73,7 +68,6 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
public static JdbcSharedStateManager usingConnectionUrl(String holderId,
long locksExpirationMillis,
- long maxAllowedMillisFromDbTime,
String jdbcConnectionUrl,
String jdbcDriverClass,
SQLProvider provider) {
@@ -81,7 +75,6 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
-1,
null,
locksExpirationMillis,
- maxAllowedMillisFromDbTime,
jdbcConnectionUrl,
jdbcDriverClass,
provider);
@@ -91,11 +84,10 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
int networkTimeout,
Executor networkTimeoutExecutor,
long locksExpirationMillis,
- long maxAllowedMillisFromDbTime,
String jdbcConnectionUrl,
String jdbcDriverClass,
SQLProvider provider) {
- final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis, maxAllowedMillisFromDbTime);
+ final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout);
sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl);
sharedStateManager.setJdbcDriverClass(jdbcDriverClass);
@@ -109,63 +101,33 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
}
@Override
- protected void createSchema() throws SQLException {
+ protected void createSchema() {
try {
createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL());
} catch (SQLException e) {
//no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized
- if (logger.isDebugEnabled()) {
- logger.debug("Error while creating the schema of the JDBC shared state manager", e);
- }
- }
- }
-
- /**
- * It computes the distance in milliseconds of {@link System#currentTimeMillis()} from the DBMS time.<br>
- * It must be added to {@link System#currentTimeMillis()} in order to approximate the DBMS time.
- * It will create a transaction by its own.
- */
- static long timeDifferenceMillisFromDb(Connection connection, SQLProvider sqlProvider) throws SQLException {
- try (Statement statement = connection.createStatement()) {
- connection.setAutoCommit(false);
- final long result;
- try (ResultSet resultSet = statement.executeQuery(sqlProvider.currentTimestampSQL())) {
- resultSet.next();
- final Timestamp timestamp = resultSet.getTimestamp(1);
- final long systemNow = System.currentTimeMillis();
- result = timestamp.getTime() - systemNow;
- } catch (SQLException ie) {
- connection.rollback();
- connection.setAutoCommit(true);
- throw ie;
- }
- connection.commit();
- connection.setAutoCommit(true);
- return result;
+ logger.debug("Error while creating the schema of the JDBC shared state manager", e);
}
}
static JdbcLeaseLock createLiveLock(String holderId,
Connection connection,
SQLProvider sqlProvider,
- long expirationMillis,
- long timeDifferenceMillisFromDb) throws SQLException {
- return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), expirationMillis, timeDifferenceMillisFromDb);
+ long expirationMillis) throws SQLException {
+ return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "LIVE");
}
static JdbcLeaseLock createBackupLock(String holderId,
Connection connection,
SQLProvider sqlProvider,
- long expirationMillis,
- long timeDifferenceMillisFromDb) throws SQLException {
- return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), expirationMillis, timeDifferenceMillisFromDb);
+ long expirationMillis) throws SQLException {
+ return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "BACKUP");
}
@Override
protected void prepareStatements() throws SQLException {
- final long timeDifferenceMillisFromDb = validateTimeDifferenceMillisFromDb();
- this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, timeDifferenceMillisFromDb);
- this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, timeDifferenceMillisFromDb);
+ this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis);
+ this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis);
this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL());
this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL());
this.initializeNodeId = connection.prepareStatement(sqlProvider.initializeNodeIdSQL());
@@ -173,32 +135,9 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
this.readState = connection.prepareStatement(sqlProvider.readStateSQL());
}
- /**
- * It will be populated only after a {@link #start()}.
- */
- long timeDifferenceMillisFromDb() {
- return timeDifferenceMillisFromDb;
- }
-
- private long validateTimeDifferenceMillisFromDb() throws SQLException {
- final long timeDifferenceMillisFromDb = timeDifferenceMillisFromDb(connection, sqlProvider);
- this.timeDifferenceMillisFromDb = timeDifferenceMillisFromDb;
- final long absoluteTimeDifference = Math.abs(timeDifferenceMillisFromDb);
- if (absoluteTimeDifference > maxAllowedMillisFromDbTime) {
- throw new IllegalStateException("The system is far " + (-timeDifferenceMillisFromDb) + " milliseconds from DB time, exceeding maxAllowedMillisFromDbTime = " + maxAllowedMillisFromDbTime);
- }
- if (absoluteTimeDifference > 0) {
- final String msg = "The system is far " + timeDifferenceMillisFromDb + " milliseconds from DB time";
- final Logger.Level logLevel = absoluteTimeDifference > lockExpirationMillis ? Logger.Level.WARN : Logger.Level.DEBUG;
- logger.log(logLevel, msg);
- }
- return timeDifferenceMillisFromDb;
- }
-
- private JdbcSharedStateManager(String holderId, long lockExpirationMillis, long maxAllowedMillisFromDbTime) {
+ private JdbcSharedStateManager(String holderId, long lockExpirationMillis) {
this.holderId = holderId;
this.lockExpirationMillis = lockExpirationMillis;
- this.maxAllowedMillisFromDbTime = maxAllowedMillisFromDbTime;
}
@Override
@@ -232,9 +171,13 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(true);
- final UUID nodeId = rawReadNodeId();
- return nodeId;
+ try {
+ return rawReadNodeId();
+ } finally {
+ connection.setAutoCommit(autoCommit);
+ }
} catch (SQLException e) {
throw new IllegalStateException(e);
}
@@ -246,8 +189,13 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(true);
- rawWriteNodeId(nodeId);
+ try {
+ rawWriteNodeId(nodeId);
+ } finally {
+ connection.setAutoCommit(autoCommit);
+ }
} catch (SQLException e) {
throw new IllegalStateException(e);
}
@@ -258,7 +206,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
final PreparedStatement preparedStatement = this.writeNodeId;
preparedStatement.setString(1, nodeId.toString());
if (preparedStatement.executeUpdate() != 1) {
- throw new IllegalStateException("can't write NODE_ID on the JDBC Node Manager Store!");
+ throw new IllegalStateException("can't write NodeId on the JDBC Node Manager Store!");
}
}
@@ -283,9 +231,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
return nodeId;
}
} catch (SQLException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Error while attempting to setup the NodeId", e);
- }
+ logger.debug("Error while attempting to setup the NodeId", e);
lastError = e;
}
}
@@ -299,36 +245,34 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
}
private UUID initializeOrReadNodeId(final UUID newNodeId) throws SQLException {
- final UUID nodeId;
- connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
- connection.setAutoCommit(false);
- try {
- //optimistic try to initialize nodeId
- if (rawInitializeNodeId(newNodeId)) {
- nodeId = newNodeId;
- } else {
- nodeId = rawReadNodeId();
- }
- } catch (SQLException e) {
- connection.rollback();
- connection.setAutoCommit(true);
- if (logger.isDebugEnabled()) {
- logger.debug("Rollback while trying to update NodeId to " + newNodeId, e);
- }
- return null;
- }
- if (nodeId != null) {
- connection.commit();
- connection.setAutoCommit(true);
- return nodeId;
- } else {
- //that means that the rawInitializeNodeId has failed just due to contention or the nodeId wasn't committed yet
- connection.rollback();
- connection.setAutoCommit(true);
- if (logger.isDebugEnabled()) {
- logger.debug("Rollback after failed to update NodeId to " + newNodeId + " and haven't found any NodeId");
+ synchronized (connection) {
+ connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+ final boolean autoCommit = connection.getAutoCommit();
+ connection.setAutoCommit(false);
+ try {
+ final UUID nodeId;
+ //optimistic try to initialize nodeId
+ if (rawInitializeNodeId(newNodeId)) {
+ nodeId = newNodeId;
+ } else {
+ nodeId = rawReadNodeId();
+ }
+ if (nodeId != null) {
+ connection.commit();
+ return nodeId;
+ } else {
+ //rawInitializeNodeId has failed just due to contention or nodeId wasn't committed yet
+ connection.rollback();
+ logger.debugf("Rollback after failed to update NodeId to %s and haven't found any NodeId", newNodeId);
+ return null;
+ }
+ } catch (SQLException e) {
+ connection.rollback();
+ logger.debugf(e, "Rollback while trying to update NodeId to %s", newNodeId);
+ return null;
+ } finally {
+ connection.setAutoCommit(autoCommit);
}
- return null;
}
}
@@ -370,17 +314,26 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
- connection.setAutoCommit(true);
+ final boolean autoCommit = connection.getAutoCommit();
+ connection.setAutoCommit(false);
final State state;
- final PreparedStatement preparedStatement = this.readState;
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- if (!resultSet.next()) {
- state = State.FIRST_TIME_START;
- } else {
- state = decodeState(resultSet.getString(1));
+ try {
+ final PreparedStatement preparedStatement = this.readState;
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ if (!resultSet.next()) {
+ state = State.FIRST_TIME_START;
+ } else {
+ state = decodeState(resultSet.getString(1));
+ }
}
+ connection.commit();
+ return state;
+ } catch (SQLException ie) {
+ connection.rollback();
+ throw new IllegalStateException(ie);
+ } finally {
+ connection.setAutoCommit(autoCommit);
}
- return state;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
@@ -393,11 +346,21 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
- connection.setAutoCommit(true);
- final PreparedStatement preparedStatement = this.writeState;
- preparedStatement.setString(1, encodedState);
- if (preparedStatement.executeUpdate() != 1) {
- throw new IllegalStateException("can't write STATE to the JDBC Node Manager Store!");
+ final boolean autoCommit = connection.getAutoCommit();
+ connection.setAutoCommit(false);
+ try {
+ final PreparedStatement preparedStatement = this.writeState;
+ preparedStatement.setString(1, encodedState);
+ if (preparedStatement.executeUpdate() != 1) {
+ throw new IllegalStateException("can't write state to the JDBC Node Manager Store!");
+ }
+ connection.commit();
+ } catch (SQLException ie) {
+ connection.rollback();
+ connection.setAutoCommit(true);
+ throw new IllegalStateException(ie);
+ } finally {
+ connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
@@ -408,17 +371,15 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
@Override
public void stop() throws SQLException {
//release all the managed resources inside the connection lock
- if (sqlProvider.closeConnectionOnShutdown()) {
- synchronized (connection) {
- this.readNodeId.close();
- this.writeNodeId.close();
- this.initializeNodeId.close();
- this.readState.close();
- this.writeState.close();
- this.liveLock.close();
- this.backupLock.close();
- super.stop();
- }
+ synchronized (connection) {
+ this.readNodeId.close();
+ this.writeNodeId.close();
+ this.initializeNodeId.close();
+ this.readState.close();
+ this.writeState.close();
+ this.liveLock.close();
+ this.backupLock.close();
+ super.stop();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/adb11b88/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 6351cbe..a507c16 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -1743,13 +1743,6 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
- <xsd:element name="jdbc-max-allowed-millis-from-db-time" type="xsd:int" minOccurs="0" maxOccurs="1">
- <xsd:annotation>
- <xsd:documentation>
- The absolute time in milliseconds the system clock is allowed to be distant from the DB time
- </xsd:documentation>
- </xsd:annotation>
- </xsd:element>
<xsd:element name="jms-bindings-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/adb11b88/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
index 2ca08d4..84d3dd4 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
@@ -71,8 +71,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
UUID.randomUUID().toString(),
jdbcSharedStateManager.getConnection(),
sqlProvider,
- acquireMillis,
- jdbcSharedStateManager.timeDifferenceMillisFromDb());
+ acquireMillis);
} catch (SQLException e) {
throw new IllegalStateException(e);
}
@@ -100,7 +99,6 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
.usingConnectionUrl(
UUID.randomUUID().toString(),
dbConf.getJdbcLockExpirationMillis(),
- dbConf.getJdbcMaxAllowedMillisFromDbTime(),
dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
sqlProvider);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/adb11b88/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java
index e7ac316..7340026 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java
@@ -52,7 +52,6 @@ public class JdbcSharedStateManagerTest extends ActiveMQTestBase {
return JdbcSharedStateManager.usingConnectionUrl(
UUID.randomUUID().toString(),
dbConf.getJdbcLockExpirationMillis(),
- dbConf.getJdbcMaxAllowedMillisFromDbTime(),
dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
sqlProvider);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/adb11b88/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 427f1e7..43fa6f1 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -473,7 +473,6 @@ public abstract class ActiveMQTestBase extends Assert {
dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis());
dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis());
dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis());
- dbStorageConfiguration.setJdbcMaxAllowedMillisFromDbTime(getJdbcMaxAllowedMillisFromDbTime());
return dbStorageConfiguration;
}
@@ -489,10 +488,6 @@ public abstract class ActiveMQTestBase extends Assert {
return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis());
}
- protected long getJdbcMaxAllowedMillisFromDbTime() {
- return Long.getLong("jdbc.max.diff.db", ActiveMQDefaultConfiguration.getDefaultJdbcMaxAllowedMillisFromDbTime());
- }
-
public void destroyTables(List<String> tableNames) throws Exception {
Driver driver = getDriver(getJDBCClassName());
Connection connection = driver.connect(getTestJDBCConnectionUrl(), null);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/adb11b88/docs/user-manual/en/persistence.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/persistence.md b/docs/user-manual/en/persistence.md
index d5635be..ca7bcea 100644
--- a/docs/user-manual/en/persistence.md
+++ b/docs/user-manual/en/persistence.md
@@ -454,10 +454,6 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a
The time in milliseconds a JDBC lock is considered valid without keeping it alive. The default value
is 20000 milliseconds (ie 20 seconds).
-- `jdbc-max-allowed-millis-from-db-time`
-
- The absolute time in milliseconds the system clock is allowed to be distant from the DB time, otherwise a critical error will be raised. The default value is 60000 milliseconds (ie 60 seconds).
-
## Configuring Apache ActiveMQ Artemis for Zero Persistence
In some situations, zero persistence is sometimes required for a