You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/04/07 21:38:51 UTC
[sling-org-apache-sling-distribution-journal] 06/07: SLING-9340 -
Don't interrupt seeder thread
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 33d609d0a321e5e7e4f9a6758413437d4d4291ea
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 23:24:23 2020 +0200
SLING-9340 - Don't interrupt seeder thread
---
.../journal/impl/queue/impl/PubQueueCache.java | 33 ++++++++++------------
1 file changed, 15 insertions(+), 18 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 189513c..84e28c3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -124,8 +124,6 @@ public class PubQueueCache {
*/
private volatile boolean closed;
- private final Thread seeder;
-
public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, long seedingDelayMs) {
this.messagingProvider = messagingProvider;
this.eventAdmin = eventAdmin;
@@ -138,7 +136,7 @@ public class PubQueueCache {
Reset.latest,
create(PackageMessage.class, this::handlePackage));
- seeder = RunnableUtil.startBackgroundThread(this::seedCache, "queue seeding");
+ RunnableUtil.startBackgroundThread(this::seedCache, "queue seeding");
}
@Nonnull
@@ -153,7 +151,6 @@ public class PubQueueCache {
public void close() {
closed = true;
- seeder.interrupt();
IOUtils.closeQuietly(tailPoller);
jmxRegs.stream().forEach(IOUtils::closeQuietly);
}
@@ -162,27 +159,28 @@ public class PubQueueCache {
LOG.info("Start message seeder");
try {
MessageSender<PackageMessage> sender = messagingProvider.createSender();
- sendSeedingMessages(sender);
+ do {
+ sendSeedingMessage(sender);
+ } while (! closed && ! seeded.await(seedingDelayMs, MILLISECONDS));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
} finally {
LOG.info("Stop message seeder");
}
}
- private void sendSeedingMessages(MessageSender<PackageMessage> sender) {
- while (! Thread.interrupted()) {
- PackageMessage pkgMsg = createTestMessage();
- LOG.info("Send seeding message");
- try {
- sender.send(topic, pkgMsg);
- sleep(seedingDelayMs);
- } catch (MessagingException e) {
- LOG.warn(e.getMessage(), e);
- sleep(seedingDelayMs * 10);
- }
+ private void sendSeedingMessage(MessageSender<PackageMessage> sender) {
+ PackageMessage pkgMsg = createTestMessage();
+ LOG.info("Send seeding message");
+ try {
+ sender.send(topic, pkgMsg);
+ } catch (MessagingException e) {
+ LOG.warn(e.getMessage(), e);
+ delay(seedingDelayMs * 10);
}
}
- private void sleep(long sleepMs) {
+ private static void delay(long sleepMs) {
try {
Thread.sleep(sleepMs);
} catch (InterruptedException e) {
@@ -338,6 +336,5 @@ public class PubQueueCache {
LOG.info("Cache has been seeded");
}
seeded.countDown();
- seeder.interrupt();
}
}