You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/04/07 21:38:46 UTC

[sling-org-apache-sling-distribution-journal] 01/07: SLING-9340 - keep blocking code in the DistributionSubscriber class

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit d53f87e92859ea7ee8ac7787a886ea46ab20524e
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 22:19:59 2020 +0200

    SLING-9340 - keep blocking code in the DistributionSubscriber class
---
 .../journal/impl/subscriber/BookKeeper.java        | 30 +++++++++++-----------
 .../impl/subscriber/DistributionSubscriber.java    | 18 ++++++++++++-
 2 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 53a7245..1c12690 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -246,30 +246,22 @@ public class BookKeeper implements Closeable {
     }
 
     /**
-     * Send status stored in a previous run if exists
-     * @throws InterruptedException
+     * @return {@code true} if the status has been sent ;
+     *         {@code false} otherwise.
      */
-    public void sendStoredStatus() throws InterruptedException {
-        try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
-            PackageStatus status = new PackageStatus(statusStore.load());
-            boolean sent = status.sent;
-            int retry = 0;
-            while (!sent) {
-                sent = sendStoredStatusOnce(status, retry++);
-            }
-        } catch (IOException e) {
-            log.warn("Error in timer close", e);
-        }
+    public boolean sendStoredStatus(int retry) {
+        PackageStatus status = new PackageStatus(statusStore.load());
+        return status.sent || sendStoredStatus(status, retry);
     }
 
-    private boolean sendStoredStatusOnce(PackageStatus status, int retry) throws InterruptedException {
+    private boolean sendStoredStatus(PackageStatus status, int retry) {
         try {
             sendStatusMessage(status);
             markStatusSent();
             return true;
         } catch (Exception e) {
             log.warn("Cannot send status (retry {})", retry, e);
-            Thread.sleep(RETRY_SEND_DELAY);
+            retryDelay();
             return false;
         }
     }
@@ -340,6 +332,14 @@ public class BookKeeper implements Closeable {
         return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, subService));
     }
 
+    static void retryDelay() {
+        try {
+            Thread.sleep(RETRY_SEND_DELAY);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
     class PackageStatus {
         final Status status;
         final Long offset;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 74fc5e2..8228424 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -29,6 +29,7 @@ import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.
 import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.Hashtable;
@@ -365,7 +366,7 @@ public class DistributionSubscriber implements DistributionAgent {
     private void fetchAndProcessQueueItem() throws InterruptedException {
         try {
             
-            bookKeeper.sendStoredStatus();
+            boolean sent = blockingSendStoredStatus();
             DistributionQueueItem item = blockingPeekQueueItem();
 
             try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
@@ -389,6 +390,21 @@ public class DistributionSubscriber implements DistributionAgent {
         }
     }
 
+    /**
+     * Send status stored in a previous run if exists
+     *
+     * @return {@code true} if the status has been sent ;
+     *         {@code false} otherwise.
+     */
+    private boolean blockingSendStoredStatus() {
+        try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
+            for (int retry = 0 ; running && ! bookKeeper.sendStoredStatus(retry) ; retry++);
+        } catch (IOException e) {
+            LOG.warn("Error in timer close", e);
+        }
+        return running;
+    }
+
     private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
         while (true) {
             DistributionQueueItem queueItem = queueItemsBuffer.peek();