You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/07/08 14:20:34 UTC
[sling-org-apache-sling-distribution-journal] 01/01: SLING-8531 -
Exponential backoff for retries
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch SLING-8531
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit d95a933c0d62374b2c2f912f4751b90f63be8a84
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Jul 8 16:20:16 2019 +0200
SLING-8531 - Exponential backoff for retries
---
pom.xml | 8 ++-
.../journal/impl/shared/ExponentialBackOff.java | 75 ++++++++++++++++++++++
.../impl/shared/JournalAvailableChecker.java | 74 ++++++++++++++-------
.../impl/shared/ExponentialBackoffTest.java | 52 +++++++++++++++
.../impl/shared/JournalAvailableCheckerTest.java | 53 +++++++++++++--
5 files changed, 230 insertions(+), 32 deletions(-)
diff --git a/pom.xml b/pom.xml
index 60a109c..e204997 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
<parent>
<groupId>org.apache.sling</groupId>
<artifactId>sling</artifactId>
- <version>34</version>
+ <version>35</version>
<relativePath />
</parent>
@@ -141,12 +141,14 @@
<!-- OSGi -->
<dependency>
<groupId>org.osgi</groupId>
- <artifactId>org.osgi.service.component.annotations</artifactId>
+ <artifactId>osgi.core</artifactId>
+ <version>6.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
- <artifactId>org.osgi.service.metatype.annotations</artifactId>
+ <artifactId>osgi.cmpn</artifactId>
+ <version>6.0.0</version>
<scope>provided</scope>
</dependency>
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
new file mode 100644
index 0000000..148bd2a
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retries a check with randomized exponential backoff until it succeeds.
+ *
+ * Re-runs the checkCallback on a scheduler with a pool size of 1 for as long as it throws a RuntimeException.
+ */
+public class ExponentialBackOff implements Closeable {
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private final ScheduledExecutorService executor;
+ private final Runnable checkCallback;
+
+ private Random random;
+ private int maxDelay;
+
+ private int currentMaxDelay;
+
+ public ExponentialBackOff(int maxDelay, Runnable checkCallback) {
+ this.currentMaxDelay = 128; // doubled by scheduleCheck() before first use, so the first delay is drawn from 1..min(256, maxDelay) ms
+ this.maxDelay = maxDelay;
+ this.checkCallback = checkCallback;
+ this.executor = Executors.newScheduledThreadPool(1);
+ this.random = new Random();
+ scheduleCheck(); // the first check is scheduled as part of construction
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.executor.shutdown(); // NOTE(review): shutdown() does not cancel a running check; whether an already scheduled check still fires depends on the executor's shutdown policy - confirm
+ }
+
+ private void scheduleCheck() {
+ this.currentMaxDelay = Math.min(this.currentMaxDelay *2, maxDelay); // double the upper bound, capped at maxDelay
+ int delay = this.random.nextInt(currentMaxDelay) + 1; // random delay in [1, currentMaxDelay]; nextInt rejects a bound <= 0, i.e. maxDelay must be positive
+ log.info("Scheduling next check in {} ms with maximum {} ms.", delay, currentMaxDelay);
+ this.executor.schedule(this::check, delay, TimeUnit.MILLISECONDS);
+ }
+
+ private void check() {
+ try {
+ this.checkCallback.run(); // a normal return ends the retry cycle: nothing further is scheduled
+ } catch (RuntimeException e) { // the failure itself is not logged here; it only triggers the next delayed attempt
+ scheduleCheck();
+ }
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
index 0d51329..dcb5fd9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
@@ -18,40 +18,40 @@
*/
package org.apache.sling.distribution.journal.impl.shared;
-import javax.annotation.ParametersAreNonnullByDefault;
-
import static java.util.Objects.requireNonNull;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_IMMEDIATE;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.JournalAvailable;
-
-@Component(
- service = {Runnable.class},
- immediate = true,
- property = {
- PROPERTY_SCHEDULER_IMMEDIATE + ":Boolean=true",
- PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
- PROPERTY_SCHEDULER_PERIOD + ":Long=" + 90 // 90 seconds
- }
+@Component(
+ property = EventConstants.EVENT_TOPIC + "=" + ExceptionEventSender.ERROR_TOPIC
)
-@ParametersAreNonnullByDefault
-public class JournalAvailableChecker implements JournalAvailable, Runnable {
+public class JournalAvailableChecker implements JournalAvailable, EventHandler {
+
+ private static final int MAX_RETGRY_DELAY_MS = 10000;
+
+ // Minimal number of errors before journal is considered unavailable
+ public static final int MIN_ERRORS = 2;
private static final Logger LOG = LoggerFactory.getLogger(JournalAvailableChecker.class);
+
@Reference
Topics topics;
@@ -60,26 +60,33 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
@Reference
DistributionMetricsService metrics;
-
+
private BundleContext context;
private volatile ServiceRegistration<JournalAvailable> reg;
private GaugeService<Boolean> gauge;
+ private ExponentialBackOff backoffRetry;
+
+ private AtomicInteger numErrors;
+
@Activate
public void activate(BundleContext context) {
requireNonNull(provider);
requireNonNull(topics);
this.context = context;
+ this.numErrors = new AtomicInteger();
gauge = metrics.createGauge(DistributionMetricsService.BASE_COMPONENT + ".journal_available", "", this::isAvailable);
LOG.info("Started Journal availability checker service");
+ startChecks();
}
@Deactivate
public void deactivate() {
gauge.close();
unRegister();
+ IOUtils.closeQuietly(this.backoffRetry);
LOG.info("Stopped Journal availability checker service");
}
@@ -91,17 +98,20 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
}
private void available() {
+ IOUtils.closeQuietly(this.backoffRetry); // must run before the reference is cleared: closeQuietly(null) is a no-op and would leak the retry executor
+ this.backoffRetry = null; // a null backoffRetry is what allows handleEvent() to start a new round of checks
if (this.reg == null) {
LOG.info("Journal is available");
this.reg = context.registerService(JournalAvailable.class, this, null);
}
}
private void unAvailable(Exception e) {
+ String msg = "Journal is still unavailable: " + e.getMessage();
if (LOG.isDebugEnabled()) {
- LOG.warn("Journal is unavailable " + e.getMessage(), e);
+ LOG.warn(msg, e);
} else {
- LOG.warn("Journal is unavailable " + e.getMessage());
+ LOG.warn(msg);
}
unRegister();
}
@@ -110,7 +120,6 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
return reg != null;
}
- @Override
public void run() {
try {
LOG.debug("Journal checker is running");
@@ -118,6 +127,7 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
available();
} catch (Exception e) {
unAvailable(e);
+ throw e;
}
}
@@ -127,4 +137,22 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
this.reg = null;
}
}
+
+ @Override
+ public synchronized void handleEvent(Event event) { // counts error events; once MIN_ERRORS is reached and no retry is active, unregisters and starts checks
+ String type = (String) event.getProperty(ExceptionEventSender.KEY_TYPE);
+ int curNumErrors = this.numErrors.incrementAndGet(); // the counter is reset to 0 by startChecks()
+ if (this.backoffRetry == null && curNumErrors >= MIN_ERRORS) { // backoffRetry != null means checks are already running
+ LOG.warn("Received exception event {}. Journal is considered unavailable.", type);
+ unRegister();
+ startChecks();
+ } else {
+ LOG.info("Received exception event {}. {} of {} errors occured.", type, curNumErrors, MIN_ERRORS); // log the same value the decision above was based on instead of re-reading the counter
+ }
+ }
+
+ private void startChecks() {
+ this.backoffRetry = new ExponentialBackOff(MAX_RETGRY_DELAY_MS, this::run);
+ this.numErrors.set(0);
+ }
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
new file mode 100644
index 0000000..01bcec5
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ExponentialBackoffTest {
+
+ private static final int RETRIES = 10;
+ private static final int MAX_DELAY_MS = 256;
+ private CountDownLatch countDown = new CountDownLatch(RETRIES);
+
+ @Test
+ public void testIsAvailable() throws Exception {
+ try (ExponentialBackOff backOff = new ExponentialBackOff(MAX_DELAY_MS, this::checkCallback)) { // closed even when await() or the assertion throws
+ boolean finished = this.countDown.await(MAX_DELAY_MS * RETRIES, TimeUnit.MILLISECONDS);
+ assertThat("Should finish before the timeout", finished, equalTo(true));
+ }
+ }
+
+ private void checkCallback() {
+ this.countDown.countDown();
+ if (countDown.getCount() > 0) {
+ throw new RuntimeException("Failing num: " + this.countDown.getCount());
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
index 622013a..fb407a2 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
@@ -20,19 +20,24 @@ package org.apache.sling.distribution.journal.impl.shared;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
import static org.osgi.util.converter.Converters.standardConverter;
+import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -43,6 +48,7 @@ import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
@RunWith(MockitoJUnitRunner.class)
public class JournalAvailableCheckerTest {
@@ -51,7 +57,7 @@ public class JournalAvailableCheckerTest {
@InjectMocks
private JournalAvailableChecker checker;
-
+
@Spy
private Topics topics = new Topics();
@@ -65,7 +71,7 @@ public class JournalAvailableCheckerTest {
private BundleContext context;
@Mock
- private ServiceRegistration<JournalAvailable> reg;
+ private ServiceRegistration<JournalAvailable> sreg;
@SuppressWarnings("rawtypes")
@Mock
@@ -78,7 +84,7 @@ public class JournalAvailableCheckerTest {
doThrow(new MessagingException("topic is invalid"))
.when(provider).assertTopic(INVALID_TOPIC);
when(context.registerService(Mockito.eq(JournalAvailable.class), Mockito.any(JournalAvailable.class), Mockito.any()))
- .thenReturn(reg);
+ .thenReturn(sreg);
checker.activate(context);
}
@@ -89,18 +95,53 @@ public class JournalAvailableCheckerTest {
@Test
public void testIsAvailable() throws Exception {
- topics.activate(topicsConfiguration(singletonMap("packageTopic", INVALID_TOPIC)));
- checker.run();
+ makeCheckFail();
+ try {
+ checker.run();
+ Assert.fail("Should throw exception");
+ } catch (Exception e) { // expected: run() rethrows the failure after handling it via unAvailable(e); nothing to do here
+ }
assertFalse(checker.isAvailable());
- topics.activate(topicsConfiguration(emptyMap()));
+ makeCheckSucceed();
checker.run();
assertTrue(checker.isAvailable());
}
+ @Test
+ public void testActivateChecksOnEvent() {
+ await("At the start checks are triggers and should set the state available")
+ .until(checker::isAvailable);
+
+ makeCheckFail();
+ Event event = createErrorEvent(new IOException("Expected"));
+ checker.handleEvent(event);
+ await().until(checker::isAvailable);
+ // Signal second exception to checker to start the checks. Now we should see not available
+ checker.handleEvent(event);
+ await().until(() -> !checker.isAvailable());
+
+ makeCheckSucceed();
+ await().until(checker::isAvailable);
+ }
+
+ private void makeCheckSucceed() {
+ topics.activate(topicsConfiguration(emptyMap()));
+ }
+
+ private void makeCheckFail() {
+ topics.activate(topicsConfiguration(singletonMap("packageTopic", INVALID_TOPIC)));
+ }
+
private Topics.TopicsConfiguration topicsConfiguration(Map<String,String> props) {
return standardConverter()
.convert(props)
.to(Topics.TopicsConfiguration.class);
}
+ private static Event createErrorEvent(Exception e) {
+ Map<String, String> props = new HashMap<>();
+ props.put(ExceptionEventSender.KEY_TYPE, e.getClass().getName());
+ props.put(ExceptionEventSender.KEY_MESSAGE, e.getMessage());
+ return new Event(ExceptionEventSender.ERROR_TOPIC, props);
+ }
}
\ No newline at end of file