You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/04/28 12:22:44 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-9401 - Make subscribeIdle check configurable

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 17fb89a  SLING-9401 - Make subscribeIdle check configurable
17fb89a is described below

commit 17fb89a756015f23c15c9612faa5aca4b7341554
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Apr 28 14:22:22 2020 +0200

    SLING-9401 - Make subscribeIdle check configurable
---
 .../impl/subscriber/DistributionSubscriber.java      | 20 +++++++++++++-------
 .../impl/subscriber/SubscriberConfiguration.java     |  2 ++
 .../journal/impl/subscriber/SubscriberTest.java      |  5 +++--
 3 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 663dce7..4aae01d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -36,6 +36,7 @@ import java.util.Dictionary;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -142,7 +143,7 @@ public class DistributionSubscriber implements DistributionAgent {
     @Reference
     private Packaging packaging;
     
-    SubscriberIdle subscriberIdle;
+    Optional<SubscriberIdle> subscriberIdle;
     
     private ServiceRegistration<DistributionAgent> componentReg;
 
@@ -180,9 +181,13 @@ public class DistributionSubscriber implements DistributionAgent {
         requireNonNull(eventAdmin);
         requireNonNull(precondition);
 
-        // Unofficial config (currently just for test)
-        Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
-        subscriberIdle = new SubscriberIdle(context, idleMillies);
+        if (config.subscriberIdleCheck()) {
+            // Unofficial config (currently just for test)
+            Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
+            subscriberIdle = Optional.of(new SubscriberIdle(context, idleMillies));
+        } else {
+            subscriberIdle = Optional.empty();
+        }
         
         queueNames = getNotEmpty(config.agentNames());
         int maxRetries = config.maxRetries();
@@ -252,8 +257,9 @@ public class DistributionSubscriber implements DistributionAgent {
          */
 
         componentReg.unregister();
-        IOUtils.closeQuietly(subscriberIdle, announcer, bookKeeper, 
+        IOUtils.closeQuietly(announcer, bookKeeper, 
                 packagePoller, commandPoller);
+        subscriberIdle.ifPresent(IOUtils::closeQuietly);
         running = false;
         String msg = String.format(
                 "Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s",
@@ -378,7 +384,7 @@ public class DistributionSubscriber implements DistributionAgent {
             try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
                 processQueueItem(item);
             } finally {
-                subscriberIdle.idle();
+                subscriberIdle.ifPresent(SubscriberIdle::idle);
             }
 
         } catch (TimeoutException e) {
@@ -425,7 +431,7 @@ public class DistributionSubscriber implements DistributionAgent {
         long offset = queueItem.get(RECORD_OFFSET, Long.class);
         PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
         boolean skip = shouldSkip(offset);
-        subscriberIdle.busy();
+        subscriberIdle.ifPresent(SubscriberIdle::busy);
         if (skip) {
             bookKeeper.removePackage(pkgMsg, offset);
         } else {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
index 937c30a..91ce8f8 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
@@ -52,4 +52,6 @@ public @interface SubscriberConfiguration {
     @AttributeDefinition(name = "packageHandling", description = "Defines if content packages in /etc/packages should be processed (Extract, Install, Off).")
     PackageHandling packageHandling() default PackageHandling.Off;
     
+    @AttributeDefinition(name = "subscriberIdleCheck", description = "Defines if we register a subscriber idle health check.")
+    boolean subscriberIdleCheck() default false;
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 1d47520..6b093eb 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -351,7 +351,7 @@ public class SubscriberTest {
 
         packageHandler.handle(info, message);
         waitSubscriber(RUNNING);
-        await("Should report ready").until(subscriber.subscriberIdle::isReady);
+        await("Should report ready").until(() -> subscriber.subscriberIdle.get().isReady());
         sem.release();
     }
 
@@ -363,7 +363,8 @@ public class SubscriberTest {
         Map<String, Object> basicProps = ImmutableMap.of(
             "name", SUB1_AGENT_NAME,
             "agentNames", PUB1_AGENT_NAME,
-            "idleMillies", 1000);
+            "idleMillies", 1000,
+            "subscriberIdleCheck", true);
         Map<String, Object> props = new HashMap<>();
         props.putAll(basicProps);
         props.putAll(overrides);