You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/12/20 15:37:02 UTC

[sling-org-apache-sling-distribution-journal] branch SLING-8944 created (now f6e5514)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a change to branch SLING-8944
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git.


      at f6e5514  SLING-8944 - Systemready check for idle subscriber

This branch includes the following new commits:

     new f6e5514  SLING-8944 - Systemready check for idle subscriber

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[sling-org-apache-sling-distribution-journal] 01/01: SLING-8944 - Systemready check for idle subscriber

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-8944
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit f6e5514c9ddc1fb51eac3c53881ff31b1df87ea0
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Fri Dec 20 16:35:29 2019 +0100

    SLING-8944 - Systemready check for idle subscriber
---
 pom.xml                                            |  5 ++
 .../impl/subscriber/DistributionSubscriber.java    |  6 ++
 .../journal/impl/subscriber/SubscriberIdle.java    | 87 ++++++++++++++++++++++
 .../impl/subscriber/SubscriberIdleTest.java        | 54 ++++++++++++++
 .../journal/impl/subscriber/SubscriberTest.java    |  5 +-
 5 files changed, 156 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 7669d24..b7af823 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.systemready</artifactId>
+            <version>0.4.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
             <artifactId>org.apache.felix.converter</artifactId>
             <version>1.0.0</version>
             <scope>test</scope>
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index fcefc7f..0926cbc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -163,6 +163,9 @@ public class DistributionSubscriber implements DistributionAgent {
     @Reference
     private Packaging packaging;
     
+    @Reference
+    private SubscriberIdle subscriberIdle;
+    
     private ServiceRegistration<DistributionAgent> componentReg;
 
     private final PackageRetries packageRetries = new PackageRetries();
@@ -405,6 +408,7 @@ public class DistributionSubscriber implements DistributionAgent {
     }
 
     private void handlePackageMessage(MessageInfo info, PackageMessage message) {
+        subscriberIdle.resetIdleTimer();
         if (! queueNames.contains(message.getPubAgentName())) {
             LOG.info(String.format("Skipping package for Publisher agent %s (not subscribed)", message.getPubAgentName()));
             return;
@@ -432,6 +436,8 @@ public class DistributionSubscriber implements DistributionAgent {
             if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) {
                 distributionMetricsService.getItemsBufferSize().increment();
                 return;
+            } else {
+                subscriberIdle.resetIdleTimer();
             }
         }
         throw new InterruptedException();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
new file mode 100644
index 0000000..19cbfd8
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import java.io.Closeable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.felix.systemready.CheckStatus;
+import org.apache.felix.systemready.CheckStatus.State;
+import org.apache.felix.systemready.StateType;
+import org.apache.felix.systemready.SystemReadyCheck;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+
+/**
+ * System ready check for the DistributionSubscriber.
+ * <p>
+ * The check starts RED and turns GREEN once the idle timer, armed through
+ * {@link #resetIdleTimer()}, has run for the configured idle time
+ * (DEFAULT_IDLE_TIME_MILLIS unless another value is passed to the constructor)
+ * without being reset. Once GREEN the check never goes back to RED.
+ * <p>
+ * The component is registered under its own type as well as
+ * {@link SystemReadyCheck}: DistributionSubscriber references it by class, and
+ * a bare {@code @Component} would only publish the directly implemented
+ * interfaces.
+ */
+@Component(service = {SubscriberIdle.class, SystemReadyCheck.class})
+public class SubscriberIdle implements SystemReadyCheck, Closeable {
+    private static final int DEFAULT_IDLE_TIME_MILLIS = 10000;
+
+    private final int idleMillis;
+    private final AtomicBoolean isReady = new AtomicBoolean();
+    private final ScheduledExecutorService executor;
+    // Pending idle timer; guarded by the monitor of this instance.
+    private ScheduledFuture<?> schedule;
+
+    /**
+     * Creates the check with the default idle time.
+     */
+    public SubscriberIdle() {
+        this(DEFAULT_IDLE_TIME_MILLIS);
+    }
+
+    /**
+     * Creates the check with a custom idle time.
+     *
+     * @param idleMillis time in milliseconds without a timer reset after which
+     *                   the check reports GREEN
+     */
+    public SubscriberIdle(int idleMillis) {
+        this.idleMillis = idleMillis;
+        executor = Executors.newScheduledThreadPool(1);
+    }
+
+    @Override
+    public String getName() {
+        return "DistributionSubscriber idle";
+    }
+
+    /**
+     * @return GREEN once the idle timer has expired at least once, RED before that
+     */
+    @Override
+    public CheckStatus getStatus() {
+        State state = isReady.get() ? State.GREEN : State.RED;
+        return new CheckStatus(getName(), StateType.READY, state, "DistributionSubscriber idle");
+    }
+
+    /**
+     * Called by DistributionSubscriber whenever a new message comes in or while the
+     * internal queue is full. Cancels the pending idle timer, if any, and starts a
+     * new one.
+     * <p>
+     * Does nothing once the check is ready (the state is never reset) or after
+     * {@link #close()}, where scheduling on the terminated executor would be
+     * rejected with an exception.
+     */
+    public synchronized void resetIdleTimer() {
+        if (isReady.get() || executor.isShutdown()) {
+            return;
+        }
+        if (schedule != null) {
+            schedule.cancel(false);
+        }
+        schedule = executor.schedule(this::ready, idleMillis, TimeUnit.MILLISECONDS);
+    }
+
+    private void ready() {
+        isReady.set(true);
+    }
+
+    /**
+     * Stops the timer thread. Annotated as the deactivate method so that
+     * Declarative Services invokes it when the component is deactivated;
+     * otherwise the executor thread would outlive the component.
+     * Synchronized so it cannot interleave with {@link #resetIdleTimer()}.
+     */
+    @Deactivate
+    @Override
+    public synchronized void close() {
+        executor.shutdownNow();
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
new file mode 100644
index 0000000..7af022e
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.felix.systemready.CheckStatus.State;
+import org.junit.Test;
+
+/**
+ * Verifies that SubscriberIdle reports RED until its idle timer has expired
+ * without being reset, and stays GREEN afterwards.
+ */
+public class SubscriberIdleTest {
+
+    private SubscriberIdle idle;
+
+    /**
+     * Walks through the state transitions with a 20 ms idle time.
+     * NOTE(review): the assertions depend on wall-clock sleeps with only a few
+     * milliseconds of margin, so this may be flaky on a heavily loaded machine.
+     */
+    @Test
+    public void testIdle() throws InterruptedException {
+        idle = new SubscriberIdle(20);
+        try {
+            assertState("Initial state", State.RED);
+            idle.resetIdleTimer();
+            assertState("State after reset", State.RED);
+            Thread.sleep(15);
+            assertState("State after time below idle limit", State.RED);
+            idle.resetIdleTimer();
+            Thread.sleep(15);
+            assertState("State after time over limit but reset in between", State.RED);
+            Thread.sleep(15);
+            assertState("State after time over idle limit", State.GREEN);
+            idle.resetIdleTimer();
+            assertState("State should not be reset once it reached GREEN", State.GREEN);
+        } finally {
+            // Always stop the timer thread, also when an assertion above fails.
+            idle.close();
+        }
+    }
+
+    private void assertState(String message, State expectedState) {
+        assertThat(message, idle.getStatus().getState(), equalTo(expectedState));
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index cf06393..6337c60 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -188,6 +188,9 @@ public class SubscriberTest {
 
     @Mock
     private Timer.Context timerContext;
+    
+    @Mock
+    private SubscriberIdle subscriberIdle;
 
     @InjectMocks
     DistributionSubscriber subscriber;
@@ -200,7 +203,7 @@ public class SubscriberTest {
     
     @Mock
     private ServiceRegistration<DistributionAgent> reg;
-
+    
     private MessageHandler<PackageMessage> packageHandler;