You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/09/02 14:22:20 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-9717 - Avoid DistributionSubscriber optional fields

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 56e0068  SLING-9717 - Avoid DistributionSubscriber optional fields
56e0068 is described below

commit 56e00680c75d5152ea1ff702c93442605a10e7c3
Author: tmaret <tm...@adobe.com>
AuthorDate: Wed Sep 2 16:22:00 2020 +0200

    SLING-9717 - Avoid DistributionSubscriber optional fields
---
 .../impl/subscriber/DistributionSubscriber.java    | 37 ++++++++++----------
 .../journal/impl/subscriber/IdleCheck.java         | 36 ++++++++++++++++++++
 .../journal/impl/subscriber/NoopIdle.java          | 39 ++++++++++++++++++++++
 .../journal/impl/subscriber/SubscriberIdle.java    |  8 ++---
 4 files changed, 96 insertions(+), 24 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 9c6f002..c5f704d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -28,7 +28,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -115,11 +114,11 @@ public class DistributionSubscriber {
     @Reference
     private SubscriberReadyStore subscriberReadyStore;
     
-    private Optional<SubscriberIdle> subscriberIdle;
+    private IdleCheck idleCheck;
     
     private Closeable packagePoller;
 
-    private Optional<CommandPoller> commandPoller;
+    private volatile CommandPoller commandPoller;
 
     private BookKeeper bookKeeper;
 
@@ -155,9 +154,9 @@ public class DistributionSubscriber {
             // Unofficial config (currently just for test)
             Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
             AtomicBoolean readyHolder = subscriberReadyStore.getReadyHolder(subAgentName);
-            subscriberIdle = Optional.of(new SubscriberIdle(context, idleMillies, readyHolder));
+            idleCheck = new SubscriberIdle(context, idleMillies, readyHolder);
         } else {
-            subscriberIdle = Optional.empty();
+            idleCheck = new NoopIdle();
         }
         
         queueNames = getNotEmpty(config.agentNames());
@@ -176,9 +175,7 @@ public class DistributionSubscriber {
                 HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage));
 
         if (config.editable()) {
-            commandPoller = Optional.of(new CommandPoller(messagingProvider, topics, subSlingId, subAgentName));
-        } else {
-            commandPoller = Optional.empty();
+            commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName);
         }
 
         queueThread = startBackgroundThread(this::processQueue,
@@ -206,9 +203,7 @@ public class DistributionSubscriber {
          */
 
         IOUtils.closeQuietly(announcer, bookKeeper, 
-                packagePoller);
-        commandPoller.ifPresent(IOUtils::closeQuietly);
-        subscriberIdle.ifPresent(IOUtils::closeQuietly);
+                packagePoller, idleCheck, commandPoller);
         running = false;
         try {
             queueThread.join();
@@ -337,26 +332,30 @@ public class DistributionSubscriber {
         PackageMessage pkgMsg = item.getMessage();
         boolean skip = shouldSkip(info.getOffset());
         try {
-            subscriberIdle.ifPresent((idle) -> {
-                int retries = bookKeeper.getRetries(pkgMsg.getPubAgentName());
-                idle.busy(retries);
-            });
+            idleCheck.busy(bookKeeper.getRetries(pkgMsg.getPubAgentName()));
             if (skip) {
                 bookKeeper.removePackage(pkgMsg, info.getOffset());
             } else {
                 bookKeeper.importPackage(pkgMsg, info.getOffset(), info.getCreateTime());
             }
         } finally {
-            subscriberIdle.ifPresent(SubscriberIdle::idle);
+            idleCheck.idle();
         }
     }
 
     private boolean shouldSkip(long offset) {
-        boolean cleared = commandPoller.isPresent() && commandPoller.get().isCleared(offset);
-        Decision decision = waitPrecondition(offset);
-        return cleared || decision == Decision.SKIP;
+        return isCleared(offset) || isSkipped(offset);
+    }
+
+    private boolean isCleared(long offset) {
+        return (commandPoller != null) && commandPoller.isCleared(offset);
     }
 
+    private boolean isSkipped(long offset) {
+        return waitPrecondition(offset) == Decision.SKIP;
+    }
+
+
     private Decision waitPrecondition(long offset) {
         Decision decision = Precondition.Decision.WAIT;
         long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT * 1000;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java
new file mode 100644
index 0000000..ace8a03
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import java.io.Closeable;
+
+public interface IdleCheck extends Closeable {
+
+    /**
+     * Called when processing of a message starts
+     *
+     * @param retries the number of retries to process the message
+     */
+    void busy(int retries);
+
+    /**
+     * Called when processing of a message has finished
+     */
+    void idle();
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopIdle.java
new file mode 100644
index 0000000..07fe70e
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopIdle.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import java.io.IOException;
+
+public class NoopIdle implements IdleCheck {
+
+    @Override
+    public void busy(int retries) {
+
+    }
+
+    @Override
+    public void idle() {
+
+    }
+
+    @Override
+    public void close() {
+
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
index 4f8949b..eb65339 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -38,7 +38,7 @@ import org.osgi.framework.ServiceRegistration;
  * the READY_IDLE_TIME_SECONDS at least once ; or when it is busy processing
  * the same package for more than MAX_RETRIES times.
  */
-public class SubscriberIdle implements SystemReadyCheck, Closeable {
+public class SubscriberIdle implements IdleCheck, SystemReadyCheck {
     public static final int DEFAULT_IDLE_TIME_MILLIS = 10000;
 
     public static final int MAX_RETRIES = 10;
@@ -70,9 +70,7 @@ public class SubscriberIdle implements SystemReadyCheck, Closeable {
     }
     
     /**
-     * Called when processing of a message starts
-     *
-     * @param retries the number of retries to process the message
+     * {@inheritDoc}
      */
     public synchronized void busy(int retries) {
         cancelSchedule();
@@ -82,7 +80,7 @@ public class SubscriberIdle implements SystemReadyCheck, Closeable {
     }
 
     /**
-     * Called when processing of a message has finished
+     * {@inheritDoc}
      */
     public synchronized void idle() {
         if (!isReady.get()) {