You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/12/20 15:37:03 UTC

[sling-org-apache-sling-distribution-journal] 01/01: SLING-8944 - Systemready check for idle subscriber

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-8944
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit f6e5514c9ddc1fb51eac3c53881ff31b1df87ea0
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Fri Dec 20 16:35:29 2019 +0100

    SLING-8944 - Systemready check for idle subscriber
---
 pom.xml                                            |  5 ++
 .../impl/subscriber/DistributionSubscriber.java    |  6 ++
 .../journal/impl/subscriber/SubscriberIdle.java    | 87 ++++++++++++++++++++++
 .../impl/subscriber/SubscriberIdleTest.java        | 54 ++++++++++++++
 .../journal/impl/subscriber/SubscriberTest.java    |  5 +-
 5 files changed, 156 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 7669d24..b7af823 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.systemready</artifactId>
+            <version>0.4.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
             <artifactId>org.apache.felix.converter</artifactId>
             <version>1.0.0</version>
             <scope>test</scope>
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index fcefc7f..0926cbc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -163,6 +163,9 @@ public class DistributionSubscriber implements DistributionAgent {
     @Reference
     private Packaging packaging;
     
+    @Reference
+    private SubscriberIdle subscriberIdle;
+    
     private ServiceRegistration<DistributionAgent> componentReg;
 
     private final PackageRetries packageRetries = new PackageRetries();
@@ -405,6 +408,7 @@ public class DistributionSubscriber implements DistributionAgent {
     }
 
     private void handlePackageMessage(MessageInfo info, PackageMessage message) {
+        subscriberIdle.resetIdleTimer();
         if (! queueNames.contains(message.getPubAgentName())) {
             LOG.info(String.format("Skipping package for Publisher agent %s (not subscribed)", message.getPubAgentName()));
             return;
@@ -432,6 +436,8 @@ public class DistributionSubscriber implements DistributionAgent {
             if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) {
                 distributionMetricsService.getItemsBufferSize().increment();
                 return;
+            } else {
+                subscriberIdle.resetIdleTimer();
             }
         }
         throw new InterruptedException();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
new file mode 100644
index 0000000..19cbfd8
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import java.io.Closeable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.felix.systemready.CheckStatus;
+import org.apache.felix.systemready.CheckStatus.State;
+import org.apache.felix.systemready.StateType;
+import org.apache.felix.systemready.SystemReadyCheck;
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * System ready check that turns GREEN once {@link #resetIdleTimer()} has not been called again for
+ * idleMillis (default DEFAULT_IDLE_TIME_MILLIS) after its most recent call. It starts RED and never returns to RED.
+ */
+@Component
+public class SubscriberIdle implements SystemReadyCheck, Closeable {
+    private static final int DEFAULT_IDLE_TIME_MILLIS = 10000; // 10 seconds
+
+    private final int idleMillis;
+    private final AtomicBoolean isReady = new AtomicBoolean(); // false (reported as RED) until the idle timer fires
+    private final ScheduledExecutorService executor;
+    private ScheduledFuture<?> schedule; // pending idle timer; only touched inside synchronized resetIdleTimer()
+    
+    public SubscriberIdle() {
+        this(DEFAULT_IDLE_TIME_MILLIS);
+    }
+
+    public SubscriberIdle(int idleMillis) {
+        this.idleMillis = idleMillis;
+        executor = Executors.newScheduledThreadPool(1); // no timer is started here; the first resetIdleTimer() call starts it
+    }
+    
+    @Override
+    public String getName() {
+        return "DistributionSubscriber idle";
+    }
+
+    @Override
+    public CheckStatus getStatus() {
+        State state = isReady.get() ? State.GREEN : State.RED; 
+        return new CheckStatus(getName(), StateType.READY, state, "DistributionSubscriber idle");
+    }
+
+    /**
+     * Restarts the idle countdown: cancels the pending timer, if any, and schedules a new one for idleMillis from now.
+     * DistributionSubscriber calls this for each incoming package message and when its items buffer rejects an offer.
+     */
+    public synchronized void resetIdleTimer() {
+        if (schedule != null) {
+            schedule.cancel(false); // false: do not interrupt ready() if it has already started running
+        }
+        schedule = executor.schedule(this::ready, idleMillis, TimeUnit.MILLISECONDS);
+    }
+    
+    private void ready() {
+        isReady.set(true); // one-way switch: nothing in this class sets it back to false
+    }
+
+    @Override
+    public void close() {
+        executor.shutdownNow(); // NOTE(review): no @Deactivate is visible here; confirm close() runs when the component stops
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
new file mode 100644
index 0000000..7af022e
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.felix.systemready.CheckStatus.State;
+import org.junit.Test;
+
+public class SubscriberIdleTest { // exercises SubscriberIdle with a 20 ms idle time and real sleeps
+
+    private SubscriberIdle idle;
+
+    @Test
+    public void testIdle() throws InterruptedException {
+        idle = new SubscriberIdle(20); // idle time of 20 ms
+        assertState("Initial state", State.RED);
+        idle.resetIdleTimer();
+        assertState("State after reset", State.RED);
+        Thread.sleep(15); // NOTE(review): only a 5 ms margin below the 20 ms idle time; may be flaky on a loaded machine
+        assertState("State after time below idle limit", State.RED);
+        idle.resetIdleTimer(); // restarts the 20 ms countdown
+        Thread.sleep(15); // about 30 ms since the first reset, but only about 15 ms since the last one
+        assertState("State after time over limit but reset in between", State.RED);
+        Thread.sleep(15); // now about 30 ms since the last reset
+        assertState("State after time over idle limit", State.GREEN);
+        idle.resetIdleTimer();
+        assertState("State should not be reset once it reached GREEN", State.GREEN);
+        idle.close(); // NOTE(review): not reached if an assertion above fails, so the executor is then never shut down
+    }
+
+    private void assertState(String message, State expectedState) {
+        assertThat(message, idle.getStatus().getState(), equalTo(expectedState));
+    }
+    
+    
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index cf06393..6337c60 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -188,6 +188,9 @@ public class SubscriberTest {
 
     @Mock
     private Timer.Context timerContext;
+    
+    @Mock
+    private SubscriberIdle subscriberIdle;
 
     @InjectMocks
     DistributionSubscriber subscriber;
@@ -200,7 +203,7 @@ public class SubscriberTest {
     
     @Mock
     private ServiceRegistration<DistributionAgent> reg;
-
+    
     private MessageHandler<PackageMessage> packageHandler;