You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/04/28 12:40:49 UTC
[sling-org-apache-sling-distribution-journal] branch master
updated: SLING-9402 - subscriberIdle state should survive restarts of
subscriber
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 5d4880d SLING-9402 - subscriberIdle state should survive restarts of subscriber
5d4880d is described below
commit 5d4880def246620ba8555d59541e1f7953a54df1
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Apr 28 14:40:37 2020 +0200
SLING-9402 - subscriberIdle state should survive restarts of subscriber
---
.../impl/subscriber/DistributionSubscriber.java | 8 +++--
.../journal/impl/subscriber/SubscriberIdle.java | 5 ++--
.../impl/subscriber/SubscriberReadyStore.java | 35 ++++++++++++++++++++++
.../impl/subscriber/SubscriberIdleTest.java | 9 ++++--
.../journal/impl/subscriber/SubscriberTest.java | 5 +++-
5 files changed, 55 insertions(+), 7 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 4aae01d..04782f9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -42,6 +42,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
@@ -142,8 +143,10 @@ public class DistributionSubscriber implements DistributionAgent {
@Reference
private Packaging packaging;
+ @Reference // DS must inject the store; without the annotation this field is never set by the container
+ private SubscriberReadyStore subscriberReadyStore;
- Optional<SubscriberIdle> subscriberIdle;
+ private Optional<SubscriberIdle> subscriberIdle;
private ServiceRegistration<DistributionAgent> componentReg;
@@ -184,7 +187,8 @@ public class DistributionSubscriber implements DistributionAgent {
if (config.subscriberIdleCheck()) {
// Unofficial config (currently just for test)
Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
- subscriberIdle = Optional.of(new SubscriberIdle(context, idleMillies));
+ AtomicBoolean readyHolder = subscriberReadyStore.getReadyHolder(subAgentName);
+ subscriberIdle = Optional.of(new SubscriberIdle(context, idleMillies, readyHolder));
} else {
subscriberIdle = Optional.empty();
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
index a05b80c..93b73c4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -41,14 +41,15 @@ public class SubscriberIdle implements SystemReadyCheck, Closeable {
public static final int DEFAULT_IDLE_TIME_MILLIS = 10000;
private final int idleMillis;
- private final AtomicBoolean isReady = new AtomicBoolean();
+ private final AtomicBoolean isReady;
private final ScheduledExecutorService executor;
private ScheduledFuture<?> schedule;
private ServiceRegistration<SystemReadyCheck> reg;
- public SubscriberIdle(BundleContext context, int idleMillis) {
+ public SubscriberIdle(BundleContext context, int idleMillis, AtomicBoolean readyHolder) {
this.idleMillis = idleMillis;
+ this.isReady = readyHolder;
executor = Executors.newScheduledThreadPool(1);
idle();
this.reg = context.registerService(SystemReadyCheck.class, this, new Hashtable<>());
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReadyStore.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReadyStore.java
new file mode 100644
index 0000000..610d9f5
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReadyStore.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.osgi.service.component.annotations.Component;
+
+@Component(service = SubscriberReadyStore.class)
+public class SubscriberReadyStore {
+ // One ready flag per subscriber name; entries are created lazily and never removed by this class.
+ Map<String, AtomicBoolean> subscriberReady = new ConcurrentHashMap<>();
+ /** Returns the flag stored for subscriberName, creating it with value false on first request. */
+ AtomicBoolean getReadyHolder(String subscriberName) {
+ return subscriberReady.computeIfAbsent(subscriberName, name -> new AtomicBoolean(false));
+ }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
index ef5f0d7..ac97fee 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -21,6 +21,8 @@ package org.apache.sling.distribution.journal.impl.subscriber;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.felix.systemready.CheckStatus.State;
import org.junit.Test;
import org.mockito.Mockito;
@@ -30,11 +32,13 @@ public class SubscriberIdleTest {
private static final int IDLE_MILLIES = 40;
private SubscriberIdle idle;
+ private AtomicBoolean readyHolder;
@Test
public void testIdle() throws InterruptedException {
BundleContext context = Mockito.mock(BundleContext.class);
- idle = new SubscriberIdle(context , IDLE_MILLIES);
+ readyHolder = new AtomicBoolean();
+ idle = new SubscriberIdle(context , IDLE_MILLIES, readyHolder);
assertState("Initial state", State.RED);
idle.busy();
idle.idle();
@@ -55,7 +59,8 @@ public class SubscriberIdleTest {
@Test
public void testStartIdle() throws InterruptedException {
BundleContext context = Mockito.mock(BundleContext.class);
- idle = new SubscriberIdle(context , IDLE_MILLIES);
+ readyHolder = new AtomicBoolean();
+ idle = new SubscriberIdle(context , IDLE_MILLIES, readyHolder);
assertState("Initial state", State.RED);
Thread.sleep(IDLE_MILLIES * 2);
assertState("State after time over idle limit", State.GREEN);
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 6b093eb..467509f 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -174,6 +174,9 @@ public class SubscriberTest {
@Mock
private DistributionMetricsService distributionMetricsService;
+
+ @Spy
+ SubscriberReadyStore subscriberReadyStore = new SubscriberReadyStore();
@InjectMocks
DistributionSubscriber subscriber;
@@ -351,7 +354,7 @@ public class SubscriberTest {
packageHandler.handle(info, message);
waitSubscriber(RUNNING);
- await("Should report ready").until(() -> subscriber.subscriberIdle.get().isReady());
+ await("Should report ready").until(() -> subscriberReadyStore.getReadyHolder(SUB1_AGENT_NAME).get());
sem.release();
}