You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/07/08 14:20:33 UTC

[sling-org-apache-sling-distribution-journal] branch SLING-8531 created (now d95a933)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a change to branch SLING-8531
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git.


      at d95a933  SLING-8531 - Exponential backoff for retries

This branch includes the following new commits:

     new d95a933  SLING-8531 - Exponential backoff for retries

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[sling-org-apache-sling-distribution-journal] 01/01: SLING-8531 - Exponential backoff for retries

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-8531
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit d95a933c0d62374b2c2f912f4751b90f63be8a84
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Jul 8 16:20:16 2019 +0200

    SLING-8531 - Exponential backoff for retries
---
 pom.xml                                            |  8 ++-
 .../journal/impl/shared/ExponentialBackOff.java    | 75 ++++++++++++++++++++++
 .../impl/shared/JournalAvailableChecker.java       | 74 ++++++++++++++-------
 .../impl/shared/ExponentialBackoffTest.java        | 52 +++++++++++++++
 .../impl/shared/JournalAvailableCheckerTest.java   | 53 +++++++++++++--
 5 files changed, 230 insertions(+), 32 deletions(-)

diff --git a/pom.xml b/pom.xml
index 60a109c..e204997 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.apache.sling</groupId>
         <artifactId>sling</artifactId>
-        <version>34</version>
+        <version>35</version>
         <relativePath />
     </parent>
 
@@ -141,12 +141,14 @@
         <!-- OSGi -->
         <dependency>
             <groupId>org.osgi</groupId>
-            <artifactId>org.osgi.service.component.annotations</artifactId>
+            <artifactId>osgi.core</artifactId>
+            <version>6.0.0</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.osgi</groupId>
-            <artifactId>org.osgi.service.metatype.annotations</artifactId>
+            <artifactId>osgi.cmpn</artifactId>
+            <version>6.0.0</version>
             <scope>provided</scope>
         </dependency>
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
new file mode 100644
index 0000000..148bd2a
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retries a check with randomized exponential backoff.
+ * <p>
+ * The first check is scheduled on a dedicated single-threaded executor when the
+ * instance is created. Every time the callback throws a {@link RuntimeException}
+ * another attempt is scheduled after a random delay between 1 ms and the current
+ * maximum delay. That maximum starts at 128 ms, is doubled before each attempt and
+ * is capped at {@code maxDelay}. Retrying ends as soon as the callback returns
+ * normally or {@link #close()} has been called.
+ */
+public class ExponentialBackOff implements Closeable {
+    private static final int INITIAL_MAX_DELAY_MS = 128;
+
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    private final ScheduledExecutorService executor;
+    private final Runnable checkCallback;
+    private final Random random;
+    private final int maxDelay;
+
+    // Written by the constructor and afterwards only by tasks running on the executor
+    private int currentMaxDelay;
+
+    /**
+     * Creates the backoff and immediately schedules the first check.
+     *
+     * @param maxDelay upper bound in milliseconds for the delay between two checks; must be positive
+     * @param checkCallback check to run; it signals a failure by throwing a {@link RuntimeException}
+     * @throws IllegalArgumentException if {@code maxDelay} is not positive
+     * @throws NullPointerException if {@code checkCallback} is {@code null}
+     */
+    public ExponentialBackOff(int maxDelay, Runnable checkCallback) {
+        if (maxDelay <= 0) {
+            throw new IllegalArgumentException("maxDelay must be positive but was " + maxDelay);
+        }
+        if (checkCallback == null) {
+            // A null callback would fail with a NullPointerException on every attempt and be retried forever
+            throw new NullPointerException("checkCallback must not be null");
+        }
+        this.currentMaxDelay = INITIAL_MAX_DELAY_MS;
+        this.maxDelay = maxDelay;
+        this.checkCallback = checkCallback;
+        this.executor = Executors.newScheduledThreadPool(1);
+        this.random = new Random();
+        scheduleCheck();
+    }
+
+    /**
+     * Stops the retries. A check that is currently running is allowed to finish,
+     * a check that is still waiting for its delay will not invoke the callback anymore.
+     */
+    @Override
+    public void close() throws IOException {
+        this.executor.shutdown();
+    }
+
+    /**
+     * Doubles the current maximum delay (capped at {@code maxDelay}) and schedules
+     * the next check after a random delay between 1 ms and that maximum.
+     */
+    private void scheduleCheck() {
+        // Same result as min(currentMaxDelay * 2, maxDelay) but without int overflow for a very large maxDelay
+        this.currentMaxDelay = (this.currentMaxDelay > this.maxDelay / 2)
+                ? this.maxDelay
+                : this.currentMaxDelay * 2;
+        int delay = this.random.nextInt(currentMaxDelay) + 1;
+        log.info("Scheduling next check in {} ms with maximum {} ms.", delay, currentMaxDelay);
+        this.executor.schedule(this::check, delay, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Runs the callback once and schedules another attempt if it failed.
+     */
+    private void check() {
+        if (this.executor.isShutdown()) {
+            // shutdown() keeps already scheduled delayed tasks by default, so a pending
+            // check may still fire after close(); it must not call back into a closed owner
+            return;
+        }
+        try {
+            this.checkCallback.run();
+        } catch (RuntimeException e) {
+            log.debug("Check failed: {}", e.getMessage(), e);
+            if (!this.executor.isShutdown()) {
+                scheduleCheck();
+            }
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
index 0d51329..dcb5fd9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
@@ -18,40 +18,40 @@
  */
 package org.apache.sling.distribution.journal.impl.shared;
 
-import javax.annotation.ParametersAreNonnullByDefault;
-
 import static java.util.Objects.requireNonNull;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_IMMEDIATE;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.JournalAvailable;
-
-@Component(
-        service = {Runnable.class},
-        immediate = true,
-        property = {
-                PROPERTY_SCHEDULER_IMMEDIATE + ":Boolean=true",
-                PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
-                PROPERTY_SCHEDULER_PERIOD + ":Long=" + 90  // 90 seconds
-        }
+@Component( 
+        property = EventConstants.EVENT_TOPIC + "=" + ExceptionEventSender.ERROR_TOPIC
 )
-@ParametersAreNonnullByDefault
-public class JournalAvailableChecker implements JournalAvailable, Runnable {
+public class JournalAvailableChecker implements JournalAvailable, EventHandler {
+    
+    private static final int MAX_RETGRY_DELAY_MS = 10000;
+
+    // Minimal number of errors before journal is considered unavailable
+    public static final int MIN_ERRORS = 2;
 
     private static final Logger LOG = LoggerFactory.getLogger(JournalAvailableChecker.class);
 
+
     @Reference
     Topics topics;
     
@@ -60,26 +60,33 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
     
     @Reference
     DistributionMetricsService metrics;
-
+    
     private BundleContext context;
 
     private volatile ServiceRegistration<JournalAvailable> reg;
 
     private GaugeService<Boolean> gauge;
 
+    private ExponentialBackOff backoffRetry;
+    
+    private AtomicInteger numErrors;
+    
     @Activate
     public void activate(BundleContext context) {
         requireNonNull(provider);
         requireNonNull(topics);
         this.context = context;
+        this.numErrors = new AtomicInteger();
         gauge = metrics.createGauge(DistributionMetricsService.BASE_COMPONENT + ".journal_available", "", this::isAvailable);
         LOG.info("Started Journal availability checker service");
+        startChecks();
     }
 
     @Deactivate
     public void deactivate() {
         gauge.close();
         unRegister();
+        IOUtils.closeQuietly(this.backoffRetry);
         LOG.info("Stopped Journal availability checker service");
     }
 
@@ -91,17 +98,20 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
     }
 
     private void available() {
+        this.backoffRetry = null;
         if (this.reg == null) {
+            IOUtils.closeQuietly(this.backoffRetry);
             LOG.info("Journal is available");
             this.reg = context.registerService(JournalAvailable.class, this, null);
         }
     }
     
     private void unAvailable(Exception e) {
+        String msg = "Journal is still unavailable: " + e.getMessage();
         if (LOG.isDebugEnabled()) {
-            LOG.warn("Journal is unavailable " + e.getMessage(), e);
+            LOG.warn(msg, e);
         } else {
-            LOG.warn("Journal is unavailable " + e.getMessage());
+            LOG.warn(msg);
         }
         unRegister();
     }
@@ -110,7 +120,6 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
         return reg != null;
     }
 
-    @Override
     public void run() {
         try {
             LOG.debug("Journal checker is running");
@@ -118,6 +127,7 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
             available();
         } catch (Exception e) {
             unAvailable(e);
+            throw e;
         }
     }
 
@@ -127,4 +137,22 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
             this.reg = null;
         }
     }
+
+    /**
+     * Counts an error event. Once at least {@link #MIN_ERRORS} events were counted
+     * and no retry checks are running, the service is unregistered and the checks
+     * are started; otherwise the event is only logged.
+     *
+     * @param event error event; its {@link ExceptionEventSender#KEY_TYPE} property is used for logging
+     */
+    @Override
+    public synchronized void handleEvent(Event event) {
+        String type = (String) event.getProperty(ExceptionEventSender.KEY_TYPE);
+        int curNumErrors = this.numErrors.incrementAndGet();
+        if (this.backoffRetry == null && curNumErrors >= MIN_ERRORS) {
+            LOG.warn("Received exception event {}. Journal is considered unavailable.", type);
+            unRegister();
+            startChecks(); 
+        } else {
+            // Log the value obtained by the increment above instead of reading the counter a second time
+            LOG.info("Received exception event {}. {} of {} errors occurred.", type, curNumErrors, MIN_ERRORS);
+        }
+    }
+
+    private void startChecks() {
+        this.backoffRetry = new ExponentialBackOff(MAX_RETGRY_DELAY_MS, this::run);
+        this.numErrors.set(0);
+    }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
new file mode 100644
index 0000000..01bcec5
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+
+/**
+ * Checks that {@link ExponentialBackOff} keeps retrying a failing callback
+ * until the callback succeeds.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ExponentialBackoffTest {
+
+    private static final int RETRIES = 10;
+    private static final int MAX_DELAY_MS = 256;
+    private final CountDownLatch countDown = new CountDownLatch(RETRIES);
+    
+    /**
+     * The callback fails until it was invoked {@code RETRIES} times. As every delay is
+     * at most {@code MAX_DELAY_MS} the latch is expected to reach zero within
+     * {@code MAX_DELAY_MS * RETRIES} milliseconds.
+     */
+    @Test
+    public void testIsAvailable() throws Exception {
+        boolean finished;
+        // try-with-resources shuts the executor down even if await is interrupted
+        try (ExponentialBackOff backOff = new ExponentialBackOff(MAX_DELAY_MS, this::checkCallback)) {
+            finished = this.countDown.await(MAX_DELAY_MS * RETRIES, TimeUnit.MILLISECONDS);
+        }
+        assertThat("Should finish before the timeout", finished, equalTo(true));
+    }
+    
+    /**
+     * Counts the invocation and fails as long as the latch has not reached zero.
+     */
+    private void checkCallback() {
+        this.countDown.countDown();
+        if (countDown.getCount() > 0) {
+            throw new RuntimeException("Failing num: " + this.countDown.getCount());
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
index 622013a..fb407a2 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
@@ -20,19 +20,24 @@ package org.apache.sling.distribution.journal.impl.shared;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.when;
 import static org.osgi.util.converter.Converters.standardConverter;
 
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -43,6 +48,7 @@ import org.mockito.Spy;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
 
 @RunWith(MockitoJUnitRunner.class)
 public class JournalAvailableCheckerTest {
@@ -51,7 +57,7 @@ public class JournalAvailableCheckerTest {
 
     @InjectMocks
     private JournalAvailableChecker checker;
-
+    
     @Spy
     private Topics topics = new Topics();
 
@@ -65,7 +71,7 @@ public class JournalAvailableCheckerTest {
     private BundleContext context;
 
     @Mock
-    private ServiceRegistration<JournalAvailable> reg;
+    private ServiceRegistration<JournalAvailable> sreg;
 
     @SuppressWarnings("rawtypes")
     @Mock
@@ -78,7 +84,7 @@ public class JournalAvailableCheckerTest {
         doThrow(new MessagingException("topic is invalid"))
                 .when(provider).assertTopic(INVALID_TOPIC);
         when(context.registerService(Mockito.eq(JournalAvailable.class), Mockito.any(JournalAvailable.class), Mockito.any()))
-                .thenReturn(reg);
+                .thenReturn(sreg);
         checker.activate(context);
     }
 
@@ -89,18 +95,53 @@ public class JournalAvailableCheckerTest {
 
     @Test
     public void testIsAvailable() throws Exception {
-        topics.activate(topicsConfiguration(singletonMap("packageTopic", INVALID_TOPIC)));
-        checker.run();
+        makeCheckFail();
+        try {
+            checker.run();
+            Assert.fail("Should throw exception");
+        } catch (Exception e) {
+        }
         assertFalse(checker.isAvailable());
-        topics.activate(topicsConfiguration(emptyMap()));
+        makeCheckSucceed();
         checker.run();
         assertTrue(checker.isAvailable());
     }
 
+    /**
+     * Walks through the life cycle: available after activation, still available after
+     * one error event, unavailable after the second one while the check fails and
+     * available again once the check succeeds.
+     */
+    @Test
+    public void testActivateChecksOnEvent() {
+        await("At the start checks are triggered and should set the state available")
+            .until(checker::isAvailable);
+        
+        makeCheckFail();
+        Event event = createErrorEvent(new IOException("Expected"));
+        // A single error is below MIN_ERRORS, so the journal is expected to stay available
+        checker.handleEvent(event);
+        await().until(checker::isAvailable);
+        // Signal second exception to checker to start the checks. Now we should see not available
+        checker.handleEvent(event);
+        await().until(() -> !checker.isAvailable());
+        
+        makeCheckSucceed();
+        await().until(checker::isAvailable);
+    }
+    
+    private void makeCheckSucceed() {
+        topics.activate(topicsConfiguration(emptyMap()));
+    }
+
+    private void makeCheckFail() {
+        topics.activate(topicsConfiguration(singletonMap("packageTopic", INVALID_TOPIC)));
+    }
+
     private Topics.TopicsConfiguration topicsConfiguration(Map<String,String> props) {
         return standardConverter()
                 .convert(props)
                 .to(Topics.TopicsConfiguration.class);
     }
 
+    private static Event createErrorEvent(Exception e) {
+        Map<String, String> props = new HashMap<>();
+        props.put(ExceptionEventSender.KEY_TYPE, e.getClass().getName());
+        props.put(ExceptionEventSender.KEY_MESSAGE, e.getMessage());
+        return new Event(ExceptionEventSender.ERROR_TOPIC, props);
+    }
 }
\ No newline at end of file