You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/04/07 21:38:48 UTC

[sling-org-apache-sling-distribution-journal] 03/07: SLING-9340 - Precondition to raise ISE instead of IE

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit b92195d39a0019dfd65e7abbc542b518b90e871e
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 22:54:45 2020 +0200

    SLING-9340 - Precondition to raise ISE instead of IE
---
 .../journal/impl/precondition/Precondition.java    |  4 ++--
 .../impl/precondition/StagingPrecondition.java     | 26 +++++++++++++++-------
 .../impl/subscriber/DistributionSubscriber.java    |  6 +++--
 .../impl/precondition/StagingPreconditionTest.java |  3 +--
 4 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
index 3730475..69da7b4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
@@ -30,9 +30,9 @@ public interface Precondition {
      * @param pkgOffset the offset of the package
      * @param timeoutSeconds max seconds to wait until returning
      * @throws TimeoutException if the timeout expired without being able to determine status
-     * @throws InterruptedException if the thread was interrupted and should shut down
+     * @throws IllegalStateException if the precondition can't be evaluated
      * @return true if the package can be processed; otherwise it returns false.
      */
-    boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException;
+    boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws TimeoutException;
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index 2272888..c0e3d48 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -48,6 +48,8 @@ public class StagingPrecondition implements Precondition, Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(StagingPrecondition.class);
 
+    private static final long STATUS_CHECK_DELAY_MS = 100;
+
     @Reference
     private MessagingProvider messagingProvider;
 
@@ -71,27 +73,27 @@ public class StagingPrecondition implements Precondition, Runnable {
     }
 
     @Override
-    public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException {
+    public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws TimeoutException {
         if (timeoutSeconds < 1) {
             throw new IllegalArgumentException();
         }
 
         // try to get the status for timeoutSeconds and then throw
-        for(int i=0; i < timeoutSeconds * 10; i++) {
+        for(int i = 0; running && i < timeoutSeconds * 10 ; i++) {
             Status status = getStatus(subAgentName, pkgOffset);
-
             if (status != null) {
                 return status == Status.IMPORTED;
             } else {
-                Thread.sleep(100);
-            }
-            
-            if (!running) {
-                throw new InterruptedException("Staging precondition is shutting down");
+                delayStatusCheck();
             }
         }
 
+        if (!running) {
+            throw new IllegalStateException("Staging precondition is shutting down");
+        }
+
         throw new TimeoutException("Timeout waiting for package offset " + pkgOffset + " on status topic.");
+
     }
 
     private synchronized Status getStatus(String subAgentName, long pkgOffset) {
@@ -104,4 +106,12 @@ public class StagingPrecondition implements Precondition, Runnable {
         watcher = new PackageStatusWatcher(messagingProvider, topics);
     }
 
+    private static void delayStatusCheck() {
+        try {
+            Thread.sleep(STATUS_CHECK_DELAY_MS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index fb4fbcd..93f528b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -384,6 +384,8 @@ public class DistributionSubscriber implements DistributionAgent {
             // Precondition timed out. We only log this on info level as it is no error
             LOG.info(e.getMessage());
             delay(RETRY_DELAY);
+        } catch (IllegalStateException e) {
+            throw e;
         } catch (Exception e) {
             // Catch all to prevent processing from stopping
             LOG.error("Error processing queue item", e);
@@ -418,7 +420,7 @@ public class DistributionSubscriber implements DistributionAgent {
         return STOPPED_ITEM;
     }
 
-    private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, InterruptedException, TimeoutException {
+    private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, TimeoutException {
         long offset = queueItem.get(RECORD_OFFSET, Long.class);
         PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
         boolean skip = shouldSkip(offset);
@@ -433,7 +435,7 @@ public class DistributionSubscriber implements DistributionAgent {
         distributionMetricsService.getItemsBufferSize().decrement();
     }
 
-    private boolean shouldSkip(long offset) throws InterruptedException, TimeoutException {
+    private boolean shouldSkip(long offset) throws TimeoutException {
         return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
     }
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index eab0479..6a9f893 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -35,7 +35,6 @@ import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.precondition.StagingPrecondition;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
@@ -117,7 +116,7 @@ public class StagingPreconditionTest {
         th.start();
         precondition.deactivate();
         Throwable ex = Awaitility.await().until(() -> exHolder.get(), notNullValue());
-        assertThat(ex, instanceOf(InterruptedException.class));
+        assertThat(ex, instanceOf(IllegalStateException.class));
     }
     
     @Test(expected = TimeoutException.class)