You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/12/20 15:37:03 UTC
[sling-org-apache-sling-distribution-journal] 01/01: SLING-8944 -
Systemready check for idle subscriber
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch SLING-8944
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit f6e5514c9ddc1fb51eac3c53881ff31b1df87ea0
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Fri Dec 20 16:35:29 2019 +0100
SLING-8944 - Systemready check for idle subscriber
---
pom.xml | 5 ++
.../impl/subscriber/DistributionSubscriber.java | 6 ++
.../journal/impl/subscriber/SubscriberIdle.java | 87 ++++++++++++++++++++++
.../impl/subscriber/SubscriberIdleTest.java | 54 ++++++++++++++
.../journal/impl/subscriber/SubscriberTest.java | 5 +-
5 files changed, 156 insertions(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 7669d24..b7af823 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,11 @@
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.systemready</artifactId>
+ <version>0.4.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.converter</artifactId>
<version>1.0.0</version>
<scope>test</scope>
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index fcefc7f..0926cbc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -163,6 +163,9 @@ public class DistributionSubscriber implements DistributionAgent {
@Reference
private Packaging packaging;
+ @Reference
+ private SubscriberIdle subscriberIdle;
+
private ServiceRegistration<DistributionAgent> componentReg;
private final PackageRetries packageRetries = new PackageRetries();
@@ -405,6 +408,7 @@ public class DistributionSubscriber implements DistributionAgent {
}
private void handlePackageMessage(MessageInfo info, PackageMessage message) {
+ subscriberIdle.resetIdleTimer();
if (! queueNames.contains(message.getPubAgentName())) {
LOG.info(String.format("Skipping package for Publisher agent %s (not subscribed)", message.getPubAgentName()));
return;
@@ -432,6 +436,8 @@ public class DistributionSubscriber implements DistributionAgent {
if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) {
distributionMetricsService.getItemsBufferSize().increment();
return;
+ } else {
+ subscriberIdle.resetIdleTimer();
}
}
throw new InterruptedException();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
new file mode 100644
index 0000000..19cbfd8
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import java.io.Closeable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.felix.systemready.CheckStatus;
+import org.apache.felix.systemready.CheckStatus.State;
+import org.apache.felix.systemready.StateType;
+import org.apache.felix.systemready.SystemReadyCheck;
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * A DistributionSubscriber is considered ready only when it is idle for more than
+ * the READY_IDLE_TIME_SECONDS at least once.
+ */
+@Component
+public class SubscriberIdle implements SystemReadyCheck, Closeable {
+ private static final int DEFAULT_IDLE_TIME_MILLIS = 10000;
+
+ private final int idleMillis;
+ private final AtomicBoolean isReady = new AtomicBoolean();
+ private final ScheduledExecutorService executor;
+ private ScheduledFuture<?> schedule;
+
+ public SubscriberIdle() {
+ this(DEFAULT_IDLE_TIME_MILLIS);
+ }
+
+ public SubscriberIdle(int idleMillis) {
+ this.idleMillis = idleMillis;
+ executor = Executors.newScheduledThreadPool(1);
+ }
+
+ @Override
+ public String getName() {
+ return "DistributionSubscriber idle";
+ }
+
+ @Override
+ public CheckStatus getStatus() {
+ State state = isReady.get() ? State.GREEN : State.RED;
+ return new CheckStatus(getName(), StateType.READY, state, "DistributionSubscriber idle");
+ }
+
+ /**
+ * Called by DistributionSubscriber whenever a new message comes in or while the
+ * internal queue is full
+ */
+ public synchronized void resetIdleTimer() {
+ if (schedule != null) {
+ schedule.cancel(false);
+ }
+ schedule = executor.schedule(this::ready, idleMillis, TimeUnit.MILLISECONDS);
+ }
+
+ private void ready() {
+ isReady.set(true);
+ }
+
+ @Override
+ public void close() {
+ executor.shutdownNow();
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
new file mode 100644
index 0000000..7af022e
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.felix.systemready.CheckStatus.State;
+import org.junit.Test;
+
+public class SubscriberIdleTest {
+
+ private SubscriberIdle idle;
+
+ @Test
+ public void testIdle() throws InterruptedException {
+ idle = new SubscriberIdle(20);
+ assertState("Initial state", State.RED);
+ idle.resetIdleTimer();
+ assertState("State after reset", State.RED);
+ Thread.sleep(15);
+ assertState("State after time below idle limit", State.RED);
+ idle.resetIdleTimer();
+ Thread.sleep(15);
+ assertState("State after time over limit but reset in between", State.RED);
+ Thread.sleep(15);
+ assertState("State after time over idle limit", State.GREEN);
+ idle.resetIdleTimer();
+ assertState("State should not be reset once it reached GREEN", State.GREEN);
+ idle.close();
+ }
+
+ private void assertState(String message, State expectedState) {
+ assertThat(message, idle.getStatus().getState(), equalTo(expectedState));
+ }
+
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index cf06393..6337c60 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -188,6 +188,9 @@ public class SubscriberTest {
@Mock
private Timer.Context timerContext;
+
+ @Mock
+ private SubscriberIdle subscriberIdle;
@InjectMocks
DistributionSubscriber subscriber;
@@ -200,7 +203,7 @@ public class SubscriberTest {
@Mock
private ServiceRegistration<DistributionAgent> reg;
-
+
private MessageHandler<PackageMessage> packageHandler;