You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/04/07 21:38:47 UTC

[sling-org-apache-sling-distribution-journal] 02/07: SLING-9340 - stop processQueue using the running flag

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit a8c8aeff22cd9af1d5ba4e46109484f63a366f8b
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 22:36:51 2020 +0200

    SLING-9340 - stop processQueue using the running flag
---
 .../impl/subscriber/DistributionSubscriber.java    | 42 +++++++++++++---------
 1 file changed, 26 insertions(+), 16 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 8228424..fb4fbcd 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -20,6 +20,7 @@ package org.apache.sling.distribution.journal.impl.subscriber;
 
 import static java.lang.String.format;
 import static java.util.Arrays.asList;
+import static java.util.Collections.emptyMap;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.sling.distribution.journal.HandlerAdapter.create;
@@ -109,6 +110,8 @@ public class DistributionSubscriber implements DistributionAgent {
 
     private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = Collections.emptySet();
 
+    private static final DistributionQueueItem STOPPED_ITEM = new DistributionQueueItem("stop-item", emptyMap());
+
     @Reference(name = "packageBuilder")
     private DistributionPackageBuilder packageBuilder;
 
@@ -353,21 +356,23 @@ public class DistributionSubscriber implements DistributionAgent {
 
     private void processQueue() {
         LOG.info("Started Queue processor");
-        while (!Thread.interrupted()) {
-            try {
-                fetchAndProcessQueueItem();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            }
+        while (running) {
+            fetchAndProcessQueueItem();
         }
         LOG.info("Stopped Queue processor");
     }
 
-    private void fetchAndProcessQueueItem() throws InterruptedException {
+    private void fetchAndProcessQueueItem() {
         try {
             
-            boolean sent = blockingSendStoredStatus();
+            if (! blockingSendStoredStatus()) {
+                return;
+            }
+
             DistributionQueueItem item = blockingPeekQueueItem();
+            if (STOPPED_ITEM == item) {
+                return;
+            }
 
             try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
                 processQueueItem(item);
@@ -376,17 +381,13 @@ public class DistributionSubscriber implements DistributionAgent {
             }
 
         } catch (TimeoutException e) {
-            /**
-             * Precondition timed out. We only log this on info level as it is no error
-             */
+            // Precondition timed out. We only log this on info level as it is no error
             LOG.info(e.getMessage());
-            Thread.sleep(RETRY_DELAY);
-        } catch (InterruptedException e) {
-            throw e;
+            delay(RETRY_DELAY);
         } catch (Exception e) {
             // Catch all to prevent processing from stopping
             LOG.error("Error processing queue item", e);
-            Thread.sleep(RETRY_DELAY);
+            delay(RETRY_DELAY);
         }
     }
 
@@ -406,7 +407,7 @@ public class DistributionSubscriber implements DistributionAgent {
     }
 
     private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
-        while (true) {
+        while (running) {
             DistributionQueueItem queueItem = queueItemsBuffer.peek();
             if (queueItem != null) {
                 return queueItem;
@@ -414,6 +415,7 @@ public class DistributionSubscriber implements DistributionAgent {
                 Thread.sleep(QUEUE_FETCH_DELAY);
             }
         }
+        return STOPPED_ITEM;
     }
 
     private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, InterruptedException, TimeoutException {
@@ -435,4 +437,12 @@ public class DistributionSubscriber implements DistributionAgent {
         return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
     }
 
+    private static void delay(long delayInMs) {
+        try {
+            Thread.sleep(delayInMs);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
 }