You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2021/07/12 18:21:53 UTC
[sling-org-apache-sling-distribution-journal] branch master updated: SLING-10592 - Follow an exponential delay between import attempts (#80)
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 5f02736 SLING-10592 - Follow an exponential delay between import attempts (#80)
5f02736 is described below
commit 5f0273607ceb96e9d7ccd2e65869659b57a72c8d
Author: Timothee Maret <tm...@apache.org>
AuthorDate: Mon Jul 12 20:21:45 2021 +0200
SLING-10592 - Follow an exponential delay between import attempts (#80)
* Retry delay grows exponentially from 5 seconds to 15 minutes
* Delay is interrupted by command messages to avoid long delays when clearing items from distribution queues
* Simplify DistributionSubscriber#waitPrecondition
---
.../journal/impl/subscriber/CommandPoller.java | 5 +-
.../impl/subscriber/DistributionSubscriber.java | 49 +++++-----
.../sling/distribution/journal/shared/Delay.java | 50 ++++++++++
.../journal/impl/subscriber/CommandPollerTest.java | 12 ++-
.../distribution/journal/shared/DelayTest.java | 101 +++++++++++++++++++++
5 files changed, 194 insertions(+), 23 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index de8295d..22734a6 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -40,11 +40,13 @@ public class CommandPoller implements Closeable {
private final Closeable poller;
private final IdleCheck idleCheck;
private final AtomicLong clearOffset = new AtomicLong(-1);
+ private final Runnable callback;
- public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName, int idleMillies) {
+ public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName, int idleMillies, Runnable callback) {
this.subSlingId = subSlingId;
this.subAgentName = subAgentName;
this.idleCheck = new SubscriberIdle(idleMillies, SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS);
+ this.callback = callback;
this.poller = messagingProvider.createPoller(
topics.getCommandTopic(),
Reset.earliest,
@@ -65,6 +67,7 @@ public class CommandPoller implements Closeable {
handleClearCommand(command);
idleCheck.idle();
+ callback.run();
}
private void handleClearCommand(ClearCommand command) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index d37d1c2..4d7d841 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -20,8 +20,12 @@ package org.apache.sling.distribution.journal.impl.subscriber;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toSet;
import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+import static org.apache.sling.distribution.journal.shared.Delay.exponential;
import java.io.Closeable;
import java.io.IOException;
@@ -35,6 +39,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
import javax.annotation.ParametersAreNonnullByDefault;
@@ -61,6 +67,7 @@ import org.apache.sling.distribution.journal.impl.precondition.Precondition.Deci
import org.apache.sling.distribution.journal.messages.LogMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.shared.Delay;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
@@ -83,10 +90,13 @@ import org.slf4j.LoggerFactory;
@Designate(ocd = SubscriberConfiguration.class, factory = true)
@ParametersAreNonnullByDefault
public class DistributionSubscriber {
- private static final int PRECONDITION_TIMEOUT = 60;
- static int RETRY_DELAY = 5000;
- static int QUEUE_FETCH_DELAY = 1000;
- private static final long COMMAND_NOT_IDLE_DELAY_MS = 200;
+
+ private static final long PRECONDITION_TIMEOUT = SECONDS.toMillis(60);
+ static long RETRY_DELAY = SECONDS.toMillis(5);
+ static long MAX_RETRY_DELAY = MINUTES.toMillis(15);
+ static long QUEUE_FETCH_DELAY = SECONDS.toMillis(1);
+ private static final long COMMAND_NOT_IDLE_DELAY = MILLISECONDS.toMillis(200);
+ private static final Supplier<LongSupplier> catchAllDelays = () -> exponential(RETRY_DELAY, MAX_RETRY_DELAY);
private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
@@ -142,6 +152,10 @@ public class DistributionSubscriber {
private volatile boolean running = true;
private Thread queueThread;
+ private LongSupplier catchAllDelay = catchAllDelays.get();
+
+ private final Delay delay = new Delay();
+
@Activate
public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
String subSlingId = requireNonNull(slingSettings.getSlingId());
@@ -157,7 +171,7 @@ public class DistributionSubscriber {
Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
if (config.editable()) {
- commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, idleMillies);
+ commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, idleMillies, delay::signal);
}
if (config.subscriberIdleCheck()) {
@@ -293,19 +307,19 @@ public class DistributionSubscriber {
if (commandPoller == null || commandPoller.isIdle()) {
fetchAndProcessQueueItem();
} else {
- delay(COMMAND_NOT_IDLE_DELAY_MS);
+ delay.await(COMMAND_NOT_IDLE_DELAY);
}
} catch (PreConditionTimeoutException e) {
// Precondition timed out. We only log this on info level as it is no error
LOG.info(e.getMessage());
- delay(RETRY_DELAY);
+ delay.await(RETRY_DELAY);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.debug(e.getMessage());
} catch (Exception e) {
// Catch all to prevent processing from stopping
LOG.error("Error processing queue item", e);
- delay(RETRY_DELAY);
+ delay.await(catchAllDelay.getAsLong());
}
}
LOG.info("Stopped Queue processor");
@@ -318,6 +332,7 @@ public class DistributionSubscriber {
processQueueItem(item);
messageBuffer.remove();
distributionMetricsService.getItemsBufferSize().decrement();
+ catchAllDelay = catchAllDelays.get();
}
}
@@ -343,7 +358,7 @@ public class DistributionSubscriber {
if (message != null) {
return message;
} else {
- Thread.sleep(QUEUE_FETCH_DELAY);
+ delay.await(QUEUE_FETCH_DELAY);
}
}
throw new InterruptedException("Shutting down");
@@ -379,12 +394,11 @@ public class DistributionSubscriber {
private Decision waitPrecondition(long offset) {
- Decision decision = Precondition.Decision.WAIT;
- long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT * 1000;
- while (decision == Decision.WAIT && System.currentTimeMillis() < endTime && running) {
- decision = precondition.canProcess(subAgentName, offset);
+ long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT;
+ while (System.currentTimeMillis() < endTime && running) {
+ Decision decision = precondition.canProcess(subAgentName, offset);
if (decision == Decision.WAIT) {
- delay(100);
+ delay.await(100);
} else {
return decision;
}
@@ -392,11 +406,4 @@ public class DistributionSubscriber {
throw new PreConditionTimeoutException("Timeout waiting for distribution package at offset=" + offset + " on status topic");
}
- private static void delay(long delayInMs) {
- try {
- Thread.sleep(delayInMs);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/Delay.java b/src/main/java/org/apache/sling/distribution/journal/shared/Delay.java
new file mode 100644
index 0000000..e42d7ca
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/Delay.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.shared;
+
+import java.util.function.LongSupplier;
+
+import static java.lang.Math.min;
+import static java.util.stream.LongStream.iterate;
+
+/**
+ * Interruptible delay, plus a factory for exponentially growing delay sequences.
+ * <p>
+ * A thread blocked in {@link #await(long)} resumes when the requested time has
+ * elapsed or as soon as another thread calls {@link #signal()} on the same instance.
+ */
+public final class Delay {
+
+    private final Object delayer = new Object();
+
+    /**
+     * Blocks the calling thread for up to {@code delayInMs} milliseconds. The wait ends
+     * early when {@link #signal()} is called, or when the thread is interrupted, in
+     * which case the interrupt flag is restored before returning.
+     *
+     * @param delayInMs maximum time to wait, in milliseconds; {@code 0} returns immediately
+     * @throws IllegalArgumentException if {@code delayInMs} is negative (raised by {@link Object#wait(long)})
+     */
+    public void await(long delayInMs) {
+        if (delayInMs == 0) {
+            // Object#wait(0) waits until notified, which would turn a zero delay
+            // into an unbounded one.
+            return;
+        }
+        synchronized (delayer) {
+            try {
+                delayer.wait(delayInMs); //NOSONAR
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Wakes up all threads currently blocked in {@link #await(long)} on this instance.
+     */
+    public void signal() {
+        synchronized (delayer) {
+            delayer.notifyAll();
+        }
+    }
+
+    /**
+     * Creates a supplier of delays that yields {@code startDelay} first and then doubles
+     * the previous value on every call; each doubled value is capped at {@code maxDelay}.
+     * Each call advances the sequence, so create a new supplier to start over.
+     *
+     * @param startDelay first delay returned by the supplier
+     * @param maxDelay upper bound applied to every doubled delay
+     * @return a stateful supplier of successive delays
+     */
+    public static LongSupplier exponential(long startDelay, long maxDelay) {
+        // Saturate the doubling so it cannot overflow into a negative delay
+        // when maxDelay is larger than Long.MAX_VALUE / 2.
+        return iterate(startDelay, delay -> min(delay > Long.MAX_VALUE / 2 ? Long.MAX_VALUE : 2 * delay, maxDelay)).iterator()::next;
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index 4816f14..ae6b310 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -60,6 +60,9 @@ public class CommandPollerTest {
@Mock
MessagingProvider clientProvider;
+
+ @Mock
+ Runnable callback;
CommandPoller commandPoller;
@@ -117,6 +120,13 @@ public class CommandPollerTest {
verify(poller).close();
}
+ @Test
+ public void testCallback() {
+ createCommandPoller();
+ commandHandler.handle(info, commandMessage(10L));
+ verify(callback, Mockito.times(1)).run();
+ }
+
private void assertClearedUpTo(int max) {
for (int c=0; c<=max; c++) {
assertThat(commandPoller.isCleared(c), equalTo(true));
@@ -147,7 +157,7 @@ public class CommandPollerTest {
Mockito.eq(Reset.earliest),
handlerCaptor.capture()))
.thenReturn(poller);
- commandPoller = new CommandPoller(clientProvider, topics, SUB_SLING_ID, SUB_AGENT_NAME, 1000);
+ commandPoller = new CommandPoller(clientProvider, topics, SUB_SLING_ID, SUB_AGENT_NAME, 1000, callback);
commandHandler = handlerCaptor.getValue().getHandler();
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/DelayTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/DelayTest.java
new file mode 100644
index 0000000..5467477
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/DelayTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.shared;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.LongSupplier;
+import java.util.stream.LongStream;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link Delay}: the exponential delay sequence and the interruptible wait.
+ */
+public class DelayTest {
+
+    private static final long START_DELAY = 1L;
+
+    private static final long MAX_DELAY = 1000L;
+
+    private ScheduledExecutorService scheduler;
+
+    @Before
+    public void before() {
+        scheduler = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    @After
+    public void after() {
+        scheduler.shutdownNow();
+    }
+
+    @Test
+    public void testExponentialStartDelay() {
+        LongSupplier delay = Delay.exponential(START_DELAY, MAX_DELAY);
+        assertEquals(START_DELAY, delay.getAsLong());
+    }
+
+    @Test
+    public void testExponentialIncreasingDelay() {
+        LongSupplier delay = Delay.exponential(START_DELAY, MAX_DELAY);
+        assertTrue(delay.getAsLong() < delay.getAsLong());
+    }
+
+    @Test
+    public void testExponentialIncreasingRateDelay() {
+        LongSupplier delay = Delay.exponential(START_DELAY, MAX_DELAY);
+        assertEquals(1, delay.getAsLong());
+        assertEquals(2, delay.getAsLong());
+        assertEquals(4, delay.getAsLong());
+    }
+
+    @Test
+    public void testExponentialMaxDelay() {
+        LongSupplier delay = Delay.exponential(START_DELAY, MAX_DELAY);
+        long maxAfterHundredDelays = LongStream.generate(delay).limit(100).max().orElseThrow(IllegalStateException::new);
+        assertEquals(MAX_DELAY, maxAfterHundredDelays);
+    }
+
+    @Test(timeout = 15000)
+    public void testResumeDelay() throws Exception {
+        Delay delayer = new Delay();
+        CompletableFuture<Void> delayOp = runAsync(() -> delayer.await(HOURS.toMillis(1)));
+        // Signal repeatedly: a single signal fired before the async task has entered
+        // await() would be lost and leave the task waiting for the full hour.
+        scheduler.scheduleAtFixedRate(delayer::signal, 10, 10, MILLISECONDS);
+        delayOp.get();
+        assertTrue(delayOp.isDone());
+    }
+
+    @Test(timeout = 15000)
+    public void testDelay() {
+        Delay delay = new Delay();
+        long duration = 100;
+        long start = System.nanoTime();
+        delay.await(duration);
+        long stop = System.nanoTime();
+        // System.nanoTime() is in nanoseconds while duration is in milliseconds;
+        // convert before comparing, otherwise the check passes after only 100 ns.
+        assertTrue((stop - start) >= MILLISECONDS.toNanos(duration));
+    }
+
+}
\ No newline at end of file