You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/15 15:03:15 UTC

[2/5] activemq-artemis git commit: ARTEMIS-734 small improvement: use ActiveMQScheduledComponent

ARTEMIS-734 small improvement: use ActiveMQScheduledComponent


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6b5fff40
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6b5fff40
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6b5fff40

Branch: refs/heads/master
Commit: 6b5fff40cb105053db7de043b9f6927d29ecc7b7
Parents: 2509eeb
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Sep 14 18:51:13 2016 -0400
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Sep 15 16:02:49 2016 +0100

----------------------------------------------------------------------
 .../core/postoffice/impl/PostOfficeImpl.java    | 70 +++++++-------------
 1 file changed, 25 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6b5fff40/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 61b46a0..c8a6966 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -27,13 +27,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
 import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
-import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
@@ -57,6 +57,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.QueueInfo;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
@@ -112,8 +113,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
    private Reaper reaperRunnable;
 
-   private volatile Thread reaperThread;
-
    private final long reaperPeriod;
 
    private final int reaperPriority;
@@ -198,12 +197,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       if (reaperRunnable != null)
          reaperRunnable.stop();
 
-      if (reaperThread != null) {
-         reaperThread.join();
-
-         reaperThread = null;
-      }
-
       addressManager.clear();
 
       queueInfos.clear();
@@ -1244,12 +1237,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       if (reaperPeriod > 0) {
          if (reaperRunnable != null)
             reaperRunnable.stop();
-         reaperRunnable = new Reaper();
-         reaperThread = new Thread(reaperRunnable, "activemq-expiry-reaper-thread");
-
-         reaperThread.setPriority(reaperPriority);
+         reaperRunnable = new Reaper(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), reaperPeriod, TimeUnit.MILLISECONDS, false);
 
-         reaperThread.start();
+         reaperRunnable.start();
       }
    }
 
@@ -1268,48 +1258,38 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       return message;
    }
 
-   private final class Reaper implements Runnable {
+   private final class Reaper extends ActiveMQScheduledComponent {
 
-      private final CountDownLatch latch = new CountDownLatch(1);
-
-      public void stop() {
-         latch.countDown();
+      Reaper(ScheduledExecutorService scheduledExecutorService,
+                    Executor executor,
+                    long checkPeriod,
+                    TimeUnit timeUnit,
+                    boolean onDemand) {
+         super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand);
       }
 
       @Override
       public void run() {
          // The reaper thread should be finished case the PostOffice is gone
          // This is to avoid leaks on PostOffice between stops and starts
-         while (isStarted()) {
-            try {
-               if (latch.await(reaperPeriod, TimeUnit.MILLISECONDS))
-                  return;
-            }
-            catch (InterruptedException e1) {
-               throw new ActiveMQInterruptedException(e1);
-            }
-            if (!isStarted())
-               return;
-
-            Map<SimpleString, Binding> nameMap = addressManager.getBindings();
+         Map<SimpleString, Binding> nameMap = addressManager.getBindings();
 
-            List<Queue> queues = new ArrayList<>();
+         List<Queue> queues = new ArrayList<>();
 
-            for (Binding binding : nameMap.values()) {
-               if (binding.getType() == BindingType.LOCAL_QUEUE) {
-                  Queue queue = (Queue) binding.getBindable();
+         for (Binding binding : nameMap.values()) {
+            if (binding.getType() == BindingType.LOCAL_QUEUE) {
+               Queue queue = (Queue) binding.getBindable();
 
-                  queues.add(queue);
-               }
+               queues.add(queue);
             }
+         }
 
-            for (Queue queue : queues) {
-               try {
-                  queue.expireReferences();
-               }
-               catch (Exception e) {
-                  ActiveMQServerLogger.LOGGER.errorExpiringMessages(e);
-               }
+         for (Queue queue : queues) {
+            try {
+               queue.expireReferences();
+            }
+            catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.errorExpiringMessages(e);
             }
          }
       }