You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/07/08 14:20:34 UTC

[sling-org-apache-sling-distribution-journal] 01/01: SLING-8531 - Exponential backoff for retries

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-8531
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit d95a933c0d62374b2c2f912f4751b90f63be8a84
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Jul 8 16:20:16 2019 +0200

    SLING-8531 - Exponential backoff for retries
---
 pom.xml                                            |  8 ++-
 .../journal/impl/shared/ExponentialBackOff.java    | 75 ++++++++++++++++++++++
 .../impl/shared/JournalAvailableChecker.java       | 74 ++++++++++++++-------
 .../impl/shared/ExponentialBackoffTest.java        | 52 +++++++++++++++
 .../impl/shared/JournalAvailableCheckerTest.java   | 53 +++++++++++++--
 5 files changed, 230 insertions(+), 32 deletions(-)

diff --git a/pom.xml b/pom.xml
index 60a109c..e204997 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.apache.sling</groupId>
         <artifactId>sling</artifactId>
-        <version>34</version>
+        <version>35</version>
         <relativePath />
     </parent>
 
@@ -141,12 +141,14 @@
         <!-- OSGi -->
         <dependency>
             <groupId>org.osgi</groupId>
-            <artifactId>org.osgi.service.component.annotations</artifactId>
+            <artifactId>osgi.core</artifactId>
+            <version>6.0.0</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.osgi</groupId>
-            <artifactId>org.osgi.service.metatype.annotations</artifactId>
+            <artifactId>osgi.cmpn</artifactId>
+            <version>6.0.0</version>
             <scope>provided</scope>
         </dependency>
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
new file mode 100644
index 0000000..148bd2a
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retries a check with randomized exponential backoff.
+ *
+ * Runs the checkCallback on a background thread until it returns without throwing a RuntimeException.
+ */
+public class ExponentialBackOff implements Closeable {
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+    private final ScheduledExecutorService executor;
+    private final Runnable checkCallback;
+    private final Random random;
+    private final int maxDelay;
+    // Upper bound in ms for the next random delay. Doubled before each scheduling and capped at maxDelay.
+    private int currentMaxDelay;
+
+    /** Schedules the first run of checkCallback; maxDelay caps the upper bound of the random delay in ms. */
+    public ExponentialBackOff(int maxDelay, Runnable checkCallback) {
+        this.currentMaxDelay = 128; // doubled on first use: the first delay is drawn from 1..min(256, maxDelay) ms
+        this.maxDelay = maxDelay;
+        this.checkCallback = checkCallback;
+        this.executor = Executors.newScheduledThreadPool(1);
+        this.random = new Random();
+        scheduleCheck();
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.executor.shutdown(); // the executor accepts no further checks after this
+    }
+
+    private void scheduleCheck() {
+        this.currentMaxDelay = Math.min(this.currentMaxDelay * 2, maxDelay);
+        int delay = this.random.nextInt(currentMaxDelay) + 1; // nextInt is 0-based, +1 keeps the delay >= 1 ms
+        log.info("Scheduling next check in {} ms with maximum {} ms.", delay, currentMaxDelay);
+        this.executor.schedule(this::check, delay, TimeUnit.MILLISECONDS);
+    }
+
+    private void check() {
+        try {
+            this.checkCallback.run();
+        } catch (RuntimeException e) {
+            log.debug("Check failed, scheduling a retry: {}", e.getMessage(), e); // do not swallow the cause silently
+            scheduleCheck();
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
index 0d51329..dcb5fd9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
@@ -18,40 +18,40 @@
  */
 package org.apache.sling.distribution.journal.impl.shared;
 
-import javax.annotation.ParametersAreNonnullByDefault;
-
 import static java.util.Objects.requireNonNull;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_IMMEDIATE;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.JournalAvailable;
-
-@Component(
-        service = {Runnable.class},
-        immediate = true,
-        property = {
-                PROPERTY_SCHEDULER_IMMEDIATE + ":Boolean=true",
-                PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
-                PROPERTY_SCHEDULER_PERIOD + ":Long=" + 90  // 90 seconds
-        }
+@Component( 
+        property = EventConstants.EVENT_TOPIC + "=" + ExceptionEventSender.ERROR_TOPIC
 )
-@ParametersAreNonnullByDefault
-public class JournalAvailableChecker implements JournalAvailable, Runnable {
+public class JournalAvailableChecker implements JournalAvailable, EventHandler {
+    
+    private static final int MAX_RETGRY_DELAY_MS = 10000;
+
+    // Minimum number of error events required before the journal is considered unavailable
+    public static final int MIN_ERRORS = 2;
 
     private static final Logger LOG = LoggerFactory.getLogger(JournalAvailableChecker.class);
 
+
     @Reference
     Topics topics;
     
@@ -60,26 +60,33 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
     
     @Reference
     DistributionMetricsService metrics;
-
+    
     private BundleContext context;
 
     private volatile ServiceRegistration<JournalAvailable> reg;
 
     private GaugeService<Boolean> gauge;
 
+    private ExponentialBackOff backoffRetry;
+    
+    private AtomicInteger numErrors;
+    
     @Activate
     public void activate(BundleContext context) {
         requireNonNull(provider);
         requireNonNull(topics);
         this.context = context;
+        this.numErrors = new AtomicInteger(); // error events counted by handleEvent since the last startChecks()
         gauge = metrics.createGauge(DistributionMetricsService.BASE_COMPONENT + ".journal_available", "", this::isAvailable);
         LOG.info("Started Journal availability checker service");
+        startChecks(); // begin the backoff-driven availability checks right at activation
     }
 
     @Deactivate
     public void deactivate() {
         gauge.close();
         unRegister();
+        IOUtils.closeQuietly(this.backoffRetry); // null-safe; shuts down the retry executor if checks are still running
         LOG.info("Stopped Journal availability checker service");
     }
 
@@ -91,17 +98,20 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
     }
 
     private void available() {
+        IOUtils.closeQuietly(this.backoffRetry); // shut down the retry executor while the reference is still held
+        this.backoffRetry = null; // a null reference lets handleEvent start fresh checks on later errors
         if (this.reg == null) {
             LOG.info("Journal is available");
             this.reg = context.registerService(JournalAvailable.class, this, null);
         }
     }
     
     private void unAvailable(Exception e) {
+        String msg = "Journal is still unavailable: " + e.getMessage(); // shared by both logging branches below
         if (LOG.isDebugEnabled()) {
-            LOG.warn("Journal is unavailable " + e.getMessage(), e);
+            LOG.warn(msg, e); // the stack trace is included only when debug logging is enabled
         } else {
-            LOG.warn("Journal is unavailable " + e.getMessage());
+            LOG.warn(msg);
         }
         unRegister();
     }
@@ -110,7 +120,6 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
         return reg != null;
     }
 
-    @Override
     public void run() {
         try {
             LOG.debug("Journal checker is running");
@@ -118,6 +127,7 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
             available();
         } catch (Exception e) {
             unAvailable(e);
+            throw e;
         }
     }
 
@@ -127,4 +137,22 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
             this.reg = null;
         }
     }
+
+    /** Counts an error event; at MIN_ERRORS with no checks running it unregisters and starts the checks. */
+    @Override
+    public synchronized void handleEvent(Event event) {
+        final String exceptionType = (String) event.getProperty(ExceptionEventSender.KEY_TYPE);
+        if (this.numErrors.incrementAndGet() < MIN_ERRORS || this.backoffRetry != null) {
+            LOG.info("Received exception event {}. {} of {} errors occured.", exceptionType, this.numErrors.get(), MIN_ERRORS);
+            return;
+        }
+        LOG.warn("Received exception event {}. Journal is considered unavailable.", exceptionType);
+        unRegister();
+        startChecks();
+    }
+
+    private void startChecks() {
+        this.backoffRetry = new ExponentialBackOff(MAX_RETGRY_DELAY_MS, this::run); // its constructor schedules the first check
+        this.numErrors.set(0); // error counting starts over once checks are running
+    }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
new file mode 100644
index 0000000..01bcec5
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ExponentialBackoffTest {
+    private static final int RETRIES = 10;
+    private static final int MAX_DELAY_MS = 256;
+    private final CountDownLatch countDown = new CountDownLatch(RETRIES);
+
+    @Test
+    public void testIsAvailable() throws Exception {
+        try (ExponentialBackOff backOff = new ExponentialBackOff(MAX_DELAY_MS, this::checkCallback)) { // closed even if await() throws
+            boolean finished = this.countDown.await(MAX_DELAY_MS * RETRIES, TimeUnit.MILLISECONDS);
+            assertThat("Should finish before the timeout", finished, equalTo(true));
+        }
+    }
+
+    // Throws on every call until the latch reaches zero, so the RETRIES-th call is the first to succeed
+    private void checkCallback() {
+        this.countDown.countDown();
+        if (countDown.getCount() > 0) {
+            throw new RuntimeException("Failing num: " + this.countDown.getCount());
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
index 622013a..fb407a2 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
@@ -20,19 +20,24 @@ package org.apache.sling.distribution.journal.impl.shared;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.when;
 import static org.osgi.util.converter.Converters.standardConverter;
 
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -43,6 +48,7 @@ import org.mockito.Spy;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
 
 @RunWith(MockitoJUnitRunner.class)
 public class JournalAvailableCheckerTest {
@@ -51,7 +57,7 @@ public class JournalAvailableCheckerTest {
 
     @InjectMocks
     private JournalAvailableChecker checker;
-
+    
     @Spy
     private Topics topics = new Topics();
 
@@ -65,7 +71,7 @@ public class JournalAvailableCheckerTest {
     private BundleContext context;
 
     @Mock
-    private ServiceRegistration<JournalAvailable> reg;
+    private ServiceRegistration<JournalAvailable> sreg;
 
     @SuppressWarnings("rawtypes")
     @Mock
@@ -78,7 +84,7 @@ public class JournalAvailableCheckerTest {
         doThrow(new MessagingException("topic is invalid"))
                 .when(provider).assertTopic(INVALID_TOPIC);
         when(context.registerService(Mockito.eq(JournalAvailable.class), Mockito.any(JournalAvailable.class), Mockito.any()))
-                .thenReturn(reg);
+                .thenReturn(sreg);
         checker.activate(context);
     }
 
@@ -89,18 +95,53 @@ public class JournalAvailableCheckerTest {
 
     @Test
     public void testIsAvailable() throws Exception {
-        topics.activate(topicsConfiguration(singletonMap("packageTopic", INVALID_TOPIC)));
-        checker.run();
+        makeCheckFail();
+        try {
+            checker.run();
+            Assert.fail("Should throw exception");
+        } catch (Exception e) { // expected: run() rethrows the failed check; the state is asserted below
+        }
         assertFalse(checker.isAvailable());
-        topics.activate(topicsConfiguration(emptyMap()));
+        makeCheckSucceed();
         checker.run();
         assertTrue(checker.isAvailable());
     }
 
+    @Test
+    public void testActivateChecksOnEvent() {
+        await("At the start checks are triggers and should set the state available")
+            .until(checker::isAvailable);
+        
+        makeCheckFail();
+        Event event = createErrorEvent(new IOException("Expected"));
+        checker.handleEvent(event); // first error event: still below MIN_ERRORS, so the journal stays available
+        await().until(checker::isAvailable);
+        // The second error event reaches MIN_ERRORS: the checker unregisters and starts the checks, which now fail
+        checker.handleEvent(event);
+        await().until(() -> !checker.isAvailable());
+        
+        makeCheckSucceed(); // the next retried check passes and the journal becomes available again
+        await().until(checker::isAvailable);
+    }
+    
+    private void makeCheckSucceed() { // re-activates topics from an empty property map (no packageTopic override)
+        topics.activate(topicsConfiguration(emptyMap()));
+    }
+
+    private void makeCheckFail() { // points packageTopic at INVALID_TOPIC, for which provider.assertTopic is stubbed to throw
+        topics.activate(topicsConfiguration(singletonMap("packageTopic", INVALID_TOPIC)));
+    }
+
     private Topics.TopicsConfiguration topicsConfiguration(Map<String,String> props) {
         return standardConverter()
                 .convert(props)
                 .to(Topics.TopicsConfiguration.class);
     }
 
+    private static Event createErrorEvent(Exception e) { // error-topic event carrying the exception's class name and message
+        final Map<String, String> eventProps = new HashMap<>();
+        eventProps.put(ExceptionEventSender.KEY_TYPE, e.getClass().getName());
+        eventProps.put(ExceptionEventSender.KEY_MESSAGE, e.getMessage());
+        return new Event(ExceptionEventSender.ERROR_TOPIC, eventProps);
+    }
 }
\ No newline at end of file