You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/04/28 12:40:49 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-9402 - subscriberIdle state should survive restarts of subscriber

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d4880d  SLING-9402 - subscriberIdle state should survive restarts of subscriber
5d4880d is described below

commit 5d4880def246620ba8555d59541e1f7953a54df1
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Apr 28 14:40:37 2020 +0200

    SLING-9402 - subscriberIdle state should survive restarts of subscriber
---
 .../impl/subscriber/DistributionSubscriber.java    |  8 +++--
 .../journal/impl/subscriber/SubscriberIdle.java    |  5 ++--
 .../impl/subscriber/SubscriberReadyStore.java      | 35 ++++++++++++++++++++++
 .../impl/subscriber/SubscriberIdleTest.java        |  9 ++++--
 .../journal/impl/subscriber/SubscriberTest.java    |  5 +++-
 5 files changed, 55 insertions(+), 7 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 4aae01d..04782f9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -42,6 +42,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
 import javax.annotation.Nonnull;
@@ -142,8 +143,10 @@ public class DistributionSubscriber implements DistributionAgent {
 
     @Reference
     private Packaging packaging;
+
+    private SubscriberReadyStore subscriberReadyStore;
     
-    Optional<SubscriberIdle> subscriberIdle;
+    private Optional<SubscriberIdle> subscriberIdle;
     
     private ServiceRegistration<DistributionAgent> componentReg;
 
@@ -184,7 +187,8 @@ public class DistributionSubscriber implements DistributionAgent {
         if (config.subscriberIdleCheck()) {
             // Unofficial config (currently just for test)
             Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
-            subscriberIdle = Optional.of(new SubscriberIdle(context, idleMillies));
+            AtomicBoolean readyHolder = subscriberReadyStore.getReadyHolder(subAgentName);
+            subscriberIdle = Optional.of(new SubscriberIdle(context, idleMillies, readyHolder));
         } else {
             subscriberIdle = Optional.empty();
         }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
index a05b80c..93b73c4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -41,14 +41,15 @@ public class SubscriberIdle implements SystemReadyCheck, Closeable {
     public static final int DEFAULT_IDLE_TIME_MILLIS = 10000;
 
     private final int idleMillis;
-    private final AtomicBoolean isReady = new AtomicBoolean();
+    private final AtomicBoolean isReady;
     private final ScheduledExecutorService executor;
     private ScheduledFuture<?> schedule;
 
     private ServiceRegistration<SystemReadyCheck> reg;
     
-    public SubscriberIdle(BundleContext context, int idleMillis) {
+    public SubscriberIdle(BundleContext context, int idleMillis, AtomicBoolean readyHolder) {
         this.idleMillis = idleMillis;
+        this.isReady = readyHolder;
         executor = Executors.newScheduledThreadPool(1);
         idle();
         this.reg = context.registerService(SystemReadyCheck.class, this, new Hashtable<>());
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReadyStore.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReadyStore.java
new file mode 100644
index 0000000..610d9f5
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReadyStore.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.osgi.service.component.annotations.Component;
+
+@Component(service = SubscriberReadyStore.class) // published as a SubscriberReadyStore service
+public class SubscriberReadyStore {
+    // One ready flag per subscriber name, held by this component instead of by the subscriber itself.
+    Map<String, AtomicBoolean> subscriberReady = new ConcurrentHashMap<>();
+    // Returns the flag stored for subscriberName; the first request for a name creates it as false.
+    AtomicBoolean getReadyHolder(String subscriberName) {
+        return subscriberReady.computeIfAbsent(subscriberName, name -> new AtomicBoolean(false));
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
index ef5f0d7..ac97fee 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -21,6 +21,8 @@ package org.apache.sling.distribution.journal.impl.subscriber;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.felix.systemready.CheckStatus.State;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -30,11 +32,13 @@ public class SubscriberIdleTest {
 
     private static final int IDLE_MILLIES = 40;
     private SubscriberIdle idle;
+    private AtomicBoolean readyHolder;
     
     @Test
     public void testIdle() throws InterruptedException {
         BundleContext context = Mockito.mock(BundleContext.class);
-        idle = new SubscriberIdle(context , IDLE_MILLIES);
+        readyHolder = new AtomicBoolean();
+        idle = new SubscriberIdle(context , IDLE_MILLIES, readyHolder);
         assertState("Initial state", State.RED);
         idle.busy();
         idle.idle();
@@ -55,7 +59,8 @@ public class SubscriberIdleTest {
     @Test
     public void testStartIdle() throws InterruptedException {
         BundleContext context = Mockito.mock(BundleContext.class);
-        idle = new SubscriberIdle(context , IDLE_MILLIES);
+        readyHolder = new AtomicBoolean();
+        idle = new SubscriberIdle(context , IDLE_MILLIES, readyHolder);
         assertState("Initial state", State.RED);
         Thread.sleep(IDLE_MILLIES * 2);
         assertState("State after time over idle limit", State.GREEN);
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 6b093eb..467509f 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -174,6 +174,9 @@ public class SubscriberTest {
 
     @Mock
     private DistributionMetricsService distributionMetricsService;
+    
+    @Spy
+    SubscriberReadyStore subscriberReadyStore = new SubscriberReadyStore();
 
     @InjectMocks
     DistributionSubscriber subscriber;
@@ -351,7 +354,7 @@ public class SubscriberTest {
 
         packageHandler.handle(info, message);
         waitSubscriber(RUNNING);
-        await("Should report ready").until(() -> subscriber.subscriberIdle.get().isReady());
+        await("Should report ready").until(() -> subscriberReadyStore.getReadyHolder(SUB1_AGENT_NAME).get());
         sem.release();
     }