You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/04/07 21:38:45 UTC
[sling-org-apache-sling-distribution-journal] branch master updated
(03d8900 -> fe471d6)
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git.
from 03d8900 Revert "SLING-9259 - Extract service subscriber code into separate package (#25)"
new d53f87e SLING-9340 - keep blocking code in the DistributionSubscriber class
new a8c8aef SLING-9340 - stop processQueue using the running flag
new b92195d SLING-9340 - Precondition to raise ISE instead of IE
new 3078830 SLING-9340 - Don't interrupt the queue processor thread
new c1d3925 SLING-9340 - Don't raise IE upon enqueue
new 33d609d SLING-9340 - Don't interrupt seeder thread
new fe471d6 SLING-9340 - Add comments highlighting why we don't use interrupt
The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../journal/impl/precondition/Precondition.java | 4 +-
.../impl/precondition/StagingPrecondition.java | 34 ++++++---
.../journal/impl/queue/impl/PubQueueCache.java | 41 ++++++-----
.../journal/impl/subscriber/BookKeeper.java | 30 ++++----
.../impl/subscriber/DistributionSubscriber.java | 81 +++++++++++++++-------
.../impl/precondition/StagingPreconditionTest.java | 3 +-
6 files changed, 122 insertions(+), 71 deletions(-)
[sling-org-apache-sling-distribution-journal] 05/07: SLING-9340 -
Don't raise IE upon enqueue
Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit c1d39257cf01fa96e2d1e9e66011472c23326fdc
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 23:07:38 2020 +0200
SLING-9340 - Don't raise IE upon enqueue
---
.../distribution/journal/impl/subscriber/DistributionSubscriber.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 3e601ac..8ca48ff 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -341,11 +341,10 @@ public class DistributionSubscriber implements DistributionAgent {
return;
}
}
- throw new InterruptedException();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new RuntimeException();
}
+ throw new RuntimeException();
}
private void processQueue() {
[sling-org-apache-sling-distribution-journal] 01/07: SLING-9340 -
keep blocking code in the DistributionSubscriber class
Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit d53f87e92859ea7ee8ac7787a886ea46ab20524e
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 22:19:59 2020 +0200
SLING-9340 - keep blocking code in the DistributionSubscriber class
---
.../journal/impl/subscriber/BookKeeper.java | 30 +++++++++++-----------
.../impl/subscriber/DistributionSubscriber.java | 18 ++++++++++++-
2 files changed, 32 insertions(+), 16 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 53a7245..1c12690 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -246,30 +246,22 @@ public class BookKeeper implements Closeable {
}
/**
- * Send status stored in a previous run if exists
- * @throws InterruptedException
+ * @return {@code true} if the status has been sent ;
+ * {@code false} otherwise.
*/
- public void sendStoredStatus() throws InterruptedException {
- try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
- PackageStatus status = new PackageStatus(statusStore.load());
- boolean sent = status.sent;
- int retry = 0;
- while (!sent) {
- sent = sendStoredStatusOnce(status, retry++);
- }
- } catch (IOException e) {
- log.warn("Error in timer close", e);
- }
+ public boolean sendStoredStatus(int retry) {
+ PackageStatus status = new PackageStatus(statusStore.load());
+ return status.sent || sendStoredStatus(status, retry);
}
- private boolean sendStoredStatusOnce(PackageStatus status, int retry) throws InterruptedException {
+ private boolean sendStoredStatus(PackageStatus status, int retry) {
try {
sendStatusMessage(status);
markStatusSent();
return true;
} catch (Exception e) {
log.warn("Cannot send status (retry {})", retry, e);
- Thread.sleep(RETRY_SEND_DELAY);
+ retryDelay();
return false;
}
}
@@ -340,6 +332,14 @@ public class BookKeeper implements Closeable {
return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, subService));
}
+ static void retryDelay() {
+ try {
+ Thread.sleep(RETRY_SEND_DELAY);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
class PackageStatus {
final Status status;
final Long offset;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 74fc5e2..8228424 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -29,6 +29,7 @@ import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.
import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
import java.io.Closeable;
+import java.io.IOException;
import java.util.Collections;
import java.util.Dictionary;
import java.util.Hashtable;
@@ -365,7 +366,7 @@ public class DistributionSubscriber implements DistributionAgent {
private void fetchAndProcessQueueItem() throws InterruptedException {
try {
- bookKeeper.sendStoredStatus();
+ boolean sent = blockingSendStoredStatus();
DistributionQueueItem item = blockingPeekQueueItem();
try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
@@ -389,6 +390,21 @@ public class DistributionSubscriber implements DistributionAgent {
}
}
+ /**
+ * Send status stored in a previous run if exists
+ *
+ * @return {@code true} if the status has been sent ;
+ * {@code false} otherwise.
+ */
+ private boolean blockingSendStoredStatus() {
+ try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
+ for (int retry = 0 ; running && ! bookKeeper.sendStoredStatus(retry) ; retry++);
+ } catch (IOException e) {
+ LOG.warn("Error in timer close", e);
+ }
+ return running;
+ }
+
private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
while (true) {
DistributionQueueItem queueItem = queueItemsBuffer.peek();
[sling-org-apache-sling-distribution-journal] 07/07: SLING-9340 -
Add comments highlighting why we don't use interrupt
Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit fe471d6d2158d70b74197499cf6a66132f3ab5e7
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 23:34:39 2020 +0200
SLING-9340 - Add comments highlighting why we don't use interrupt
---
.../journal/impl/precondition/StagingPrecondition.java | 8 ++++++++
.../sling/distribution/journal/impl/queue/impl/PubQueueCache.java | 8 ++++++++
.../journal/impl/subscriber/DistributionSubscriber.java | 8 ++++++++
3 files changed, 24 insertions(+)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index c0e3d48..d523cdf 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -68,6 +68,14 @@ public class StagingPrecondition implements Precondition, Runnable {
@Deactivate
public synchronized void deactivate() {
+
+ /*
+ * Note that we don't interrupt blocking calls using Thread.interrupt()
+ * because interrupts can stop the Apache Oak repository.
+ *
+ * See SLING-9340, OAK-2609 and https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
+ */
+
IOUtils.closeQuietly(watcher);
running = false;
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 84e28c3..1b9e14d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -150,6 +150,14 @@ public class PubQueueCache {
}
public void close() {
+
+ /*
+ * Note that we don't close resources using Thread.interrupt()
+ * because interrupts can stop the Apache Oak repository.
+ *
+ * See SLING-9340, OAK-2609 and https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
+ */
+
closed = true;
IOUtils.closeQuietly(tailPoller);
jmxRegs.stream().forEach(IOUtils::closeQuietly);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 8ca48ff..663dce7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -243,6 +243,14 @@ public class DistributionSubscriber implements DistributionAgent {
@Deactivate
public void deactivate() {
+
+ /*
+ * Note that we don't interrupt blocking calls using Thread.interrupt()
+ * because interrupts can stop the Apache Oak repository.
+ *
+ * See SLING-9340, OAK-2609 and https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
+ */
+
componentReg.unregister();
IOUtils.closeQuietly(subscriberIdle, announcer, bookKeeper,
packagePoller, commandPoller);
[sling-org-apache-sling-distribution-journal] 02/07: SLING-9340 -
stop processQueue using the running flag
Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit a8c8aeff22cd9af1d5ba4e46109484f63a366f8b
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 22:36:51 2020 +0200
SLING-9340 - stop processQueue using the running flag
---
.../impl/subscriber/DistributionSubscriber.java | 42 +++++++++++++---------
1 file changed, 26 insertions(+), 16 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 8228424..fb4fbcd 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -20,6 +20,7 @@ package org.apache.sling.distribution.journal.impl.subscriber;
import static java.lang.String.format;
import static java.util.Arrays.asList;
+import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;
import static org.apache.sling.distribution.journal.HandlerAdapter.create;
@@ -109,6 +110,8 @@ public class DistributionSubscriber implements DistributionAgent {
private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = Collections.emptySet();
+ private static final DistributionQueueItem STOPPED_ITEM = new DistributionQueueItem("stop-item", emptyMap());
+
@Reference(name = "packageBuilder")
private DistributionPackageBuilder packageBuilder;
@@ -353,21 +356,23 @@ public class DistributionSubscriber implements DistributionAgent {
private void processQueue() {
LOG.info("Started Queue processor");
- while (!Thread.interrupted()) {
- try {
- fetchAndProcessQueueItem();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ while (running) {
+ fetchAndProcessQueueItem();
}
LOG.info("Stopped Queue processor");
}
- private void fetchAndProcessQueueItem() throws InterruptedException {
+ private void fetchAndProcessQueueItem() {
try {
- boolean sent = blockingSendStoredStatus();
+ if (! blockingSendStoredStatus()) {
+ return;
+ }
+
DistributionQueueItem item = blockingPeekQueueItem();
+ if (STOPPED_ITEM == item) {
+ return;
+ }
try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
processQueueItem(item);
@@ -376,17 +381,13 @@ public class DistributionSubscriber implements DistributionAgent {
}
} catch (TimeoutException e) {
- /**
- * Precondition timed out. We only log this on info level as it is no error
- */
+ // Precondition timed out. We only log this on info level as it is no error
LOG.info(e.getMessage());
- Thread.sleep(RETRY_DELAY);
- } catch (InterruptedException e) {
- throw e;
+ delay(RETRY_DELAY);
} catch (Exception e) {
// Catch all to prevent processing from stopping
LOG.error("Error processing queue item", e);
- Thread.sleep(RETRY_DELAY);
+ delay(RETRY_DELAY);
}
}
@@ -406,7 +407,7 @@ public class DistributionSubscriber implements DistributionAgent {
}
private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
- while (true) {
+ while (running) {
DistributionQueueItem queueItem = queueItemsBuffer.peek();
if (queueItem != null) {
return queueItem;
@@ -414,6 +415,7 @@ public class DistributionSubscriber implements DistributionAgent {
Thread.sleep(QUEUE_FETCH_DELAY);
}
}
+ return STOPPED_ITEM;
}
private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, InterruptedException, TimeoutException {
@@ -435,4 +437,12 @@ public class DistributionSubscriber implements DistributionAgent {
return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
}
+ private static void delay(long delayInMs) {
+ try {
+ Thread.sleep(delayInMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
}
[sling-org-apache-sling-distribution-journal] 06/07: SLING-9340 -
Don't interrupt seeder thread
Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 33d609d0a321e5e7e4f9a6758413437d4d4291ea
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 23:24:23 2020 +0200
SLING-9340 - Don't interrupt seeder thread
---
.../journal/impl/queue/impl/PubQueueCache.java | 33 ++++++++++------------
1 file changed, 15 insertions(+), 18 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 189513c..84e28c3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -124,8 +124,6 @@ public class PubQueueCache {
*/
private volatile boolean closed;
- private final Thread seeder;
-
public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, long seedingDelayMs) {
this.messagingProvider = messagingProvider;
this.eventAdmin = eventAdmin;
@@ -138,7 +136,7 @@ public class PubQueueCache {
Reset.latest,
create(PackageMessage.class, this::handlePackage));
- seeder = RunnableUtil.startBackgroundThread(this::seedCache, "queue seeding");
+ RunnableUtil.startBackgroundThread(this::seedCache, "queue seeding");
}
@Nonnull
@@ -153,7 +151,6 @@ public class PubQueueCache {
public void close() {
closed = true;
- seeder.interrupt();
IOUtils.closeQuietly(tailPoller);
jmxRegs.stream().forEach(IOUtils::closeQuietly);
}
@@ -162,27 +159,28 @@ public class PubQueueCache {
LOG.info("Start message seeder");
try {
MessageSender<PackageMessage> sender = messagingProvider.createSender();
- sendSeedingMessages(sender);
+ do {
+ sendSeedingMessage(sender);
+ } while (! closed && ! seeded.await(seedingDelayMs, MILLISECONDS));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
} finally {
LOG.info("Stop message seeder");
}
}
- private void sendSeedingMessages(MessageSender<PackageMessage> sender) {
- while (! Thread.interrupted()) {
- PackageMessage pkgMsg = createTestMessage();
- LOG.info("Send seeding message");
- try {
- sender.send(topic, pkgMsg);
- sleep(seedingDelayMs);
- } catch (MessagingException e) {
- LOG.warn(e.getMessage(), e);
- sleep(seedingDelayMs * 10);
- }
+ private void sendSeedingMessage(MessageSender<PackageMessage> sender) {
+ PackageMessage pkgMsg = createTestMessage();
+ LOG.info("Send seeding message");
+ try {
+ sender.send(topic, pkgMsg);
+ } catch (MessagingException e) {
+ LOG.warn(e.getMessage(), e);
+ delay(seedingDelayMs * 10);
}
}
- private void sleep(long sleepMs) {
+ private static void delay(long sleepMs) {
try {
Thread.sleep(sleepMs);
} catch (InterruptedException e) {
@@ -338,6 +336,5 @@ public class PubQueueCache {
LOG.info("Cache has been seeded");
}
seeded.countDown();
- seeder.interrupt();
}
}
[sling-org-apache-sling-distribution-journal] 04/07: SLING-9340 -
Don't interrupt the queue processor thread
Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 307883048dad3b6ba798917e622271b8b31e19f4
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 22:57:00 2020 +0200
SLING-9340 - Don't interrupt the queue processor thread
---
.../journal/impl/subscriber/DistributionSubscriber.java | 8 +-------
1 file changed, 1 insertion(+), 7 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 93f528b..3e601ac 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -166,8 +166,6 @@ public class DistributionSubscriber implements DistributionAgent {
private volatile boolean running = true;
- private volatile Thread queueProcessor;
-
@Activate
public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
String subSlingId = requireNonNull(slingSettings.getSlingId());
@@ -203,7 +201,7 @@ public class DistributionSubscriber implements DistributionAgent {
commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, editable);
- queueProcessor = startBackgroundThread(this::processQueue,
+ startBackgroundThread(this::processQueue,
format("Queue Processor for Subscriber agent %s", subAgentName));
int announceDelay = PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
@@ -249,10 +247,6 @@ public class DistributionSubscriber implements DistributionAgent {
IOUtils.closeQuietly(subscriberIdle, announcer, bookKeeper,
packagePoller, commandPoller);
running = false;
- Thread interrupter = this.queueProcessor;
- if (interrupter != null) {
- interrupter.interrupt();
- }
String msg = String.format(
"Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s",
subAgentName, queueNames, pkgType);
[sling-org-apache-sling-distribution-journal] 03/07: SLING-9340 -
Precondition to raise ISE instead of IE
Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit b92195d39a0019dfd65e7abbc542b518b90e871e
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 22:54:45 2020 +0200
SLING-9340 - Precondition to raise ISE instead of IE
---
.../journal/impl/precondition/Precondition.java | 4 ++--
.../impl/precondition/StagingPrecondition.java | 26 +++++++++++++++-------
.../impl/subscriber/DistributionSubscriber.java | 6 +++--
.../impl/precondition/StagingPreconditionTest.java | 3 +--
4 files changed, 25 insertions(+), 14 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
index 3730475..69da7b4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
@@ -30,9 +30,9 @@ public interface Precondition {
* @param pkgOffset the offset of the package
* @param timeoutSeconds max seconds to wait until returning
* @throws TimeoutException if the timeout expired without being able to determine status
- * @throws InterruptedException if the thread was interrupted and should shut down
+ * @throws IllegalStateException if the precondition can't be evaluated
* @return true if the package can be processed; otherwise it returns false.
*/
- boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException;
+ boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws TimeoutException;
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index 2272888..c0e3d48 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -48,6 +48,8 @@ public class StagingPrecondition implements Precondition, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(StagingPrecondition.class);
+ private static final long STATUS_CHECK_DELAY_MS = 100;
+
@Reference
private MessagingProvider messagingProvider;
@@ -71,27 +73,27 @@ public class StagingPrecondition implements Precondition, Runnable {
}
@Override
- public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException {
+ public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws TimeoutException {
if (timeoutSeconds < 1) {
throw new IllegalArgumentException();
}
// try to get the status for timeoutSeconds and then throw
- for(int i=0; i < timeoutSeconds * 10; i++) {
+ for(int i = 0; running && i < timeoutSeconds * 10 ; i++) {
Status status = getStatus(subAgentName, pkgOffset);
-
if (status != null) {
return status == Status.IMPORTED;
} else {
- Thread.sleep(100);
- }
-
- if (!running) {
- throw new InterruptedException("Staging precondition is shutting down");
+ delayStatusCheck();
}
}
+ if (!running) {
+ throw new IllegalStateException("Staging precondition is shutting down");
+ }
+
throw new TimeoutException("Timeout waiting for package offset " + pkgOffset + " on status topic.");
+
}
private synchronized Status getStatus(String subAgentName, long pkgOffset) {
@@ -104,4 +106,12 @@ public class StagingPrecondition implements Precondition, Runnable {
watcher = new PackageStatusWatcher(messagingProvider, topics);
}
+ private static void delayStatusCheck() {
+ try {
+ Thread.sleep(STATUS_CHECK_DELAY_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index fb4fbcd..93f528b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -384,6 +384,8 @@ public class DistributionSubscriber implements DistributionAgent {
// Precondition timed out. We only log this on info level as it is no error
LOG.info(e.getMessage());
delay(RETRY_DELAY);
+ } catch (IllegalStateException e) {
+ throw e;
} catch (Exception e) {
// Catch all to prevent processing from stopping
LOG.error("Error processing queue item", e);
@@ -418,7 +420,7 @@ public class DistributionSubscriber implements DistributionAgent {
return STOPPED_ITEM;
}
- private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, InterruptedException, TimeoutException {
+ private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, TimeoutException {
long offset = queueItem.get(RECORD_OFFSET, Long.class);
PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
boolean skip = shouldSkip(offset);
@@ -433,7 +435,7 @@ public class DistributionSubscriber implements DistributionAgent {
distributionMetricsService.getItemsBufferSize().decrement();
}
- private boolean shouldSkip(long offset) throws InterruptedException, TimeoutException {
+ private boolean shouldSkip(long offset) throws TimeoutException {
return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index eab0479..6a9f893 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -35,7 +35,6 @@ import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.precondition.StagingPrecondition;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
@@ -117,7 +116,7 @@ public class StagingPreconditionTest {
th.start();
precondition.deactivate();
Throwable ex = Awaitility.await().until(() -> exHolder.get(), notNullValue());
- assertThat(ex, instanceOf(InterruptedException.class));
+ assertThat(ex, instanceOf(IllegalStateException.class));
}
@Test(expected = TimeoutException.class)