You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/09/02 14:22:20 UTC
[sling-org-apache-sling-distribution-journal] branch master
updated: SLING-9717 - Avoid DistributionSubscriber optional fields
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 56e0068 SLING-9717 - Avoid DistributionSubscriber optional fields
56e0068 is described below
commit 56e00680c75d5152ea1ff702c93442605a10e7c3
Author: tmaret <tm...@adobe.com>
AuthorDate: Wed Sep 2 16:22:00 2020 +0200
SLING-9717 - Avoid DistributionSubscriber optional fields
---
.../impl/subscriber/DistributionSubscriber.java | 37 ++++++++++----------
.../journal/impl/subscriber/IdleCheck.java | 36 ++++++++++++++++++++
.../journal/impl/subscriber/NoopIdle.java | 39 ++++++++++++++++++++++
.../journal/impl/subscriber/SubscriberIdle.java | 8 ++---
4 files changed, 96 insertions(+), 24 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 9c6f002..c5f704d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -28,7 +28,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -115,11 +114,11 @@ public class DistributionSubscriber {
@Reference
private SubscriberReadyStore subscriberReadyStore;
- private Optional<SubscriberIdle> subscriberIdle;
+ private IdleCheck idleCheck;
private Closeable packagePoller;
- private Optional<CommandPoller> commandPoller;
+ private volatile CommandPoller commandPoller;
private BookKeeper bookKeeper;
@@ -155,9 +154,9 @@ public class DistributionSubscriber {
// Unofficial config (currently just for test)
Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
AtomicBoolean readyHolder = subscriberReadyStore.getReadyHolder(subAgentName);
- subscriberIdle = Optional.of(new SubscriberIdle(context, idleMillies, readyHolder));
+ idleCheck = new SubscriberIdle(context, idleMillies, readyHolder);
} else {
- subscriberIdle = Optional.empty();
+ idleCheck = new NoopIdle();
}
queueNames = getNotEmpty(config.agentNames());
@@ -176,9 +175,7 @@ public class DistributionSubscriber {
HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage));
if (config.editable()) {
- commandPoller = Optional.of(new CommandPoller(messagingProvider, topics, subSlingId, subAgentName));
- } else {
- commandPoller = Optional.empty();
+ commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName);
}
queueThread = startBackgroundThread(this::processQueue,
@@ -206,9 +203,7 @@ public class DistributionSubscriber {
*/
IOUtils.closeQuietly(announcer, bookKeeper,
- packagePoller);
- commandPoller.ifPresent(IOUtils::closeQuietly);
- subscriberIdle.ifPresent(IOUtils::closeQuietly);
+ packagePoller, idleCheck, commandPoller);
running = false;
try {
queueThread.join();
@@ -337,26 +332,30 @@ public class DistributionSubscriber {
PackageMessage pkgMsg = item.getMessage();
boolean skip = shouldSkip(info.getOffset());
try {
- subscriberIdle.ifPresent((idle) -> {
- int retries = bookKeeper.getRetries(pkgMsg.getPubAgentName());
- idle.busy(retries);
- });
+ idleCheck.busy(bookKeeper.getRetries(pkgMsg.getPubAgentName()));
if (skip) {
bookKeeper.removePackage(pkgMsg, info.getOffset());
} else {
bookKeeper.importPackage(pkgMsg, info.getOffset(), info.getCreateTime());
}
} finally {
- subscriberIdle.ifPresent(SubscriberIdle::idle);
+ idleCheck.idle();
}
}
private boolean shouldSkip(long offset) {
- boolean cleared = commandPoller.isPresent() && commandPoller.get().isCleared(offset);
- Decision decision = waitPrecondition(offset);
- return cleared || decision == Decision.SKIP;
+ return isCleared(offset) || isSkipped(offset);
+ }
+
+ private boolean isCleared(long offset) {
+ return (commandPoller != null) && commandPoller.isCleared(offset);
}
+ private boolean isSkipped(long offset) {
+ return waitPrecondition(offset) == Decision.SKIP;
+ }
+
+
private Decision waitPrecondition(long offset) {
Decision decision = Precondition.Decision.WAIT;
long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT * 1000;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java
new file mode 100644
index 0000000..ace8a03
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import java.io.Closeable;
+
+public interface IdleCheck extends Closeable {
+
+ /**
+ * Called when processing of a message starts
+ *
+ * @param retries the number of retries to process the message
+ */
+ void busy(int retries);
+
+ /**
+ * Called when processing of a message has finished
+ */
+ void idle();
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopIdle.java
new file mode 100644
index 0000000..07fe70e
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopIdle.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import java.io.IOException;
+
+public class NoopIdle implements IdleCheck {
+
+ @Override
+ public void busy(int retries) {
+
+ }
+
+ @Override
+ public void idle() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
index 4f8949b..eb65339 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -38,7 +38,7 @@ import org.osgi.framework.ServiceRegistration;
* the READY_IDLE_TIME_SECONDS at least once ; or when it is busy processing
* the same package for more than MAX_RETRIES times.
*/
-public class SubscriberIdle implements SystemReadyCheck, Closeable {
+public class SubscriberIdle implements IdleCheck, SystemReadyCheck {
public static final int DEFAULT_IDLE_TIME_MILLIS = 10000;
public static final int MAX_RETRIES = 10;
@@ -70,9 +70,7 @@ public class SubscriberIdle implements SystemReadyCheck, Closeable {
}
/**
- * Called when processing of a message starts
- *
- * @param retries the number of retries to process the message
+ * {@inheritDoc}
*/
public synchronized void busy(int retries) {
cancelSchedule();
@@ -82,7 +80,7 @@ public class SubscriberIdle implements SystemReadyCheck, Closeable {
}
/**
- * Called when processing of a message has finished
+ * {@inheritDoc}
*/
public synchronized void idle() {
if (!isReady.get()) {