You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/02/24 13:39:45 UTC
[sling-org-apache-sling-distribution-journal] branch master
updated: SLING-9134 - Allow to configure initial and max journal checker
delay upon retries
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new f79594a SLING-9134 - Allow to configure initial and max journal checker delay upon retries
f79594a is described below
commit f79594aff0f1f157935f82ecc8850d599d6f8dfa
Author: tmaret <tm...@adobe.com>
AuthorDate: Mon Feb 24 14:39:24 2020 +0100
SLING-9134 - Allow to configure initial and max journal checker delay upon retries
---
.../journal/impl/shared/ExponentialBackOff.java | 14 +++++---
.../impl/shared/JournalAvailableChecker.java | 39 ++++++++++++++--------
.../impl/shared/ExponentialBackoffTest.java | 16 ++++-----
.../impl/shared/JournalAvailableCheckerTest.java | 12 ++++---
4 files changed, 50 insertions(+), 31 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
index 1338ef4..28f5aa8 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
@@ -21,7 +21,6 @@ package org.apache.sling.distribution.journal.impl.shared;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.io.Closeable;
-import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
@@ -51,9 +50,16 @@ public class ExponentialBackOff implements Closeable {
private long currentMaxDelay;
private long lastCheck;
- public ExponentialBackOff(Duration startDelay, Duration maxDelay, boolean randomDelay, Runnable checkCallback) {
- this.startDelay = startDelay.toMillis();
- this.maxDelay = maxDelay.toMillis();
+ /**
+ * @param startDelay the start delay in milliseconds
+ * @param maxDelay the max delay in milliseconds
+ * @param randomDelay {@code true} to randomise the delay between 0 and the current max delay ;
+ * {@code false} to use the current max delay.
+ * @param checkCallback the code to invoke when the current delay has elapsed
+ */
+ public ExponentialBackOff(long startDelay, long maxDelay, boolean randomDelay, Runnable checkCallback) {
+ this.startDelay = startDelay;
+ this.maxDelay = maxDelay;
this.randomDelay = randomDelay;
this.checkCallback = checkCallback;
this.executor = Executors.newScheduledThreadPool(1);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
index 5dee552..feb03a5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
@@ -18,12 +18,8 @@
*/
package org.apache.sling.distribution.journal.impl.shared;
-import static java.time.temporal.ChronoUnit.MILLIS;
-import static java.time.temporal.ChronoUnit.MINUTES;
import static java.util.Objects.requireNonNull;
-import java.time.Duration;
-
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.MessagingProvider;
@@ -36,6 +32,9 @@ import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventConstants;
import org.osgi.service.event.EventHandler;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,14 +43,16 @@ import org.slf4j.LoggerFactory;
service = EventHandler.class,
property = EventConstants.EVENT_TOPIC + "=" + ExceptionEventSender.ERROR_TOPIC
)
+@Designate(ocd = JournalAvailableChecker.JournalCheckerConfiguration.class)
public class JournalAvailableChecker implements EventHandler {
-
- private static final Duration INITIAL_RETRY_DELAY = Duration.of(500, MILLIS);
- private static final Duration MAX_RETRY_DELAY = Duration.of(5, MINUTES);
+
+ public static final long INITIAL_RETRY_DELAY = 500; // 500 ms
+
+ public static final long MAX_RETRY_DELAY = 5 * 60 * 1000; // 5 minutes
private static final Logger LOG = LoggerFactory.getLogger(JournalAvailableChecker.class);
- private final ExponentialBackOff backoffRetry;
+ private ExponentialBackOff backoffRetry;
@Reference
Topics topics;
@@ -65,19 +66,16 @@ public class JournalAvailableChecker implements EventHandler {
private JournalAvailableServiceMarker marker;
private GaugeService<Boolean> gauge;
-
- public JournalAvailableChecker() {
- this.backoffRetry = new ExponentialBackOff(INITIAL_RETRY_DELAY, MAX_RETRY_DELAY, true, this::run);
- }
@Activate
- public void activate(BundleContext context) {
+ public void activate(JournalCheckerConfiguration config, BundleContext context) {
requireNonNull(provider);
requireNonNull(topics);
+ this.backoffRetry = new ExponentialBackOff(config.initialRetryDelay(), config.maxRetryDelay(), true, this::run);
this.marker = new JournalAvailableServiceMarker(context);
this.gauge = metrics.createGauge(DistributionMetricsService.BASE_COMPONENT + ".journal_available", "", this::isAvailable);
this.marker.register();
- LOG.info("Started Journal availability checker service. Journal is initially assumed available.");
+ LOG.info("Started Journal availability checker service with initialRetryDelay {}, maxRetryDelay {}. Journal is initially assumed available.", config.initialRetryDelay(), config.maxRetryDelay());
}
@Deactivate
@@ -133,4 +131,17 @@ public class JournalAvailableChecker implements EventHandler {
}
}
+ @ObjectClassDefinition(name = "Apache Sling Journal based Distribution - Journal Checker")
+ public @interface JournalCheckerConfiguration {
+
+ @AttributeDefinition(name = "Initial retry delay",
+ description = "The initial retry delay in milliseconds.")
+ long initialRetryDelay() default INITIAL_RETRY_DELAY;
+
+ @AttributeDefinition(name = "Max retry delay",
+ description = "The max retry delay in milliseconds.")
+ long maxRetryDelay() default MAX_RETRY_DELAY;
+
+ }
+
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
index 55e4c01..bd44057 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
@@ -18,11 +18,11 @@
*/
package org.apache.sling.distribution.journal.impl.shared;
+import static java.time.Duration.of;
import static java.time.temporal.ChronoUnit.MILLIS;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
-import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -36,9 +36,9 @@ import org.slf4j.LoggerFactory;
@RunWith(MockitoJUnitRunner.class)
public class ExponentialBackoffTest {
private static final int RETRIES = 5;
- private static final Duration INITIAL_DELAY = Duration.of(64, MILLIS);
- private static final Duration MAX_DELAY = Duration.of(256, MILLIS);
- private static final Duration LONG_DELAY = Duration.of(5, ChronoUnit.SECONDS);
+ private static final long INITIAL_DELAY = of(64, MILLIS).toMillis();
+ private static final long MAX_DELAY = of(256, MILLIS).toMillis();
+ private static final long LONG_DELAY = of(5, ChronoUnit.SECONDS).toMillis();
private Logger log = LoggerFactory.getLogger(this.getClass());
@@ -52,21 +52,21 @@ public class ExponentialBackoffTest {
backOff.startChecks();
// Run into double trigger protection
backOff.startChecks();
- boolean finished = this.countDown.await(MAX_DELAY.toMillis() * RETRIES, TimeUnit.MILLISECONDS);
+ boolean finished = this.countDown.await(MAX_DELAY * RETRIES, TimeUnit.MILLISECONDS);
assertThat("Should finish before the timeout", finished, equalTo(true));
log.info("Checking for long delay if next error happens quickly");
this.countDown = new CountDownLatch(1);
backOff.startChecks();
- boolean finished2 = this.countDown.await(INITIAL_DELAY.toMillis() * 2, TimeUnit.MILLISECONDS);
+ boolean finished2 = this.countDown.await(INITIAL_DELAY * 2, TimeUnit.MILLISECONDS);
assertThat("Should not finish quickly as we called startChecks immediately", finished2, equalTo(false));
this.countDown.await();
log.info("Checking for short delay if next error happens after enough time");
- Thread.sleep(MAX_DELAY.toMillis() * 3);
+ Thread.sleep(MAX_DELAY * 3);
this.countDown = new CountDownLatch(1);
backOff.startChecks();
- boolean finished3 = this.countDown.await(INITIAL_DELAY.toMillis() * 4, TimeUnit.MILLISECONDS);
+ boolean finished3 = this.countDown.await(INITIAL_DELAY * 4, TimeUnit.MILLISECONDS);
assertThat("Should finish quickly as we called startChecks after enough delay", finished3, equalTo(true));
backOff.close();
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
index 13ff1be..506477c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
@@ -36,6 +36,8 @@ import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.impl.shared.JournalAvailableChecker.JournalCheckerConfiguration;
+import org.apache.sling.distribution.journal.impl.shared.Topics.TopicsConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -85,7 +87,7 @@ public class JournalAvailableCheckerTest {
.when(provider).assertTopic(INVALID_TOPIC);
when(context.registerService(Mockito.eq(JournalAvailable.class), Mockito.any(JournalAvailable.class), Mockito.any()))
.thenReturn(sreg);
- checker.activate(context);
+ checker.activate(configuration(emptyMap(), JournalCheckerConfiguration.class), context);
}
@After
@@ -122,17 +124,17 @@ public class JournalAvailableCheckerTest {
}
private void makeCheckSucceed() {
- topics.activate(topicsConfiguration(emptyMap()));
+ topics.activate(configuration(emptyMap(), TopicsConfiguration.class));
}
private void makeCheckFail() {
- topics.activate(topicsConfiguration(singletonMap("packageTopic", INVALID_TOPIC)));
+ topics.activate(configuration(singletonMap("packageTopic", INVALID_TOPIC), TopicsConfiguration.class));
}
- private Topics.TopicsConfiguration topicsConfiguration(Map<String,String> props) {
+ private <T> T configuration(Map<String,String> props, Class<T> clazz) {
return standardConverter()
.convert(props)
- .to(Topics.TopicsConfiguration.class);
+ .to(clazz);
}
private static Event createErrorEvent(Exception e) {