You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/04/07 21:38:51 UTC

[sling-org-apache-sling-distribution-journal] 06/07: SLING-9340 - Don't interrupt seeder thread

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 33d609d0a321e5e7e4f9a6758413437d4d4291ea
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 23:24:23 2020 +0200

    SLING-9340 - Don't interrupt seeder thread
---
 .../journal/impl/queue/impl/PubQueueCache.java     | 33 ++++++++++------------
 1 file changed, 15 insertions(+), 18 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 189513c..84e28c3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -124,8 +124,6 @@ public class PubQueueCache {
      */
     private volatile boolean closed;
 
-    private final Thread seeder;
-
     public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, long seedingDelayMs) {
         this.messagingProvider = messagingProvider;
         this.eventAdmin = eventAdmin;
@@ -138,7 +136,7 @@ public class PubQueueCache {
                 Reset.latest,
                 create(PackageMessage.class, this::handlePackage));
 
-        seeder = RunnableUtil.startBackgroundThread(this::seedCache, "queue seeding");
+        RunnableUtil.startBackgroundThread(this::seedCache, "queue seeding");
     }
 
     @Nonnull
@@ -153,7 +151,6 @@ public class PubQueueCache {
 
     public void close() {
         closed = true;
-        seeder.interrupt();
         IOUtils.closeQuietly(tailPoller);
         jmxRegs.stream().forEach(IOUtils::closeQuietly);
     }
@@ -162,27 +159,28 @@ public class PubQueueCache {
         LOG.info("Start message seeder");
         try {
             MessageSender<PackageMessage> sender = messagingProvider.createSender();
-            sendSeedingMessages(sender);
+            do {
+                sendSeedingMessage(sender);
+            } while (! closed && ! seeded.await(seedingDelayMs, MILLISECONDS));
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
         } finally {
             LOG.info("Stop message seeder");
         }
     }
 
-    private void sendSeedingMessages(MessageSender<PackageMessage> sender) {
-        while (! Thread.interrupted()) {
-            PackageMessage pkgMsg = createTestMessage();
-            LOG.info("Send seeding message");
-            try {
-                sender.send(topic, pkgMsg);
-                sleep(seedingDelayMs);
-            } catch (MessagingException e) {
-                LOG.warn(e.getMessage(), e);
-                sleep(seedingDelayMs * 10);
-            }
+    private void sendSeedingMessage(MessageSender<PackageMessage> sender) {
+        PackageMessage pkgMsg = createTestMessage();
+        LOG.info("Send seeding message");
+        try {
+            sender.send(topic, pkgMsg);
+        } catch (MessagingException e) {
+            LOG.warn(e.getMessage(), e);
+            delay(seedingDelayMs * 10);
         }
     }
 
-    private void sleep(long sleepMs) {
+    private static void delay(long sleepMs) {
         try {
             Thread.sleep(sleepMs);
         } catch (InterruptedException e) {
@@ -338,6 +336,5 @@ public class PubQueueCache {
             LOG.info("Cache has been seeded");
         }
         seeded.countDown();
-        seeder.interrupt();
     }
 }