You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/09/07 14:11:41 UTC
[1/2] activemq-artemis git commit: This closes #1515
Repository: activemq-artemis
Updated Branches:
refs/heads/master 99b2e4c0f -> 65a0c6104
This closes #1515
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/65a0c610
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/65a0c610
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/65a0c610
Branch: refs/heads/master
Commit: 65a0c6104aeea5b946bfcf2d90515fc4257c6c87
Parents: 99b2e4c 9cf222b
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Sep 7 10:11:35 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 7 10:11:35 2017 -0400
----------------------------------------------------------------------
.../utils/critical/CriticalAnalyzer.java | 10 +++---
.../utils/critical/CriticalAnalyzerImpl.java | 32 ++++++++++----------
.../utils/critical/CriticalComponent.java | 2 +-
.../artemis/utils/critical/CriticalMeasure.java | 30 ++++++++++++------
.../utils/critical/EmptyCriticalAnalyzer.java | 14 +++++----
.../utils/critical/CriticalAnalyzerTest.java | 29 +++++++++++++++---
.../core/server/impl/ActiveMQServerImpl.java | 24 +++++++--------
7 files changed, 89 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-1393 CriticalAnalyzer
timeout uses System::currentTimeMillis
Posted by cl...@apache.org.
ARTEMIS-1393 CriticalAnalyzer timeout uses System::currentTimeMillis
The timeout logic is changed to use System::nanoTime, which is less sensitive to OS clock changes.
The volatile sets in CriticalMeasure are replaced with cheaper lazySet calls.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9cf222be
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9cf222be
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9cf222be
Branch: refs/heads/master
Commit: 9cf222be116192a8f87eb9e55fc5d0246dc5cf3f
Parents: 99b2e4c
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Sep 6 18:08:42 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 7 10:11:35 2017 -0400
----------------------------------------------------------------------
.../utils/critical/CriticalAnalyzer.java | 10 +++---
.../utils/critical/CriticalAnalyzerImpl.java | 32 ++++++++++----------
.../utils/critical/CriticalComponent.java | 2 +-
.../artemis/utils/critical/CriticalMeasure.java | 30 ++++++++++++------
.../utils/critical/EmptyCriticalAnalyzer.java | 14 +++++----
.../utils/critical/CriticalAnalyzerTest.java | 29 +++++++++++++++---
.../core/server/impl/ActiveMQServerImpl.java | 24 +++++++--------
7 files changed, 89 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
index 6b5a436..844f9f0 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.utils.critical;
+import java.util.concurrent.TimeUnit;
+
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
public interface CriticalAnalyzer extends ActiveMQComponent {
@@ -29,13 +31,13 @@ public interface CriticalAnalyzer extends ActiveMQComponent {
void remove(CriticalComponent component);
- CriticalAnalyzer setCheckTime(long timeout);
+ CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit);
- long getCheckTime();
+ long getCheckTimeNanoSeconds();
- CriticalAnalyzer setTimeout(long timeout);
+ CriticalAnalyzer setTimeout(long timeout, TimeUnit unit);
- long getTimeout();
+ long getTimeout(TimeUnit unit);
CriticalAnalyzer addAction(CriticalAction action);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
index c583f2a..ef31fe8 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
@@ -29,9 +29,9 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
private final Logger logger = Logger.getLogger(CriticalAnalyzer.class);
- private volatile long timeout;
+ private volatile long timeoutNanoSeconds;
- private volatile long checkTime;
+ private volatile long checkTimeNanoSeconds;
@Override
public void clear() {
@@ -63,31 +63,31 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
}
@Override
- public CriticalAnalyzer setCheckTime(long timeout) {
- this.checkTime = timeout;
+ public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) {
+ this.checkTimeNanoSeconds = timeout;
return this;
}
@Override
- public long getCheckTime() {
- if (checkTime == 0) {
- checkTime = getTimeout() / 2;
+ public long getCheckTimeNanoSeconds() {
+ if (checkTimeNanoSeconds == 0) {
+ checkTimeNanoSeconds = getTimeout(TimeUnit.NANOSECONDS) / 2;
}
- return checkTime;
+ return checkTimeNanoSeconds;
}
@Override
- public CriticalAnalyzer setTimeout(long timeout) {
- this.timeout = timeout;
+ public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) {
+ this.timeoutNanoSeconds = unit.toNanos(timeout);
return this;
}
@Override
- public long getTimeout() {
- if (timeout == 0) {
- timeout = TimeUnit.MINUTES.toMillis(2);
+ public long getTimeout(TimeUnit unit) {
+ if (timeoutNanoSeconds == 0) {
+ timeoutNanoSeconds = TimeUnit.MINUTES.toNanos(2);
}
- return timeout;
+ return unit.convert(timeoutNanoSeconds, TimeUnit.NANOSECONDS);
}
@Override
@@ -103,7 +103,7 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
try {
for (CriticalComponent component : components) {
- if (component.isExpired(timeout)) {
+ if (component.isExpired(timeoutNanoSeconds)) {
fireAction(component);
// no need to keep running if there's already a component failed
return;
@@ -142,7 +142,7 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
public void run() {
try {
while (true) {
- if (running.tryAcquire(getCheckTime(), TimeUnit.MILLISECONDS)) {
+ if (running.tryAcquire(getCheckTimeNanoSeconds(), TimeUnit.NANOSECONDS)) {
running.release();
// this means that the server has been stopped as we could acquire the semaphore... returning now
break;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java
index a2459dd..367e9c5 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java
@@ -20,7 +20,7 @@ package org.apache.activemq.artemis.utils.critical;
/**
* A Critical component enters and leaves a critical state.
* You update a long every time you enter a critical path
- * you update a different long with a System.currentMillis every time you leave that path.
+ * you update a different long with a System.nanoTime every time you leave that path.
*
* If the enterCritical > leaveCritical at any point, then you need to measure the timeout.
* if the system stops responding, then you have something irresponsive at the system.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java
index b853dc5..5b78a4a 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java
@@ -17,36 +17,48 @@
package org.apache.activemq.artemis.utils.critical;
-import org.jboss.logging.Logger;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
public class CriticalMeasure {
- private static final Logger logger = Logger.getLogger(CriticalMeasure.class);
+ //uses updaters to avoid creates many AtomicLong instances
+ private static final AtomicLongFieldUpdater<CriticalMeasure> TIME_ENTER_UPDATER = AtomicLongFieldUpdater.newUpdater(CriticalMeasure.class, "timeEnter");
+ private static final AtomicLongFieldUpdater<CriticalMeasure> TIME_LEFT_UPDATER = AtomicLongFieldUpdater.newUpdater(CriticalMeasure.class, "timeLeft");
private volatile long timeEnter;
private volatile long timeLeft;
+ public CriticalMeasure() {
+ //prefer this approach instead of using some fixed value because System::nanoTime could change sign
+ //with long running processes
+ enterCritical();
+ leaveCritical();
+ }
+
public void enterCritical() {
- timeEnter = System.currentTimeMillis();
+ //prefer lazySet in order to avoid heavy-weight full barriers on x86
+ TIME_ENTER_UPDATER.lazySet(this, System.nanoTime());
}
public void leaveCritical() {
- timeLeft = System.currentTimeMillis();
+ TIME_LEFT_UPDATER.lazySet(this, System.nanoTime());
}
public boolean isExpired(long timeout) {
- if (timeEnter > timeLeft) {
- return System.currentTimeMillis() - timeEnter > timeout;
+ final long timeLeft = TIME_LEFT_UPDATER.get(this);
+ final long timeEnter = TIME_ENTER_UPDATER.get(this);
+ //due to how System::nanoTime works is better to use differences to prevent numerical overflow while comparing
+ if (timeLeft - timeEnter < 0) {
+ return System.nanoTime() - timeEnter > timeout;
}
-
return false;
}
public long enterTime() {
- return timeEnter;
+ return TIME_ENTER_UPDATER.get(this);
}
public long leaveTime() {
- return timeLeft;
+ return TIME_LEFT_UPDATER.get(this);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java
index 4cf23a9..a5064ce 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java
@@ -17,6 +17,8 @@
package org.apache.activemq.artemis.utils.critical;
+import java.util.concurrent.TimeUnit;
+
public class EmptyCriticalAnalyzer implements CriticalAnalyzer {
private static final EmptyCriticalAnalyzer instance = new EmptyCriticalAnalyzer();
@@ -59,22 +61,22 @@ public class EmptyCriticalAnalyzer implements CriticalAnalyzer {
}
@Override
- public CriticalAnalyzer setCheckTime(long timeout) {
- return this;
+ public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) {
+ return null;
}
@Override
- public long getCheckTime() {
+ public long getCheckTimeNanoSeconds() {
return 0;
}
@Override
- public CriticalAnalyzer setTimeout(long timeout) {
- return this;
+ public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) {
+ return null;
}
@Override
- public long getTimeout() {
+ public long getTimeout(TimeUnit unit) {
return 0;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java
index 638eb61..d8ebaf3 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java
@@ -42,7 +42,7 @@ public class CriticalAnalyzerTest {
@Test
public void testAction() throws Exception {
- analyzer = new CriticalAnalyzerImpl().setTimeout(100).setCheckTime(50);
+ analyzer = new CriticalAnalyzerImpl().setTimeout(100, TimeUnit.MILLISECONDS).setCheckTime(50, TimeUnit.MILLISECONDS);
analyzer.add(new CriticalComponent() {
@Override
public boolean isExpired(long timeout) {
@@ -66,7 +66,7 @@ public class CriticalAnalyzerTest {
@Test
public void testActionOnImpl() throws Exception {
- analyzer = new CriticalAnalyzerImpl().setTimeout(10).setCheckTime(5);
+ analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
analyzer.add(component);
@@ -89,8 +89,29 @@ public class CriticalAnalyzerTest {
}
@Test
+ public void testEnterNoLeaveNoExpire() throws Exception {
+ analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
+ CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
+ component.enterCritical(0);
+ Assert.assertFalse(component.isExpired(TimeUnit.MINUTES.toNanos(1)));
+ analyzer.stop();
+
+ }
+
+ @Test
+ public void testEnterNoLeaveExpire() throws Exception {
+ analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
+ CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
+ component.enterCritical(0);
+ Thread.sleep(50);
+ Assert.assertTrue(component.isExpired(0));
+ analyzer.stop();
+
+ }
+
+ @Test
public void testNegative() throws Exception {
- analyzer = new CriticalAnalyzerImpl().setTimeout(10).setCheckTime(5);
+ analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
CriticalComponent component = new CriticalComponentImpl(analyzer, 1);
analyzer.add(component);
@@ -111,4 +132,4 @@ public class CriticalAnalyzerTest {
analyzer.stop();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 15aca09..68e5559 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -506,7 +506,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
/** Calling this for cases where the server was stopped and now is being restarted... failback, etc...*/
this.analyzer.clear();
- this.getCriticalAnalyzer().setCheckTime(configuration.getCriticalAnalyzerCheckPeriod()).setTimeout(configuration.getCriticalAnalyzerTimeout());
+ this.getCriticalAnalyzer().setCheckTime(configuration.getCriticalAnalyzerCheckPeriod(), TimeUnit.MILLISECONDS).setTimeout(configuration.getCriticalAnalyzerTimeout(), TimeUnit.MILLISECONDS);
if (configuration.isCriticalAnalyzer()) {
this.getCriticalAnalyzer().start();
@@ -1437,7 +1437,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
checkSessionLimit(validatedUser);
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection,
- autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null);
+ autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null);
final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes);
@@ -1838,7 +1838,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount,
- removeConsumers, autoDeleteAddress) : null);
+ removeConsumers, autoDeleteAddress) : null);
addressSettingsRepository.clearCache();
@@ -1882,7 +1882,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
callPostQueueDeletionCallbacks(address, queueName);
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount,
- removeConsumers, autoDeleteAddress) : null);
+ removeConsumers, autoDeleteAddress) : null);
}
@Override
@@ -2456,13 +2456,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void undeployAddressesAndQueueNotInConfiguration(Configuration configuration) throws Exception {
Set<String> addressesInConfig = configuration.getAddressConfigurations().stream()
- .map(CoreAddressConfiguration::getName)
- .collect(Collectors.toSet());
+ .map(CoreAddressConfiguration::getName)
+ .collect(Collectors.toSet());
Set<String> queuesInConfig = configuration.getAddressConfigurations().stream()
- .map(CoreAddressConfiguration::getQueueConfigurations)
- .flatMap(List::stream).map(CoreQueueConfiguration::getName)
- .collect(Collectors.toSet());
+ .map(CoreAddressConfiguration::getQueueConfigurations)
+ .flatMap(List::stream).map(CoreQueueConfiguration::getName)
+ .collect(Collectors.toSet());
for (SimpleString addressName : listAddressNames()) {
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());
@@ -2521,8 +2521,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
Queue queue = updateQueue(config.getName(), config.getRoutingType(), config.getMaxConsumers(), config.getPurgeOnNoConsumers());
if (queue == null) {
queue = createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(),
- queueName, SimpleString.toSimpleString(config.getFilterString()), null,
- config.isDurable(), false, true, false, false, config.getMaxConsumers(), config.getPurgeOnNoConsumers(), true);
+ queueName, SimpleString.toSimpleString(config.getFilterString()), null,
+ config.isDurable(), false, true, false, false, config.getMaxConsumers(), config.getPurgeOnNoConsumers(), true);
}
return queue;
}
@@ -2990,4 +2990,4 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
-}
+}
\ No newline at end of file