You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/04/28 12:22:44 UTC
[sling-org-apache-sling-distribution-journal] branch master updated: SLING-9401 - Make subscribeIdle check configurable
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 17fb89a SLING-9401 - Make subscribeIdle check configurable
17fb89a is described below
commit 17fb89a756015f23c15c9612faa5aca4b7341554
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Apr 28 14:22:22 2020 +0200
SLING-9401 - Make subscribeIdle check configurable
---
.../impl/subscriber/DistributionSubscriber.java | 20 +++++++++++++-------
.../impl/subscriber/SubscriberConfiguration.java | 2 ++
.../journal/impl/subscriber/SubscriberTest.java | 5 +++--
3 files changed, 18 insertions(+), 9 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 663dce7..4aae01d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -36,6 +36,7 @@ import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -142,7 +143,7 @@ public class DistributionSubscriber implements DistributionAgent {
@Reference
private Packaging packaging;
- SubscriberIdle subscriberIdle;
+ Optional<SubscriberIdle> subscriberIdle;
private ServiceRegistration<DistributionAgent> componentReg;
@@ -180,9 +181,13 @@ public class DistributionSubscriber implements DistributionAgent {
requireNonNull(eventAdmin);
requireNonNull(precondition);
- // Unofficial config (currently just for test)
- Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
- subscriberIdle = new SubscriberIdle(context, idleMillies);
+ if (config.subscriberIdleCheck()) {
+ // Unofficial config (currently just for test)
+ Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
+ subscriberIdle = Optional.of(new SubscriberIdle(context, idleMillies));
+ } else {
+ subscriberIdle = Optional.empty();
+ }
queueNames = getNotEmpty(config.agentNames());
int maxRetries = config.maxRetries();
@@ -252,8 +257,9 @@ public class DistributionSubscriber implements DistributionAgent {
*/
componentReg.unregister();
- IOUtils.closeQuietly(subscriberIdle, announcer, bookKeeper,
+ IOUtils.closeQuietly(announcer, bookKeeper,
packagePoller, commandPoller);
+ subscriberIdle.ifPresent(IOUtils::closeQuietly);
running = false;
String msg = String.format(
"Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s",
@@ -378,7 +384,7 @@ public class DistributionSubscriber implements DistributionAgent {
try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
processQueueItem(item);
} finally {
- subscriberIdle.idle();
+ subscriberIdle.ifPresent(SubscriberIdle::idle);
}
} catch (TimeoutException e) {
@@ -425,7 +431,7 @@ public class DistributionSubscriber implements DistributionAgent {
long offset = queueItem.get(RECORD_OFFSET, Long.class);
PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
boolean skip = shouldSkip(offset);
- subscriberIdle.busy();
+ subscriberIdle.ifPresent(SubscriberIdle::busy);
if (skip) {
bookKeeper.removePackage(pkgMsg, offset);
} else {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
index 937c30a..91ce8f8 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
@@ -52,4 +52,6 @@ public @interface SubscriberConfiguration {
@AttributeDefinition(name = "packageHandling", description = "Defines if content packages in /etc/packages should be processed (Extract, Install, Off).")
PackageHandling packageHandling() default PackageHandling.Off;
+ @AttributeDefinition(name = "subscriberIdleCheck", description = "Defines if we register a subscriber idle health check.")
+ boolean subscriberIdleCheck() default false;
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 1d47520..6b093eb 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -351,7 +351,7 @@ public class SubscriberTest {
packageHandler.handle(info, message);
waitSubscriber(RUNNING);
- await("Should report ready").until(subscriber.subscriberIdle::isReady);
+ await("Should report ready").until(() -> subscriber.subscriberIdle.get().isReady());
sem.release();
}
@@ -363,7 +363,8 @@ public class SubscriberTest {
Map<String, Object> basicProps = ImmutableMap.of(
"name", SUB1_AGENT_NAME,
"agentNames", PUB1_AGENT_NAME,
- "idleMillies", 1000);
+ "idleMillies", 1000,
+ "subscriberIdleCheck", true);
Map<String, Object> props = new HashMap<>();
props.putAll(basicProps);
props.putAll(overrides);