You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2021/07/09 22:19:27 UTC

[sling-org-apache-sling-distribution-journal] branch SLING-10592 created (now c47fafd)

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a change to branch SLING-10592
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git.


      at c47fafd  SLING-10592 - Follow an exponential delay between import attempts

This branch includes the following new commits:

     new c47fafd  SLING-10592 - Follow an exponential delay between import attempts

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[sling-org-apache-sling-distribution-journal] 01/01: SLING-10592 - Follow an exponential delay between import attempts

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch SLING-10592
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit c47fafd8e190b04d36714e73cb58e4b7b6b466e9
Author: tmaret <tm...@adobe.com>
AuthorDate: Sat Jul 10 00:18:26 2021 +0200

    SLING-10592 - Follow an exponential delay between import attempts
---
 .../impl/subscriber/DistributionSubscriber.java    | 10 +++-
 .../sling/distribution/journal/shared/Delays.java  | 32 +++++++++++
 .../distribution/journal/shared/DelaysTest.java    | 62 ++++++++++++++++++++++
 3 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index d37d1c2..e227286 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -22,6 +22,7 @@ import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+import static org.apache.sling.distribution.journal.shared.Delays.exponential;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -35,6 +36,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
@@ -85,8 +88,10 @@ import org.slf4j.LoggerFactory;
 public class DistributionSubscriber {
     private static final int PRECONDITION_TIMEOUT = 60;
     static int RETRY_DELAY = 5000;
+    static int MAX_RETRY_DELAY = 300000; // 5 minutes
     static int QUEUE_FETCH_DELAY = 1000;
     private static final long COMMAND_NOT_IDLE_DELAY_MS = 200;
+    private static final Supplier<LongSupplier> catchAllDelays = () -> exponential(RETRY_DELAY, MAX_RETRY_DELAY);
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
 
@@ -142,6 +147,8 @@ public class DistributionSubscriber {
     private volatile boolean running = true;
     private Thread queueThread;
 
+    private LongSupplier catchAllDelay = catchAllDelays.get();
+
     @Activate
     public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
         String subSlingId = requireNonNull(slingSettings.getSlingId());
@@ -305,7 +312,7 @@ public class DistributionSubscriber {
             } catch (Exception e) {
                 // Catch all to prevent processing from stopping
                 LOG.error("Error processing queue item", e);
-                delay(RETRY_DELAY);
+                delay(catchAllDelay.getAsLong());
             }
         }
         LOG.info("Stopped Queue processor");
@@ -318,6 +325,7 @@ public class DistributionSubscriber {
             processQueueItem(item);
             messageBuffer.remove();
             distributionMetricsService.getItemsBufferSize().decrement();
+            catchAllDelay = catchAllDelays.get();
         }
     }
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/Delays.java b/src/main/java/org/apache/sling/distribution/journal/shared/Delays.java
new file mode 100644
index 0000000..34f7cbb
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/Delays.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.shared;
+
+import java.util.function.LongSupplier;
+
+import static java.lang.Math.min;
+import static java.util.stream.LongStream.iterate;
+
+public final class Delays {
+
+    public static LongSupplier exponential(long startDelay, long maxDelay) {
+        return iterate(startDelay, delay -> min(2 * delay, maxDelay)).iterator()::next;
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/DelaysTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/DelaysTest.java
new file mode 100644
index 0000000..335960f
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/DelaysTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.shared;
+
+import java.util.function.LongSupplier;
+import java.util.stream.LongStream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class DelaysTest {
+
+    private static final long START_DELAY = 1L;
+
+    private static final long MAX_DELAY = 1000L;
+
+    @Test
+    public void testExponentialStartDelay() {
+        LongSupplier delay = Delays.exponential(START_DELAY, MAX_DELAY);
+        assertEquals(START_DELAY, delay.getAsLong());
+    }
+
+    @Test
+    public void testExponentialIncreasingDelay() {
+        LongSupplier delay = Delays.exponential(START_DELAY, MAX_DELAY);
+        assertTrue(delay.getAsLong() < delay.getAsLong());
+    }
+
+    @Test
+    public void testExponentialIncreasingRateDelay() {
+        LongSupplier delay = Delays.exponential(START_DELAY, MAX_DELAY);
+        assertEquals(1, delay.getAsLong());
+        assertEquals(2, delay.getAsLong());
+        assertEquals(4, delay.getAsLong());
+    }
+
+    @Test
+    public void testExponentialMaxDelay() {
+        LongSupplier delay = Delays.exponential(START_DELAY, MAX_DELAY);
+        long maxAfterHundredDelays = LongStream.generate(delay).limit(100).max().orElseThrow(IllegalStateException::new);
+        assertEquals(MAX_DELAY, maxAfterHundredDelays);
+    }
+
+}
\ No newline at end of file