You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/02/24 13:39:45 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-9134 - Allow to configure initial and max journal checker delay upon retries

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new f79594a  SLING-9134 - Allow to configure initial and max journal checker delay upon retries
f79594a is described below

commit f79594aff0f1f157935f82ecc8850d599d6f8dfa
Author: tmaret <tm...@adobe.com>
AuthorDate: Mon Feb 24 14:39:24 2020 +0100

    SLING-9134 - Allow to configure initial and max journal checker delay upon retries
---
 .../journal/impl/shared/ExponentialBackOff.java    | 14 +++++---
 .../impl/shared/JournalAvailableChecker.java       | 39 ++++++++++++++--------
 .../impl/shared/ExponentialBackoffTest.java        | 16 ++++-----
 .../impl/shared/JournalAvailableCheckerTest.java   | 12 ++++---
 4 files changed, 50 insertions(+), 31 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
index 1338ef4..28f5aa8 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
@@ -21,7 +21,6 @@ package org.apache.sling.distribution.journal.impl.shared;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import java.io.Closeable;
-import java.time.Duration;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
@@ -51,9 +50,16 @@ public class ExponentialBackOff implements Closeable {
     private long currentMaxDelay;
     private long lastCheck;
 
-    public ExponentialBackOff(Duration startDelay, Duration maxDelay, boolean randomDelay, Runnable checkCallback) {
-        this.startDelay = startDelay.toMillis();
-        this.maxDelay = maxDelay.toMillis();
+    /**
+     * @param startDelay the start delay in milliseconds
+     * @param maxDelay the max delay in milliseconds
+     * @param randomDelay {@code true} to randomise the delay between 0 and the current max delay;
+     *                    {@code false} to use the current max delay.
+     * @param checkCallback the code to invoke when the current delay has elapsed
+     */
+    public ExponentialBackOff(long startDelay, long maxDelay, boolean randomDelay, Runnable checkCallback) {
+        this.startDelay = startDelay;
+        this.maxDelay = maxDelay;
         this.randomDelay = randomDelay;
         this.checkCallback = checkCallback;
         this.executor = Executors.newScheduledThreadPool(1);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
index 5dee552..feb03a5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
@@ -18,12 +18,8 @@
  */
 package org.apache.sling.distribution.journal.impl.shared;
 
-import static java.time.temporal.ChronoUnit.MILLIS;
-import static java.time.temporal.ChronoUnit.MINUTES;
 import static java.util.Objects.requireNonNull;
 
-import java.time.Duration;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
@@ -36,6 +32,9 @@ import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventConstants;
 import org.osgi.service.event.EventHandler;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,14 +43,16 @@ import org.slf4j.LoggerFactory;
         service = EventHandler.class,
         property = EventConstants.EVENT_TOPIC + "=" + ExceptionEventSender.ERROR_TOPIC
 )
+@Designate(ocd = JournalAvailableChecker.JournalCheckerConfiguration.class)
 public class JournalAvailableChecker implements EventHandler {
-    
-    private static final Duration INITIAL_RETRY_DELAY = Duration.of(500, MILLIS);
-    private static final Duration MAX_RETRY_DELAY = Duration.of(5, MINUTES);
+
+    public static final long INITIAL_RETRY_DELAY = 500; // 500 ms
+
+    public static final long MAX_RETRY_DELAY = 5 * 60 * 1000; // 5 minutes
 
     private static final Logger LOG = LoggerFactory.getLogger(JournalAvailableChecker.class);
     
-    private final ExponentialBackOff backoffRetry;
+    private ExponentialBackOff backoffRetry;
     
     @Reference
     Topics topics;
@@ -65,19 +66,16 @@ public class JournalAvailableChecker implements EventHandler {
     private JournalAvailableServiceMarker marker;
 
     private GaugeService<Boolean> gauge;
-
-    public JournalAvailableChecker() {
-        this.backoffRetry = new ExponentialBackOff(INITIAL_RETRY_DELAY, MAX_RETRY_DELAY, true, this::run);
-    }
     
     @Activate
-    public void activate(BundleContext context) {
+    public void activate(JournalCheckerConfiguration config, BundleContext context) {
         requireNonNull(provider);
         requireNonNull(topics);
+        this.backoffRetry = new ExponentialBackOff(config.initialRetryDelay(), config.maxRetryDelay(), true, this::run);
         this.marker = new JournalAvailableServiceMarker(context);
         this.gauge = metrics.createGauge(DistributionMetricsService.BASE_COMPONENT + ".journal_available", "", this::isAvailable);
         this.marker.register();
-        LOG.info("Started Journal availability checker service. Journal is initially assumed available.");
+        LOG.info("Started Journal availability checker service with initialRetryDelay {}, maxRetryDelay {}. Journal is initially assumed available.", config.initialRetryDelay(), config.maxRetryDelay());
     }
 
     @Deactivate
@@ -133,4 +131,17 @@ public class JournalAvailableChecker implements EventHandler {
         }
     }
 
+    @ObjectClassDefinition(name = "Apache Sling Journal based Distribution - Journal Checker")
+    public @interface JournalCheckerConfiguration {
+
+        @AttributeDefinition(name = "Initial retry delay",
+                description = "The initial retry delay in milliseconds.")
+        long initialRetryDelay() default INITIAL_RETRY_DELAY;
+
+        @AttributeDefinition(name = "Max retry delay",
+                description = "The max retry delay in milliseconds.")
+        long maxRetryDelay() default MAX_RETRY_DELAY;
+
+    }
+
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
index 55e4c01..bd44057 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
@@ -18,11 +18,11 @@
  */
 package org.apache.sling.distribution.journal.impl.shared;
 
+import static java.time.Duration.of;
 import static java.time.temporal.ChronoUnit.MILLIS;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
-import java.time.Duration;
 import java.time.temporal.ChronoUnit;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -36,9 +36,9 @@ import org.slf4j.LoggerFactory;
 @RunWith(MockitoJUnitRunner.class)
 public class ExponentialBackoffTest {
     private static final int RETRIES = 5;
-    private static final Duration INITIAL_DELAY = Duration.of(64, MILLIS);
-    private static final Duration MAX_DELAY = Duration.of(256, MILLIS);
-    private static final Duration LONG_DELAY = Duration.of(5, ChronoUnit.SECONDS);
+    private static final long INITIAL_DELAY = of(64, MILLIS).toMillis();
+    private static final long MAX_DELAY = of(256, MILLIS).toMillis();
+    private static final long LONG_DELAY = of(5, ChronoUnit.SECONDS).toMillis();
     
     private Logger log = LoggerFactory.getLogger(this.getClass());
     
@@ -52,21 +52,21 @@ public class ExponentialBackoffTest {
         backOff.startChecks();
         // Run into double trigger protection
         backOff.startChecks();
-        boolean finished = this.countDown.await(MAX_DELAY.toMillis() * RETRIES, TimeUnit.MILLISECONDS);
+        boolean finished = this.countDown.await(MAX_DELAY * RETRIES, TimeUnit.MILLISECONDS);
         assertThat("Should finish before the timeout", finished, equalTo(true));
 
         log.info("Checking for long delay if next error happens quickly");
         this.countDown = new CountDownLatch(1);
         backOff.startChecks();
-        boolean finished2 = this.countDown.await(INITIAL_DELAY.toMillis() * 2, TimeUnit.MILLISECONDS);
+        boolean finished2 = this.countDown.await(INITIAL_DELAY * 2, TimeUnit.MILLISECONDS);
         assertThat("Should not finish quickly as we called startChecks immediately", finished2, equalTo(false));
         this.countDown.await();
 
         log.info("Checking for short delay if next error happens after enough time");
-        Thread.sleep(MAX_DELAY.toMillis() * 3);
+        Thread.sleep(MAX_DELAY * 3);
         this.countDown = new CountDownLatch(1);
         backOff.startChecks();
-        boolean finished3 = this.countDown.await(INITIAL_DELAY.toMillis() * 4, TimeUnit.MILLISECONDS);
+        boolean finished3 = this.countDown.await(INITIAL_DELAY * 4, TimeUnit.MILLISECONDS);
         assertThat("Should finish quickly as we called startChecks after enough delay", finished3, equalTo(true));
 
         backOff.close();
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
index 13ff1be..506477c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
@@ -36,6 +36,8 @@ import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.impl.shared.JournalAvailableChecker.JournalCheckerConfiguration;
+import org.apache.sling.distribution.journal.impl.shared.Topics.TopicsConfiguration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -85,7 +87,7 @@ public class JournalAvailableCheckerTest {
                 .when(provider).assertTopic(INVALID_TOPIC);
         when(context.registerService(Mockito.eq(JournalAvailable.class), Mockito.any(JournalAvailable.class), Mockito.any()))
                 .thenReturn(sreg);
-        checker.activate(context);
+        checker.activate(configuration(emptyMap(), JournalCheckerConfiguration.class), context);
     }
 
     @After
@@ -122,17 +124,17 @@ public class JournalAvailableCheckerTest {
     }
     
     private void makeCheckSucceed() {
-        topics.activate(topicsConfiguration(emptyMap()));
+        topics.activate(configuration(emptyMap(), TopicsConfiguration.class));
     }
 
     private void makeCheckFail() {
-        topics.activate(topicsConfiguration(singletonMap("packageTopic", INVALID_TOPIC)));
+        topics.activate(configuration(singletonMap("packageTopic", INVALID_TOPIC), TopicsConfiguration.class));
     }
 
-    private Topics.TopicsConfiguration topicsConfiguration(Map<String,String> props) {
+    private <T> T configuration(Map<String,String> props, Class<T> clazz) {
         return standardConverter()
                 .convert(props)
-                .to(Topics.TopicsConfiguration.class);
+                .to(clazz);
     }
 
     private static Event createErrorEvent(Exception e) {