You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/04/07 21:38:47 UTC
[sling-org-apache-sling-distribution-journal] 02/07: SLING-9340 - stop processQueue using the running flag
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit a8c8aeff22cd9af1d5ba4e46109484f63a366f8b
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 22:36:51 2020 +0200
SLING-9340 - stop processQueue using the running flag
---
.../impl/subscriber/DistributionSubscriber.java | 42 +++++++++++++---------
1 file changed, 26 insertions(+), 16 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 8228424..fb4fbcd 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -20,6 +20,7 @@ package org.apache.sling.distribution.journal.impl.subscriber;
import static java.lang.String.format;
import static java.util.Arrays.asList;
+import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;
import static org.apache.sling.distribution.journal.HandlerAdapter.create;
@@ -109,6 +110,8 @@ public class DistributionSubscriber implements DistributionAgent {
private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = Collections.emptySet();
+ private static final DistributionQueueItem STOPPED_ITEM = new DistributionQueueItem("stop-item", emptyMap());
+
@Reference(name = "packageBuilder")
private DistributionPackageBuilder packageBuilder;
@@ -353,21 +356,23 @@ public class DistributionSubscriber implements DistributionAgent {
private void processQueue() {
LOG.info("Started Queue processor");
- while (!Thread.interrupted()) {
- try {
- fetchAndProcessQueueItem();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ while (running) {
+ fetchAndProcessQueueItem();
}
LOG.info("Stopped Queue processor");
}
- private void fetchAndProcessQueueItem() throws InterruptedException {
+ private void fetchAndProcessQueueItem() {
try {
- boolean sent = blockingSendStoredStatus();
+ if (! blockingSendStoredStatus()) {
+ return;
+ }
+
DistributionQueueItem item = blockingPeekQueueItem();
+ if (STOPPED_ITEM == item) {
+ return;
+ }
try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
processQueueItem(item);
@@ -376,17 +381,13 @@ public class DistributionSubscriber implements DistributionAgent {
}
} catch (TimeoutException e) {
- /**
- * Precondition timed out. We only log this on info level as it is no error
- */
+ // Precondition timed out. We only log this on info level as it is no error
LOG.info(e.getMessage());
- Thread.sleep(RETRY_DELAY);
- } catch (InterruptedException e) {
- throw e;
+ delay(RETRY_DELAY);
} catch (Exception e) {
// Catch all to prevent processing from stopping
LOG.error("Error processing queue item", e);
- Thread.sleep(RETRY_DELAY);
+ delay(RETRY_DELAY);
}
}
@@ -406,7 +407,7 @@ public class DistributionSubscriber implements DistributionAgent {
}
private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
- while (true) {
+ while (running) {
DistributionQueueItem queueItem = queueItemsBuffer.peek();
if (queueItem != null) {
return queueItem;
@@ -414,6 +415,7 @@ public class DistributionSubscriber implements DistributionAgent {
Thread.sleep(QUEUE_FETCH_DELAY);
}
}
+ return STOPPED_ITEM;
}
private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, InterruptedException, TimeoutException {
@@ -435,4 +437,12 @@ public class DistributionSubscriber implements DistributionAgent {
return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
}
+ private static void delay(long delayInMs) {
+ try {
+ Thread.sleep(delayInMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
}