You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/04/07 21:38:45 UTC

[sling-org-apache-sling-distribution-journal] branch master updated (03d8900 -> fe471d6)

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git.


    from 03d8900  Revert "SLING-9259 - Extract service subscriber code into separate package (#25)"
     new d53f87e  SLING-9340 - keep blocking code in the DistributionSubscriber class
     new a8c8aef  SLING-9340 - stop processQueue using the running flag
     new b92195d  SLING-9340 - Precondition to raise ISE instead of IE
     new 3078830  SLING-9340 - Don't interrupt the queue processor thread
     new c1d3925  SLING-9340 - Don't raise IE upon enqueue
     new 33d609d  SLING-9340 - Don't interrupt seeder thread
     new fe471d6  SLING-9340 - Add a comments highlighting why we don't use interrupt

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../journal/impl/precondition/Precondition.java    |  4 +-
 .../impl/precondition/StagingPrecondition.java     | 34 ++++++---
 .../journal/impl/queue/impl/PubQueueCache.java     | 41 ++++++-----
 .../journal/impl/subscriber/BookKeeper.java        | 30 ++++----
 .../impl/subscriber/DistributionSubscriber.java    | 81 +++++++++++++++-------
 .../impl/precondition/StagingPreconditionTest.java |  3 +-
 6 files changed, 122 insertions(+), 71 deletions(-)


[sling-org-apache-sling-distribution-journal] 05/07: SLING-9340 - Don't raise IE upon enqueue

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit c1d39257cf01fa96e2d1e9e66011472c23326fdc
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 23:07:38 2020 +0200

    SLING-9340 - Don't raise IE upon enqueue
---
 .../distribution/journal/impl/subscriber/DistributionSubscriber.java   | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 3e601ac..8ca48ff 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -341,11 +341,10 @@ public class DistributionSubscriber implements DistributionAgent {
                     return;
                 }
             }
-            throw new InterruptedException();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            throw new RuntimeException();
         }
+        throw new RuntimeException();
     }
 
     private void processQueue() {


[sling-org-apache-sling-distribution-journal] 01/07: SLING-9340 - keep blocking code in the DistributionSubscriber class

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit d53f87e92859ea7ee8ac7787a886ea46ab20524e
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 22:19:59 2020 +0200

    SLING-9340 - keep blocking code in the DistributionSubscriber class
---
 .../journal/impl/subscriber/BookKeeper.java        | 30 +++++++++++-----------
 .../impl/subscriber/DistributionSubscriber.java    | 18 ++++++++++++-
 2 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 53a7245..1c12690 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -246,30 +246,22 @@ public class BookKeeper implements Closeable {
     }
 
     /**
-     * Send status stored in a previous run if exists
-     * @throws InterruptedException
+     * @return {@code true} if the status has been sent ;
+     *         {@code false} otherwise.
      */
-    public void sendStoredStatus() throws InterruptedException {
-        try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
-            PackageStatus status = new PackageStatus(statusStore.load());
-            boolean sent = status.sent;
-            int retry = 0;
-            while (!sent) {
-                sent = sendStoredStatusOnce(status, retry++);
-            }
-        } catch (IOException e) {
-            log.warn("Error in timer close", e);
-        }
+    public boolean sendStoredStatus(int retry) {
+        PackageStatus status = new PackageStatus(statusStore.load());
+        return status.sent || sendStoredStatus(status, retry);
     }
 
-    private boolean sendStoredStatusOnce(PackageStatus status, int retry) throws InterruptedException {
+    private boolean sendStoredStatus(PackageStatus status, int retry) {
         try {
             sendStatusMessage(status);
             markStatusSent();
             return true;
         } catch (Exception e) {
             log.warn("Cannot send status (retry {})", retry, e);
-            Thread.sleep(RETRY_SEND_DELAY);
+            retryDelay();
             return false;
         }
     }
@@ -340,6 +332,14 @@ public class BookKeeper implements Closeable {
         return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, subService));
     }
 
+    static void retryDelay() {
+        try {
+            Thread.sleep(RETRY_SEND_DELAY);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
     class PackageStatus {
         final Status status;
         final Long offset;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 74fc5e2..8228424 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -29,6 +29,7 @@ import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.
 import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.Hashtable;
@@ -365,7 +366,7 @@ public class DistributionSubscriber implements DistributionAgent {
     private void fetchAndProcessQueueItem() throws InterruptedException {
         try {
             
-            bookKeeper.sendStoredStatus();
+            boolean sent = blockingSendStoredStatus();
             DistributionQueueItem item = blockingPeekQueueItem();
 
             try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
@@ -389,6 +390,21 @@ public class DistributionSubscriber implements DistributionAgent {
         }
     }
 
+    /**
+     * Send status stored in a previous run if exists
+     *
+     * @return {@code true} if the status has been sent ;
+     *         {@code false} otherwise.
+     */
+    private boolean blockingSendStoredStatus() {
+        try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
+            for (int retry = 0 ; running && ! bookKeeper.sendStoredStatus(retry) ; retry++);
+        } catch (IOException e) {
+            LOG.warn("Error in timer close", e);
+        }
+        return running;
+    }
+
     private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
         while (true) {
             DistributionQueueItem queueItem = queueItemsBuffer.peek();


[sling-org-apache-sling-distribution-journal] 07/07: SLING-9340 - Add a comments highlighting why we don't use interrupt

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit fe471d6d2158d70b74197499cf6a66132f3ab5e7
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 23:34:39 2020 +0200

    SLING-9340 - Add a comments highlighting why we don't use interrupt
---
 .../journal/impl/precondition/StagingPrecondition.java            | 8 ++++++++
 .../sling/distribution/journal/impl/queue/impl/PubQueueCache.java | 8 ++++++++
 .../journal/impl/subscriber/DistributionSubscriber.java           | 8 ++++++++
 3 files changed, 24 insertions(+)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index c0e3d48..d523cdf 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -68,6 +68,14 @@ public class StagingPrecondition implements Precondition, Runnable {
 
     @Deactivate
     public synchronized void deactivate() {
+
+        /*
+         * Note that we don't interrupt blocking calls using Thread.interrupt()
+         * because interrupts can stop the Apache Oak repository.
+         *
+         * See SLING-9340, OAK-2609 and https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
+         */
+
         IOUtils.closeQuietly(watcher);
         running = false;
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 84e28c3..1b9e14d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -150,6 +150,14 @@ public class PubQueueCache {
     }
 
     public void close() {
+
+        /*
+         * Note that we don't close resources using Thread.interrupt()
+         * because interrupts can stop the Apache Oak repository.
+         *
+         * See SLING-9340, OAK-2609 and https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
+         */
+
         closed = true;
         IOUtils.closeQuietly(tailPoller);
         jmxRegs.stream().forEach(IOUtils::closeQuietly);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 8ca48ff..663dce7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -243,6 +243,14 @@ public class DistributionSubscriber implements DistributionAgent {
 
     @Deactivate
     public void deactivate() {
+
+        /*
+         * Note that we don't interrupt blocking calls using Thread.interrupt()
+         * because interrupts can stop the Apache Oak repository.
+         *
+         * See SLING-9340, OAK-2609 and https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
+         */
+
         componentReg.unregister();
         IOUtils.closeQuietly(subscriberIdle, announcer, bookKeeper, 
                 packagePoller, commandPoller);


[sling-org-apache-sling-distribution-journal] 02/07: SLING-9340 - stop processQueue using the running flag

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit a8c8aeff22cd9af1d5ba4e46109484f63a366f8b
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 22:36:51 2020 +0200

    SLING-9340 - stop processQueue using the running flag
---
 .../impl/subscriber/DistributionSubscriber.java    | 42 +++++++++++++---------
 1 file changed, 26 insertions(+), 16 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 8228424..fb4fbcd 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -20,6 +20,7 @@ package org.apache.sling.distribution.journal.impl.subscriber;
 
 import static java.lang.String.format;
 import static java.util.Arrays.asList;
+import static java.util.Collections.emptyMap;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.sling.distribution.journal.HandlerAdapter.create;
@@ -109,6 +110,8 @@ public class DistributionSubscriber implements DistributionAgent {
 
     private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = Collections.emptySet();
 
+    private static final DistributionQueueItem STOPPED_ITEM = new DistributionQueueItem("stop-item", emptyMap());
+
     @Reference(name = "packageBuilder")
     private DistributionPackageBuilder packageBuilder;
 
@@ -353,21 +356,23 @@ public class DistributionSubscriber implements DistributionAgent {
 
     private void processQueue() {
         LOG.info("Started Queue processor");
-        while (!Thread.interrupted()) {
-            try {
-                fetchAndProcessQueueItem();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            }
+        while (running) {
+            fetchAndProcessQueueItem();
         }
         LOG.info("Stopped Queue processor");
     }
 
-    private void fetchAndProcessQueueItem() throws InterruptedException {
+    private void fetchAndProcessQueueItem() {
         try {
             
-            boolean sent = blockingSendStoredStatus();
+            if (! blockingSendStoredStatus()) {
+                return;
+            }
+
             DistributionQueueItem item = blockingPeekQueueItem();
+            if (STOPPED_ITEM == item) {
+                return;
+            }
 
             try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
                 processQueueItem(item);
@@ -376,17 +381,13 @@ public class DistributionSubscriber implements DistributionAgent {
             }
 
         } catch (TimeoutException e) {
-            /**
-             * Precondition timed out. We only log this on info level as it is no error
-             */
+            // Precondition timed out. We only log this on info level as it is no error
             LOG.info(e.getMessage());
-            Thread.sleep(RETRY_DELAY);
-        } catch (InterruptedException e) {
-            throw e;
+            delay(RETRY_DELAY);
         } catch (Exception e) {
             // Catch all to prevent processing from stopping
             LOG.error("Error processing queue item", e);
-            Thread.sleep(RETRY_DELAY);
+            delay(RETRY_DELAY);
         }
     }
 
@@ -406,7 +407,7 @@ public class DistributionSubscriber implements DistributionAgent {
     }
 
     private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
-        while (true) {
+        while (running) {
             DistributionQueueItem queueItem = queueItemsBuffer.peek();
             if (queueItem != null) {
                 return queueItem;
@@ -414,6 +415,7 @@ public class DistributionSubscriber implements DistributionAgent {
                 Thread.sleep(QUEUE_FETCH_DELAY);
             }
         }
+        return STOPPED_ITEM;
     }
 
     private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, InterruptedException, TimeoutException {
@@ -435,4 +437,12 @@ public class DistributionSubscriber implements DistributionAgent {
         return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
     }
 
+    private static void delay(long delayInMs) {
+        try {
+            Thread.sleep(delayInMs);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
 }


[sling-org-apache-sling-distribution-journal] 06/07: SLING-9340 - Don't interrupt seeder thread

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 33d609d0a321e5e7e4f9a6758413437d4d4291ea
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 23:24:23 2020 +0200

    SLING-9340 - Don't interrupt seeder thread
---
 .../journal/impl/queue/impl/PubQueueCache.java     | 33 ++++++++++------------
 1 file changed, 15 insertions(+), 18 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 189513c..84e28c3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -124,8 +124,6 @@ public class PubQueueCache {
      */
     private volatile boolean closed;
 
-    private final Thread seeder;
-
     public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, long seedingDelayMs) {
         this.messagingProvider = messagingProvider;
         this.eventAdmin = eventAdmin;
@@ -138,7 +136,7 @@ public class PubQueueCache {
                 Reset.latest,
                 create(PackageMessage.class, this::handlePackage));
 
-        seeder = RunnableUtil.startBackgroundThread(this::seedCache, "queue seeding");
+        RunnableUtil.startBackgroundThread(this::seedCache, "queue seeding");
     }
 
     @Nonnull
@@ -153,7 +151,6 @@ public class PubQueueCache {
 
     public void close() {
         closed = true;
-        seeder.interrupt();
         IOUtils.closeQuietly(tailPoller);
         jmxRegs.stream().forEach(IOUtils::closeQuietly);
     }
@@ -162,27 +159,28 @@ public class PubQueueCache {
         LOG.info("Start message seeder");
         try {
             MessageSender<PackageMessage> sender = messagingProvider.createSender();
-            sendSeedingMessages(sender);
+            do {
+                sendSeedingMessage(sender);
+            } while (! closed && ! seeded.await(seedingDelayMs, MILLISECONDS));
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
         } finally {
             LOG.info("Stop message seeder");
         }
     }
 
-    private void sendSeedingMessages(MessageSender<PackageMessage> sender) {
-        while (! Thread.interrupted()) {
-            PackageMessage pkgMsg = createTestMessage();
-            LOG.info("Send seeding message");
-            try {
-                sender.send(topic, pkgMsg);
-                sleep(seedingDelayMs);
-            } catch (MessagingException e) {
-                LOG.warn(e.getMessage(), e);
-                sleep(seedingDelayMs * 10);
-            }
+    private void sendSeedingMessage(MessageSender<PackageMessage> sender) {
+        PackageMessage pkgMsg = createTestMessage();
+        LOG.info("Send seeding message");
+        try {
+            sender.send(topic, pkgMsg);
+        } catch (MessagingException e) {
+            LOG.warn(e.getMessage(), e);
+            delay(seedingDelayMs * 10);
         }
     }
 
-    private void sleep(long sleepMs) {
+    private static void delay(long sleepMs) {
         try {
             Thread.sleep(sleepMs);
         } catch (InterruptedException e) {
@@ -338,6 +336,5 @@ public class PubQueueCache {
             LOG.info("Cache has been seeded");
         }
         seeded.countDown();
-        seeder.interrupt();
     }
 }


[sling-org-apache-sling-distribution-journal] 04/07: SLING-9340 - Don't interrupt the queue processor thread

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 307883048dad3b6ba798917e622271b8b31e19f4
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 22:57:00 2020 +0200

    SLING-9340 - Don't interrupt the queue processor thread
---
 .../journal/impl/subscriber/DistributionSubscriber.java           | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 93f528b..3e601ac 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -166,8 +166,6 @@ public class DistributionSubscriber implements DistributionAgent {
 
     private volatile boolean running = true;
 
-    private volatile Thread queueProcessor;
-
     @Activate
     public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
         String subSlingId = requireNonNull(slingSettings.getSlingId());
@@ -203,7 +201,7 @@ public class DistributionSubscriber implements DistributionAgent {
 
         commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, editable);
 
-        queueProcessor = startBackgroundThread(this::processQueue,
+        startBackgroundThread(this::processQueue,
                 format("Queue Processor for Subscriber agent %s", subAgentName));
 
         int announceDelay = PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
@@ -249,10 +247,6 @@ public class DistributionSubscriber implements DistributionAgent {
         IOUtils.closeQuietly(subscriberIdle, announcer, bookKeeper, 
                 packagePoller, commandPoller);
         running = false;
-        Thread interrupter = this.queueProcessor;
-        if (interrupter != null) {
-            interrupter.interrupt();
-        }
         String msg = String.format(
                 "Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s",
                 subAgentName, queueNames, pkgType);


[sling-org-apache-sling-distribution-journal] 03/07: SLING-9340 - Precondition to raise ISE instead of IE

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit b92195d39a0019dfd65e7abbc542b518b90e871e
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 22:54:45 2020 +0200

    SLING-9340 - Precondition to raise ISE instead of IE
---
 .../journal/impl/precondition/Precondition.java    |  4 ++--
 .../impl/precondition/StagingPrecondition.java     | 26 +++++++++++++++-------
 .../impl/subscriber/DistributionSubscriber.java    |  6 +++--
 .../impl/precondition/StagingPreconditionTest.java |  3 +--
 4 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
index 3730475..69da7b4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
@@ -30,9 +30,9 @@ public interface Precondition {
      * @param pkgOffset the offset of the package
      * @param timeoutSeconds max seconds to wait until returning
      * @throws TimeoutException if the timeout expired without being able to determine status
-     * @throws InterruptedException if the thread was interrupted and should shut down
+     * @throws IllegalStateException if the precondition can't be evaluated
      * @return true if the package can be processed; otherwise it returns false.
      */
-    boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException;
+    boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws TimeoutException;
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index 2272888..c0e3d48 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -48,6 +48,8 @@ public class StagingPrecondition implements Precondition, Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(StagingPrecondition.class);
 
+    private static final long STATUS_CHECK_DELAY_MS = 100;
+
     @Reference
     private MessagingProvider messagingProvider;
 
@@ -71,27 +73,27 @@ public class StagingPrecondition implements Precondition, Runnable {
     }
 
     @Override
-    public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException {
+    public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws TimeoutException {
         if (timeoutSeconds < 1) {
             throw new IllegalArgumentException();
         }
 
         // try to get the status for timeoutSeconds and then throw
-        for(int i=0; i < timeoutSeconds * 10; i++) {
+        for(int i = 0; running && i < timeoutSeconds * 10 ; i++) {
             Status status = getStatus(subAgentName, pkgOffset);
-
             if (status != null) {
                 return status == Status.IMPORTED;
             } else {
-                Thread.sleep(100);
-            }
-            
-            if (!running) {
-                throw new InterruptedException("Staging precondition is shutting down");
+                delayStatusCheck();
             }
         }
 
+        if (!running) {
+            throw new IllegalStateException("Staging precondition is shutting down");
+        }
+
         throw new TimeoutException("Timeout waiting for package offset " + pkgOffset + " on status topic.");
+
     }
 
     private synchronized Status getStatus(String subAgentName, long pkgOffset) {
@@ -104,4 +106,12 @@ public class StagingPrecondition implements Precondition, Runnable {
         watcher = new PackageStatusWatcher(messagingProvider, topics);
     }
 
+    private static void delayStatusCheck() {
+        try {
+            Thread.sleep(STATUS_CHECK_DELAY_MS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index fb4fbcd..93f528b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -384,6 +384,8 @@ public class DistributionSubscriber implements DistributionAgent {
             // Precondition timed out. We only log this on info level as it is no error
             LOG.info(e.getMessage());
             delay(RETRY_DELAY);
+        } catch (IllegalStateException e) {
+            throw e;
         } catch (Exception e) {
             // Catch all to prevent processing from stopping
             LOG.error("Error processing queue item", e);
@@ -418,7 +420,7 @@ public class DistributionSubscriber implements DistributionAgent {
         return STOPPED_ITEM;
     }
 
-    private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, InterruptedException, TimeoutException {
+    private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, TimeoutException {
         long offset = queueItem.get(RECORD_OFFSET, Long.class);
         PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
         boolean skip = shouldSkip(offset);
@@ -433,7 +435,7 @@ public class DistributionSubscriber implements DistributionAgent {
         distributionMetricsService.getItemsBufferSize().decrement();
     }
 
-    private boolean shouldSkip(long offset) throws InterruptedException, TimeoutException {
+    private boolean shouldSkip(long offset) throws TimeoutException {
         return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
     }
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index eab0479..6a9f893 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -35,7 +35,6 @@ import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.precondition.StagingPrecondition;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
@@ -117,7 +116,7 @@ public class StagingPreconditionTest {
         th.start();
         precondition.deactivate();
         Throwable ex = Awaitility.await().until(() -> exHolder.get(), notNullValue());
-        assertThat(ex, instanceOf(InterruptedException.class));
+        assertThat(ex, instanceOf(IllegalStateException.class));
     }
     
     @Test(expected = TimeoutException.class)