You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/09/07 14:11:41 UTC

[1/2] activemq-artemis git commit: This closes #1515

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 99b2e4c0f -> 65a0c6104


This closes #1515


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/65a0c610
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/65a0c610
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/65a0c610

Branch: refs/heads/master
Commit: 65a0c6104aeea5b946bfcf2d90515fc4257c6c87
Parents: 99b2e4c 9cf222b
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Sep 7 10:11:35 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 7 10:11:35 2017 -0400

----------------------------------------------------------------------
 .../utils/critical/CriticalAnalyzer.java        | 10 +++---
 .../utils/critical/CriticalAnalyzerImpl.java    | 32 ++++++++++----------
 .../utils/critical/CriticalComponent.java       |  2 +-
 .../artemis/utils/critical/CriticalMeasure.java | 30 ++++++++++++------
 .../utils/critical/EmptyCriticalAnalyzer.java   | 14 +++++----
 .../utils/critical/CriticalAnalyzerTest.java    | 29 +++++++++++++++---
 .../core/server/impl/ActiveMQServerImpl.java    | 24 +++++++--------
 7 files changed, 89 insertions(+), 52 deletions(-)
----------------------------------------------------------------------



[2/2] activemq-artemis git commit: ARTEMIS-1393 CriticalAnalyzer timeout uses System::currentTimeMillis

Posted by cl...@apache.org.
ARTEMIS-1393 CriticalAnalyzer timeout uses System::currentTimeMillis

The timeout logic is changed to use System::nanoTime, less sensible to OS clock changes.
The volatile set on CriticalMeasure are changed with cheaper lazySet.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9cf222be
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9cf222be
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9cf222be

Branch: refs/heads/master
Commit: 9cf222be116192a8f87eb9e55fc5d0246dc5cf3f
Parents: 99b2e4c
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Sep 6 18:08:42 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 7 10:11:35 2017 -0400

----------------------------------------------------------------------
 .../utils/critical/CriticalAnalyzer.java        | 10 +++---
 .../utils/critical/CriticalAnalyzerImpl.java    | 32 ++++++++++----------
 .../utils/critical/CriticalComponent.java       |  2 +-
 .../artemis/utils/critical/CriticalMeasure.java | 30 ++++++++++++------
 .../utils/critical/EmptyCriticalAnalyzer.java   | 14 +++++----
 .../utils/critical/CriticalAnalyzerTest.java    | 29 +++++++++++++++---
 .../core/server/impl/ActiveMQServerImpl.java    | 24 +++++++--------
 7 files changed, 89 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
index 6b5a436..844f9f0 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.utils.critical;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 
 public interface CriticalAnalyzer extends ActiveMQComponent {
@@ -29,13 +31,13 @@ public interface CriticalAnalyzer extends ActiveMQComponent {
 
    void remove(CriticalComponent component);
 
-   CriticalAnalyzer setCheckTime(long timeout);
+   CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit);
 
-   long getCheckTime();
+   long getCheckTimeNanoSeconds();
 
-   CriticalAnalyzer setTimeout(long timeout);
+   CriticalAnalyzer setTimeout(long timeout, TimeUnit unit);
 
-   long getTimeout();
+   long getTimeout(TimeUnit unit);
 
    CriticalAnalyzer addAction(CriticalAction action);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
index c583f2a..ef31fe8 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
@@ -29,9 +29,9 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
 
    private final Logger logger = Logger.getLogger(CriticalAnalyzer.class);
 
-   private volatile long timeout;
+   private volatile long timeoutNanoSeconds;
 
-   private volatile long checkTime;
+   private volatile long checkTimeNanoSeconds;
 
    @Override
    public void clear() {
@@ -63,31 +63,31 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
    }
 
    @Override
-   public CriticalAnalyzer setCheckTime(long timeout) {
-      this.checkTime = timeout;
+   public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) {
+      this.checkTimeNanoSeconds = timeout;
       return this;
    }
 
    @Override
-   public long getCheckTime() {
-      if (checkTime == 0) {
-         checkTime = getTimeout() / 2;
+   public long getCheckTimeNanoSeconds() {
+      if (checkTimeNanoSeconds == 0) {
+         checkTimeNanoSeconds = getTimeout(TimeUnit.NANOSECONDS) / 2;
       }
-      return checkTime;
+      return checkTimeNanoSeconds;
    }
 
    @Override
-   public CriticalAnalyzer setTimeout(long timeout) {
-      this.timeout = timeout;
+   public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) {
+      this.timeoutNanoSeconds = unit.toNanos(timeout);
       return this;
    }
 
    @Override
-   public long getTimeout() {
-      if (timeout == 0) {
-         timeout = TimeUnit.MINUTES.toMillis(2);
+   public long getTimeout(TimeUnit unit) {
+      if (timeoutNanoSeconds == 0) {
+         timeoutNanoSeconds = TimeUnit.MINUTES.toNanos(2);
       }
-      return timeout;
+      return unit.convert(timeoutNanoSeconds, TimeUnit.NANOSECONDS);
    }
 
    @Override
@@ -103,7 +103,7 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
          try {
             for (CriticalComponent component : components) {
 
-               if (component.isExpired(timeout)) {
+               if (component.isExpired(timeoutNanoSeconds)) {
                   fireAction(component);
                   // no need to keep running if there's already a component failed
                   return;
@@ -142,7 +142,7 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
          public void run() {
             try {
                while (true) {
-                  if (running.tryAcquire(getCheckTime(), TimeUnit.MILLISECONDS)) {
+                  if (running.tryAcquire(getCheckTimeNanoSeconds(), TimeUnit.NANOSECONDS)) {
                      running.release();
                      // this means that the server has been stopped as we could acquire the semaphore... returning now
                      break;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java
index a2459dd..367e9c5 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java
@@ -20,7 +20,7 @@ package org.apache.activemq.artemis.utils.critical;
 /**
  * A Critical component enters and leaves a critical state.
  * You update a long every time you enter a critical path
- * you update a different long with a System.currentMillis every time you leave that path.
+ * you update a different long with a System.nanoTime every time you leave that path.
  *
  * If the enterCritical > leaveCritical at any point, then you need to measure the timeout.
  * if the system stops responding, then you have something irresponsive at the system.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java
index b853dc5..5b78a4a 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java
@@ -17,36 +17,48 @@
 
 package org.apache.activemq.artemis.utils.critical;
 
-import org.jboss.logging.Logger;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 public class CriticalMeasure {
 
-   private static final Logger logger = Logger.getLogger(CriticalMeasure.class);
+   //uses updaters to avoid creates many AtomicLong instances
+   private static final AtomicLongFieldUpdater<CriticalMeasure> TIME_ENTER_UPDATER = AtomicLongFieldUpdater.newUpdater(CriticalMeasure.class, "timeEnter");
+   private static final AtomicLongFieldUpdater<CriticalMeasure> TIME_LEFT_UPDATER = AtomicLongFieldUpdater.newUpdater(CriticalMeasure.class, "timeLeft");
 
    private volatile long timeEnter;
    private volatile long timeLeft;
 
+   public CriticalMeasure() {
+      //prefer this approach instead of using some fixed value because System::nanoTime could change sign
+      //with long running processes
+      enterCritical();
+      leaveCritical();
+   }
+
    public void enterCritical() {
-      timeEnter = System.currentTimeMillis();
+      //prefer lazySet in order to avoid heavy-weight full barriers on x86
+      TIME_ENTER_UPDATER.lazySet(this, System.nanoTime());
    }
 
    public void leaveCritical() {
-      timeLeft = System.currentTimeMillis();
+      TIME_LEFT_UPDATER.lazySet(this, System.nanoTime());
    }
 
    public boolean isExpired(long timeout) {
-      if (timeEnter > timeLeft) {
-         return System.currentTimeMillis() - timeEnter > timeout;
+      final long timeLeft = TIME_LEFT_UPDATER.get(this);
+      final long timeEnter = TIME_ENTER_UPDATER.get(this);
+      //due to how System::nanoTime works is better to use differences to prevent numerical overflow while comparing
+      if (timeLeft - timeEnter < 0) {
+         return System.nanoTime() - timeEnter > timeout;
       }
-
       return false;
    }
 
    public long enterTime() {
-      return timeEnter;
+      return TIME_ENTER_UPDATER.get(this);
    }
 
    public long leaveTime() {
-      return timeLeft;
+      return TIME_LEFT_UPDATER.get(this);
    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java
index 4cf23a9..a5064ce 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java
@@ -17,6 +17,8 @@
 
 package org.apache.activemq.artemis.utils.critical;
 
+import java.util.concurrent.TimeUnit;
+
 public class EmptyCriticalAnalyzer implements CriticalAnalyzer {
 
    private static final EmptyCriticalAnalyzer instance = new EmptyCriticalAnalyzer();
@@ -59,22 +61,22 @@ public class EmptyCriticalAnalyzer implements CriticalAnalyzer {
    }
 
    @Override
-   public CriticalAnalyzer setCheckTime(long timeout) {
-      return this;
+   public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) {
+      return null;
    }
 
    @Override
-   public long getCheckTime() {
+   public long getCheckTimeNanoSeconds() {
       return 0;
    }
 
    @Override
-   public CriticalAnalyzer setTimeout(long timeout) {
-      return this;
+   public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) {
+      return null;
    }
 
    @Override
-   public long getTimeout() {
+   public long getTimeout(TimeUnit unit) {
       return 0;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java
index 638eb61..d8ebaf3 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java
@@ -42,7 +42,7 @@ public class CriticalAnalyzerTest {
 
    @Test
    public void testAction() throws Exception {
-      analyzer = new CriticalAnalyzerImpl().setTimeout(100).setCheckTime(50);
+      analyzer = new CriticalAnalyzerImpl().setTimeout(100, TimeUnit.MILLISECONDS).setCheckTime(50, TimeUnit.MILLISECONDS);
       analyzer.add(new CriticalComponent() {
          @Override
          public boolean isExpired(long timeout) {
@@ -66,7 +66,7 @@ public class CriticalAnalyzerTest {
 
    @Test
    public void testActionOnImpl() throws Exception {
-      analyzer = new CriticalAnalyzerImpl().setTimeout(10).setCheckTime(5);
+      analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
       CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
       analyzer.add(component);
 
@@ -89,8 +89,29 @@ public class CriticalAnalyzerTest {
    }
 
    @Test
+   public void testEnterNoLeaveNoExpire() throws Exception {
+      analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
+      CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
+      component.enterCritical(0);
+      Assert.assertFalse(component.isExpired(TimeUnit.MINUTES.toNanos(1)));
+      analyzer.stop();
+
+   }
+
+   @Test
+   public void testEnterNoLeaveExpire() throws Exception {
+      analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
+      CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
+      component.enterCritical(0);
+      Thread.sleep(50);
+      Assert.assertTrue(component.isExpired(0));
+      analyzer.stop();
+
+   }
+
+   @Test
    public void testNegative() throws Exception {
-      analyzer = new CriticalAnalyzerImpl().setTimeout(10).setCheckTime(5);
+      analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
       CriticalComponent component = new CriticalComponentImpl(analyzer, 1);
       analyzer.add(component);
 
@@ -111,4 +132,4 @@ public class CriticalAnalyzerTest {
       analyzer.stop();
    }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 15aca09..68e5559 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -506,7 +506,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       /** Calling this for cases where the server was stopped and now is being restarted... failback, etc...*/
       this.analyzer.clear();
 
-      this.getCriticalAnalyzer().setCheckTime(configuration.getCriticalAnalyzerCheckPeriod()).setTimeout(configuration.getCriticalAnalyzerTimeout());
+      this.getCriticalAnalyzer().setCheckTime(configuration.getCriticalAnalyzerCheckPeriod(), TimeUnit.MILLISECONDS).setTimeout(configuration.getCriticalAnalyzerTimeout(), TimeUnit.MILLISECONDS);
 
       if (configuration.isCriticalAnalyzer()) {
          this.getCriticalAnalyzer().start();
@@ -1437,7 +1437,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       checkSessionLimit(validatedUser);
 
       callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection,
-            autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null);
+                                                                                  autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null);
 
       final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes);
 
@@ -1838,7 +1838,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       }
 
       callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount,
-            removeConsumers, autoDeleteAddress) : null);
+                                                                                 removeConsumers, autoDeleteAddress) : null);
 
       addressSettingsRepository.clearCache();
 
@@ -1882,7 +1882,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       callPostQueueDeletionCallbacks(address, queueName);
 
       callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount,
-            removeConsumers, autoDeleteAddress) : null);
+                                                                                removeConsumers, autoDeleteAddress) : null);
    }
 
    @Override
@@ -2456,13 +2456,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private void undeployAddressesAndQueueNotInConfiguration(Configuration configuration) throws Exception {
       Set<String> addressesInConfig = configuration.getAddressConfigurations().stream()
-                                                   .map(CoreAddressConfiguration::getName)
-                                                   .collect(Collectors.toSet());
+         .map(CoreAddressConfiguration::getName)
+         .collect(Collectors.toSet());
 
       Set<String> queuesInConfig = configuration.getAddressConfigurations().stream()
-                                                .map(CoreAddressConfiguration::getQueueConfigurations)
-                                                .flatMap(List::stream).map(CoreQueueConfiguration::getName)
-                                                .collect(Collectors.toSet());
+         .map(CoreAddressConfiguration::getQueueConfigurations)
+         .flatMap(List::stream).map(CoreQueueConfiguration::getName)
+         .collect(Collectors.toSet());
 
       for (SimpleString addressName : listAddressNames()) {
          AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());
@@ -2521,8 +2521,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       Queue queue = updateQueue(config.getName(), config.getRoutingType(), config.getMaxConsumers(), config.getPurgeOnNoConsumers());
       if (queue == null) {
          queue = createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(),
-            queueName, SimpleString.toSimpleString(config.getFilterString()), null,
-            config.isDurable(), false, true, false, false, config.getMaxConsumers(), config.getPurgeOnNoConsumers(), true);
+                             queueName, SimpleString.toSimpleString(config.getFilterString()), null,
+                             config.isDurable(), false, true, false, false, config.getMaxConsumers(), config.getPurgeOnNoConsumers(), true);
       }
       return queue;
    }
@@ -2990,4 +2990,4 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       }
    }
 
-}
+}
\ No newline at end of file