You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/04/07 21:38:48 UTC
[sling-org-apache-sling-distribution-journal] 03/07: SLING-9340 - Precondition to raise ISE instead of IE
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit b92195d39a0019dfd65e7abbc542b518b90e871e
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 22:54:45 2020 +0200
SLING-9340 - Precondition to raise ISE instead of IE
---
.../journal/impl/precondition/Precondition.java | 4 ++--
.../impl/precondition/StagingPrecondition.java | 26 +++++++++++++++-------
.../impl/subscriber/DistributionSubscriber.java | 6 +++--
.../impl/precondition/StagingPreconditionTest.java | 3 +--
4 files changed, 25 insertions(+), 14 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
index 3730475..69da7b4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
@@ -30,9 +30,9 @@ public interface Precondition {
* @param pkgOffset the offset of the package
* @param timeoutSeconds max seconds to wait until returning
* @throws TimeoutException if the timeout expired without being able to determine status
- * @throws InterruptedException if the thread was interrupted and should shut down
+ * @throws IllegalStateException if the precondition can't be evaluated
* @return true if the package can be processed; otherwise it returns false.
*/
- boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException;
+ boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws TimeoutException;
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index 2272888..c0e3d48 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -48,6 +48,8 @@ public class StagingPrecondition implements Precondition, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(StagingPrecondition.class);
+ private static final long STATUS_CHECK_DELAY_MS = 100;
+
@Reference
private MessagingProvider messagingProvider;
@@ -71,27 +73,27 @@ public class StagingPrecondition implements Precondition, Runnable {
}
@Override
- public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException {
+ public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws TimeoutException {
if (timeoutSeconds < 1) {
throw new IllegalArgumentException();
}
// try to get the status for timeoutSeconds and then throw
- for(int i=0; i < timeoutSeconds * 10; i++) {
+ for(int i = 0; running && i < timeoutSeconds * 10 ; i++) {
Status status = getStatus(subAgentName, pkgOffset);
-
if (status != null) {
return status == Status.IMPORTED;
} else {
- Thread.sleep(100);
- }
-
- if (!running) {
- throw new InterruptedException("Staging precondition is shutting down");
+ delayStatusCheck();
}
}
+ if (!running) {
+ throw new IllegalStateException("Staging precondition is shutting down");
+ }
+
throw new TimeoutException("Timeout waiting for package offset " + pkgOffset + " on status topic.");
+
}
private synchronized Status getStatus(String subAgentName, long pkgOffset) {
@@ -104,4 +106,12 @@ public class StagingPrecondition implements Precondition, Runnable {
watcher = new PackageStatusWatcher(messagingProvider, topics);
}
+ private static void delayStatusCheck() {
+ try {
+ Thread.sleep(STATUS_CHECK_DELAY_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index fb4fbcd..93f528b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -384,6 +384,8 @@ public class DistributionSubscriber implements DistributionAgent {
// Precondition timed out. We only log this on info level as it is no error
LOG.info(e.getMessage());
delay(RETRY_DELAY);
+ } catch (IllegalStateException e) {
+ throw e;
} catch (Exception e) {
// Catch all to prevent processing from stopping
LOG.error("Error processing queue item", e);
@@ -418,7 +420,7 @@ public class DistributionSubscriber implements DistributionAgent {
return STOPPED_ITEM;
}
- private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, InterruptedException, TimeoutException {
+ private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, TimeoutException {
long offset = queueItem.get(RECORD_OFFSET, Long.class);
PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
boolean skip = shouldSkip(offset);
@@ -433,7 +435,7 @@ public class DistributionSubscriber implements DistributionAgent {
distributionMetricsService.getItemsBufferSize().decrement();
}
- private boolean shouldSkip(long offset) throws InterruptedException, TimeoutException {
+ private boolean shouldSkip(long offset) throws TimeoutException {
return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index eab0479..6a9f893 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -35,7 +35,6 @@ import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.precondition.StagingPrecondition;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
@@ -117,7 +116,7 @@ public class StagingPreconditionTest {
th.start();
precondition.deactivate();
Throwable ex = Awaitility.await().until(() -> exHolder.get(), notNullValue());
- assertThat(ex, instanceOf(InterruptedException.class));
+ assertThat(ex, instanceOf(IllegalStateException.class));
}
@Test(expected = TimeoutException.class)