You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/04/07 21:38:46 UTC
[sling-org-apache-sling-distribution-journal] 01/07: SLING-9340 -
keep blocking code in the DistributionSubscriber class
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit d53f87e92859ea7ee8ac7787a886ea46ab20524e
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 22:19:59 2020 +0200
SLING-9340 - keep blocking code in the DistributionSubscriber class
---
.../journal/impl/subscriber/BookKeeper.java | 30 +++++++++++-----------
.../impl/subscriber/DistributionSubscriber.java | 18 ++++++++++++-
2 files changed, 32 insertions(+), 16 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 53a7245..1c12690 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -246,30 +246,22 @@ public class BookKeeper implements Closeable {
}
/**
- * Send status stored in a previous run if exists
- * @throws InterruptedException
+ * @return {@code true} if the status has been sent ;
+ * {@code false} otherwise.
*/
- public void sendStoredStatus() throws InterruptedException {
- try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
- PackageStatus status = new PackageStatus(statusStore.load());
- boolean sent = status.sent;
- int retry = 0;
- while (!sent) {
- sent = sendStoredStatusOnce(status, retry++);
- }
- } catch (IOException e) {
- log.warn("Error in timer close", e);
- }
+ public boolean sendStoredStatus(int retry) {
+ PackageStatus status = new PackageStatus(statusStore.load());
+ return status.sent || sendStoredStatus(status, retry);
}
- private boolean sendStoredStatusOnce(PackageStatus status, int retry) throws InterruptedException {
+ private boolean sendStoredStatus(PackageStatus status, int retry) {
try {
sendStatusMessage(status);
markStatusSent();
return true;
} catch (Exception e) {
log.warn("Cannot send status (retry {})", retry, e);
- Thread.sleep(RETRY_SEND_DELAY);
+ retryDelay();
return false;
}
}
@@ -340,6 +332,14 @@ public class BookKeeper implements Closeable {
return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, subService));
}
+ static void retryDelay() {
+ try {
+ Thread.sleep(RETRY_SEND_DELAY);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
class PackageStatus {
final Status status;
final Long offset;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 74fc5e2..8228424 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -29,6 +29,7 @@ import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.
import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
import java.io.Closeable;
+import java.io.IOException;
import java.util.Collections;
import java.util.Dictionary;
import java.util.Hashtable;
@@ -365,7 +366,7 @@ public class DistributionSubscriber implements DistributionAgent {
private void fetchAndProcessQueueItem() throws InterruptedException {
try {
- bookKeeper.sendStoredStatus();
+ boolean sent = blockingSendStoredStatus();
DistributionQueueItem item = blockingPeekQueueItem();
try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
@@ -389,6 +390,21 @@ public class DistributionSubscriber implements DistributionAgent {
}
}
+ /**
+ * Send status stored in a previous run if exists
+ *
+ * @return {@code true} if the status has been sent ;
+ * {@code false} otherwise.
+ */
+ private boolean blockingSendStoredStatus() {
+ try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
+ for (int retry = 0 ; running && ! bookKeeper.sendStoredStatus(retry) ; retry++);
+ } catch (IOException e) {
+ LOG.warn("Error in timer close", e);
+ }
+ return running;
+ }
+
private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
while (true) {
DistributionQueueItem queueItem = queueItemsBuffer.peek();