You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in plain text.)
Posted to commits@sling.apache.org by tm...@apache.org on 2021/07/12 18:21:53 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-10592 - Follow an exponential delay between import attempts (#80)

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f02736  SLING-10592 - Follow an exponential delay between import attempts (#80)
5f02736 is described below

commit 5f0273607ceb96e9d7ccd2e65869659b57a72c8d
Author: Timothee Maret <tm...@apache.org>
AuthorDate: Mon Jul 12 20:21:45 2021 +0200

    SLING-10592 - Follow an exponential delay between import attempts (#80)
    
    * Retry delay after a queue-item processing error grows exponentially from 5 seconds up to 15 minutes, and resets once an item is processed successfully
    * Delay is interrupted by command messages to avoid long delays when clearing items from distribution queues
    * Simplify DistributionSubscriber#waitPrecondition
---
 .../journal/impl/subscriber/CommandPoller.java     |   5 +-
 .../impl/subscriber/DistributionSubscriber.java    |  49 +++++-----
 .../sling/distribution/journal/shared/Delay.java   |  50 ++++++++++
 .../journal/impl/subscriber/CommandPollerTest.java |  12 ++-
 .../distribution/journal/shared/DelayTest.java     | 101 +++++++++++++++++++++
 5 files changed, 194 insertions(+), 23 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index de8295d..22734a6 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -40,11 +40,13 @@ public class CommandPoller implements Closeable {
     private final Closeable poller;
     private final IdleCheck idleCheck;
     private final AtomicLong clearOffset = new AtomicLong(-1);
+    private final Runnable callback;
 
-    public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName, int idleMillies) {
+    public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName, int idleMillies, Runnable callback) {
         this.subSlingId = subSlingId;
         this.subAgentName = subAgentName;
         this.idleCheck = new SubscriberIdle(idleMillies, SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS);
+        this.callback = callback;
         this.poller = messagingProvider.createPoller(
                     topics.getCommandTopic(),
                     Reset.earliest,
@@ -65,6 +67,7 @@ public class CommandPoller implements Closeable {
 
         handleClearCommand(command);
         idleCheck.idle();
+        callback.run();
     }
 
     private void handleClearCommand(ClearCommand command) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index d37d1c2..4d7d841 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -20,8 +20,12 @@ package org.apache.sling.distribution.journal.impl.subscriber;
 
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+import static org.apache.sling.distribution.journal.shared.Delay.exponential;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -35,6 +39,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
@@ -61,6 +67,7 @@ import org.apache.sling.distribution.journal.impl.precondition.Precondition.Deci
 import org.apache.sling.distribution.journal.messages.LogMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.shared.Delay;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
@@ -83,10 +90,13 @@ import org.slf4j.LoggerFactory;
 @Designate(ocd = SubscriberConfiguration.class, factory = true)
 @ParametersAreNonnullByDefault
 public class DistributionSubscriber {
-    private static final int PRECONDITION_TIMEOUT = 60;
-    static int RETRY_DELAY = 5000;
-    static int QUEUE_FETCH_DELAY = 1000;
-    private static final long COMMAND_NOT_IDLE_DELAY_MS = 200;
+
+    private static final long PRECONDITION_TIMEOUT = SECONDS.toMillis(60);
+    static long RETRY_DELAY = SECONDS.toMillis(5);
+    static long MAX_RETRY_DELAY = MINUTES.toMillis(15);
+    static long QUEUE_FETCH_DELAY = SECONDS.toMillis(1);
+    private static final long COMMAND_NOT_IDLE_DELAY = MILLISECONDS.toMillis(200);
+    private static final Supplier<LongSupplier> catchAllDelays = () -> exponential(RETRY_DELAY, MAX_RETRY_DELAY);
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
 
@@ -142,6 +152,10 @@ public class DistributionSubscriber {
     private volatile boolean running = true;
     private Thread queueThread;
 
+    private LongSupplier catchAllDelay = catchAllDelays.get();
+
+    private final Delay delay = new Delay();
+
     @Activate
     public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
         String subSlingId = requireNonNull(slingSettings.getSlingId());
@@ -157,7 +171,7 @@ public class DistributionSubscriber {
 
         Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
         if (config.editable()) {
-            commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, idleMillies);
+            commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, idleMillies, delay::signal);
         }
 
         if (config.subscriberIdleCheck()) {
@@ -293,19 +307,19 @@ public class DistributionSubscriber {
                 if (commandPoller == null || commandPoller.isIdle()) {
                     fetchAndProcessQueueItem();
                 } else {
-                    delay(COMMAND_NOT_IDLE_DELAY_MS);
+                    delay.await(COMMAND_NOT_IDLE_DELAY);
                 }
             } catch (PreConditionTimeoutException e) {
                 // Precondition timed out. We only log this on info level as it is no error
                 LOG.info(e.getMessage());
-                delay(RETRY_DELAY);
+                delay.await(RETRY_DELAY);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 LOG.debug(e.getMessage());
             } catch (Exception e) {
                 // Catch all to prevent processing from stopping
                 LOG.error("Error processing queue item", e);
-                delay(RETRY_DELAY);
+                delay.await(catchAllDelay.getAsLong());
             }
         }
         LOG.info("Stopped Queue processor");
@@ -318,6 +332,7 @@ public class DistributionSubscriber {
             processQueueItem(item);
             messageBuffer.remove();
             distributionMetricsService.getItemsBufferSize().decrement();
+            catchAllDelay = catchAllDelays.get();
         }
     }
 
@@ -343,7 +358,7 @@ public class DistributionSubscriber {
             if (message != null) {
                 return message;
             } else {
-                Thread.sleep(QUEUE_FETCH_DELAY);
+                delay.await(QUEUE_FETCH_DELAY);
             }
         }
         throw new InterruptedException("Shutting down");
@@ -379,12 +394,11 @@ public class DistributionSubscriber {
 
 
     private Decision waitPrecondition(long offset) {
-        Decision decision = Precondition.Decision.WAIT;
-        long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT * 1000;
-        while (decision == Decision.WAIT && System.currentTimeMillis() < endTime && running) {
-            decision = precondition.canProcess(subAgentName, offset);
+        long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT;
+        while (System.currentTimeMillis() < endTime && running) {
+            Decision decision = precondition.canProcess(subAgentName, offset);
             if (decision == Decision.WAIT) {
-                delay(100);
+                delay.await(100);
             } else {
                 return decision;
             }
@@ -392,11 +406,4 @@ public class DistributionSubscriber {
         throw new PreConditionTimeoutException("Timeout waiting for distribution package at offset=" + offset + " on status topic");
     }
 
-    private static void delay(long delayInMs) {
-        try {
-            Thread.sleep(delayInMs);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/Delay.java b/src/main/java/org/apache/sling/distribution/journal/shared/Delay.java
new file mode 100644
index 0000000..e42d7ca
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/Delay.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.shared;
+
+import java.util.PrimitiveIterator;
+import java.util.function.LongSupplier;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.LongStream.iterate;
+
+/**
+ * A delay that a thread can wait on and that other threads can cut short.
+ * <p>
+ * Thread-safe: {@link #await(long)} and {@link #signal()} may be called
+ * concurrently from any number of threads.
+ */
+public final class Delay {
+
+    private final Object delayer = new Object();
+
+    /** Number of {@link #signal()} calls so far; guarded by {@code delayer}. */
+    private long signals;
+
+    /**
+     * Blocks the calling thread for up to {@code delayInMs} milliseconds, or until
+     * another thread calls {@link #signal()}, whichever comes first.
+     * <p>
+     * Spurious wake-ups do not shorten the delay. A non-positive delay returns
+     * immediately (a raw {@code Object.wait(0)} would block forever and a negative
+     * timeout would throw). If the thread is interrupted, the method returns early
+     * with the interrupt flag restored.
+     *
+     * @param delayInMs maximum time to wait, in milliseconds
+     */
+    public void await(long delayInMs) {
+        if (delayInMs <= 0) {
+            return;
+        }
+        synchronized (delayer) {
+            long observedSignals = signals;
+            long remaining = MILLISECONDS.toNanos(delayInMs);
+            long deadline = System.nanoTime() + remaining;
+            try {
+                // Loop to absorb spurious wake-ups: only a new signal or the deadline ends the wait.
+                while (remaining > 0 && signals == observedSignals) {
+                    NANOSECONDS.timedWait(delayer, remaining);
+                    remaining = deadline - System.nanoTime();
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Wakes up every thread currently blocked in {@link #await(long)}. Threads that
+     * start waiting after this call are not affected.
+     */
+    public void signal() {
+        synchronized (delayer) {
+            signals++;
+            delayer.notifyAll();
+        }
+    }
+
+    /**
+     * Returns a supplier of delays that starts at {@code startDelay} and doubles on
+     * every call until it reaches {@code maxDelay}, then stays there
+     * (e.g. {@code 5, 10, 20, ..., maxDelay, maxDelay, ...}).
+     * <p>
+     * The returned supplier is stateful and not thread-safe.
+     *
+     * @param startDelay the first value returned (returned as is, even if above {@code maxDelay})
+     * @param maxDelay   the cap for every subsequent value
+     * @return an infinite supplier of exponentially growing, capped delays
+     */
+    public static LongSupplier exponential(long startDelay, long maxDelay) {
+        // Doubling only while delay <= maxDelay / 2 avoids the long overflow that
+        // min(2 * delay, maxDelay) hits when maxDelay is close to Long.MAX_VALUE.
+        PrimitiveIterator.OfLong delays =
+                iterate(startDelay, delay -> delay > maxDelay / 2 ? maxDelay : 2 * delay).iterator();
+        // nextLong() yields primitives, avoiding the boxing that next() would do.
+        return delays::nextLong;
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index 4816f14..ae6b310 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -60,6 +60,9 @@ public class CommandPollerTest {
     
     @Mock
     MessagingProvider clientProvider;
+
+    @Mock
+    Runnable callback;
     
     CommandPoller commandPoller;
     
@@ -117,6 +120,13 @@ public class CommandPollerTest {
         verify(poller).close();
     }
 
+    @Test
+    public void testCallback() {
+        createCommandPoller();
+        commandHandler.handle(info, commandMessage(10L));
+        verify(callback, Mockito.times(1)).run();
+    }
+
     private void assertClearedUpTo(int max) {
         for (int c=0; c<=max; c++) { 
             assertThat(commandPoller.isCleared(c), equalTo(true));
@@ -147,7 +157,7 @@ public class CommandPollerTest {
                 Mockito.eq(Reset.earliest), 
                 handlerCaptor.capture()))
             .thenReturn(poller);
-        commandPoller = new CommandPoller(clientProvider, topics, SUB_SLING_ID, SUB_AGENT_NAME, 1000);
+        commandPoller = new CommandPoller(clientProvider, topics, SUB_SLING_ID, SUB_AGENT_NAME, 1000, callback);
         commandHandler = handlerCaptor.getValue().getHandler();
     }
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/DelayTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/DelayTest.java
new file mode 100644
index 0000000..5467477
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/DelayTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.shared;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.LongSupplier;
+import java.util.stream.LongStream;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link Delay}. */
+public class DelayTest {
+
+    private static final long START_DELAY = 1L;
+
+    private static final long MAX_DELAY = 1000L;
+
+    private ScheduledExecutorService scheduler;
+
+    @Before
+    public void before() {
+        scheduler = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    @After
+    public void after() {
+        scheduler.shutdownNow();
+    }
+
+    @Test
+    public void testExponentialStartDelay() {
+        LongSupplier delay = Delay.exponential(START_DELAY, MAX_DELAY);
+        assertEquals(START_DELAY, delay.getAsLong());
+    }
+
+    @Test
+    public void testExponentialIncreasingDelay() {
+        LongSupplier delay = Delay.exponential(START_DELAY, MAX_DELAY);
+        assertTrue(delay.getAsLong() < delay.getAsLong());
+    }
+
+    @Test
+    public void testExponentialIncreasingRateDelay() {
+        LongSupplier delay = Delay.exponential(START_DELAY, MAX_DELAY);
+        assertEquals(1, delay.getAsLong());
+        assertEquals(2, delay.getAsLong());
+        assertEquals(4, delay.getAsLong());
+    }
+
+    @Test
+    public void testExponentialMaxDelay() {
+        LongSupplier delay = Delay.exponential(START_DELAY, MAX_DELAY);
+        long maxAfterHundredDelays = LongStream.generate(delay).limit(100).max().orElseThrow(IllegalStateException::new);
+        assertEquals(MAX_DELAY, maxAfterHundredDelays);
+    }
+
+    @Test(timeout = 15000)
+    public void testResumeDelay() throws Exception {
+        Delay delayer = new Delay();
+        CompletableFuture<Void> delayOp = runAsync(() -> delayer.await(HOURS.toMillis(1)));
+        // Signal repeatedly: a single signal could fire before the async task starts
+        // waiting and would then be lost, leaving the test blocked until its timeout.
+        scheduler.scheduleAtFixedRate(delayer::signal, 10, 10, MILLISECONDS);
+        delayOp.get();
+        assertTrue(delayOp.isDone());
+    }
+
+    @Test(timeout = 15000)
+    public void testDelay() {
+        Delay delay = new Delay();
+        long duration = 100;
+        long start = System.nanoTime();
+        delay.await(duration);
+        long stop = System.nanoTime();
+        // duration is in milliseconds but nanoTime() is in nanoseconds; convert before comparing.
+        assertTrue((stop - start) >= MILLISECONDS.toNanos(duration));
+    }
+
+}
\ No newline at end of file