You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/02/16 10:24:54 UTC
[sling-org-apache-sling-distribution-journal] 01/01: SLING-9075 -
Support multiple agents with one StagingPrecondition
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch SLING-9075
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit fb447a53a7c2d678520fb5a3c6ef654eccd10e6c
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Sun Feb 16 11:24:31 2020 +0100
SLING-9075 - Support multiple agents with one StagingPrecondition
---
.../impl/subscriber/DefaultPrecondition.java | 2 +-
.../impl/subscriber/DistributionSubscriber.java | 6 +-
.../impl/subscriber/PackageStatusWatcher.java | 81 +++--------
.../journal/impl/subscriber/Precondition.java | 2 +-
.../impl/subscriber/StagingPrecondition.java | 76 ++++-------
.../impl/subscriber/SubscriberConfiguration.java | 2 +-
.../impl/subscriber/DefaultPreconditionTest.java} | 15 +-
.../impl/subscriber/PackageStatusWatcherTest.java | 33 +----
.../impl/subscriber/StagingPreconditionTest.java | 152 +++++++++++----------
.../journal/impl/subscriber/SubscriberTest.java | 14 +-
10 files changed, 154 insertions(+), 229 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java
index 33b10c5..e335ffe 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java
@@ -23,7 +23,7 @@ import org.osgi.service.component.annotations.Component;
@Component(immediate = true, service = Precondition.class, property = { "name=default" })
public class DefaultPrecondition implements Precondition {
@Override
- public boolean canProcess(long pkgOffset, int timeoutSeconds) {
+ public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) {
return true;
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index c0579c5..880a304 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -398,7 +398,7 @@ public class DistributionSubscriber implements DistributionAgent {
}
}
- private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException {
+ private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, IllegalStateException, InterruptedException {
long offset = queueItem.get(RECORD_OFFSET, Long.class);
PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
boolean skip = shouldSkip(offset);
@@ -413,8 +413,8 @@ public class DistributionSubscriber implements DistributionAgent {
distributionMetricsService.getItemsBufferSize().decrement();
}
- private boolean shouldSkip(long offset) throws IllegalStateException {
- return commandPoller.isCleared(offset) || !precondition.canProcess(offset, PRECONDITION_TIMEOUT);
+ private boolean shouldSkip(long offset) throws IllegalStateException, InterruptedException {
+ return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcher.java
index 81063e3..1b3cc32 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcher.java
@@ -19,33 +19,29 @@
package org.apache.sling.distribution.journal.impl.subscriber;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.FullMessage;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
import java.io.Closeable;
import java.io.IOException;
-import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.HashMap;
+import java.util.Map;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
public class PackageStatusWatcher implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(PackageStatusWatcher.class);
-
private final Closeable poller;
- private final String subAgentName;
- private final NavigableMap<Long, FullMessage<PackageStatusMessage>> cache = new ConcurrentSkipListMap<>();
+
+ // subAgentName -> pkgOffset -> Status
+ private final Map<String, Map<Long, Status>> pkgStatusPerSubAgent = new HashMap<>();
- public PackageStatusWatcher(MessagingProvider messagingProvider, Topics topics, String subAgentName) {
+ public PackageStatusWatcher(MessagingProvider messagingProvider, Topics topics) {
String topicName = topics.getStatusTopic();
- this.subAgentName = subAgentName;
poller = messagingProvider.createPoller(
topicName,
@@ -54,39 +50,18 @@ public class PackageStatusWatcher implements Closeable {
);
}
-
/**
* Gets the status that confirms the package at offset pkgOffset
* @param pkgOffset the offset of the package
* @return the status confirming the package; or null if it has not been confirmed yet
*/
- public PackageStatusMessage.Status getStatus(long pkgOffset) {
- FullMessage<PackageStatusMessage> msg = cache.get(pkgOffset);
-
- return msg != null ? msg.getMessage().getStatus() : null;
- }
-
- /**
- * Gets the status offset that confirms the packages at offset pkgOffset
- * @param pkgOffset the offset of the package
- * @return the offset of the confirming package; or null if has not been confirmed yet
- */
- public Long getStatusOffset(long pkgOffset) {
- FullMessage<PackageStatusMessage> msg = cache.get(pkgOffset);
-
- return msg != null ? msg.getInfo().getOffset() : null;
+ public PackageStatusMessage.Status getStatus(String subAgentName, long pkgOffset) {
+ Map<Long, Status> statusPerAgent = getAgentStatus(subAgentName);
+ return statusPerAgent.get(pkgOffset);
}
- /**
- * Clear all offsets in the cache smaller to the given pkgOffset.
- * @param pkgOffset package offset
- */
- public void clear(long pkgOffset) {
- NavigableMap<Long, FullMessage<PackageStatusMessage>> removed = cache.headMap(pkgOffset, false);
- if (! removed.isEmpty()) {
- LOG.info("Remove package offsets {} from status cache", removed.keySet());
- }
- removed.clear();
+ private Map<Long, Status> getAgentStatus(String subAgentName) {
+ return pkgStatusPerSubAgent.computeIfAbsent(subAgentName, offset -> new HashMap<Long, Messages.PackageStatusMessage.Status>());
}
@Override
@@ -94,21 +69,9 @@ public class PackageStatusWatcher implements Closeable {
poller.close();
}
- public void handle(MessageInfo info, Messages.PackageStatusMessage msg) {
+ private void handle(MessageInfo info, Messages.PackageStatusMessage pkgStatusMsg) {
// TODO: check revision
-
- Long pkgOffset = msg.getOffset();
- FullMessage<PackageStatusMessage> message = new FullMessage<>(info, msg);
-
- // cache only messages that are from the given subAgentName
- if (!subAgentName.equals(msg.getSubAgentName())) {
- return;
- }
-
- if (cache.containsKey(pkgOffset)) {
- LOG.warn("Package offset {} already exists", pkgOffset);
- }
-
- cache.put(pkgOffset, message);
+ Map<Long, Status> agentStatus = getAgentStatus(pkgStatusMsg.getSubAgentName());
+ agentStatus.put(pkgStatusMsg.getOffset(), pkgStatusMsg.getStatus());
}
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Precondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Precondition.java
index 423fe7a..e41fda0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Precondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Precondition.java
@@ -30,6 +30,6 @@ public interface Precondition {
* @throws IllegalStateException if the timeout expired without being able to determine status
* @return true if the package can be processed; otherwise it returns false.
*/
- boolean canProcess(long pkgOffset, int timeoutSeconds);
+ boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException;
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPrecondition.java
index 353f7aa..27ef46f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPrecondition.java
@@ -18,18 +18,17 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.commons.io.IOUtils;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.metatype.annotations.AttributeDefinition;
-import org.osgi.service.metatype.annotations.Designate;
-import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,14 +36,15 @@ import org.slf4j.LoggerFactory;
* This is a precondition that watches status messages from other instances in order to confirm that a package can be processed.
* The check will block until a status is found. If no status is received in 60 seconds it will throw an exception.
*/
-@Component(immediate = true, service = Precondition.class,
- configurationPolicy = ConfigurationPolicy.REQUIRE)
-@Designate(ocd = StagingPrecondition.Configuration.class, factory = true)
-public class StagingPrecondition implements Precondition {
+@Component(
+ property = {
+ PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
+ PROPERTY_SCHEDULER_PERIOD + ":Long=" + 24 * 60 * 60, // 1 day
+ })
+public class StagingPrecondition implements Precondition, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(StagingPrecondition.class);
-
@Reference
private MessagingProvider messagingProvider;
@@ -54,13 +54,11 @@ public class StagingPrecondition implements Precondition {
private volatile PackageStatusWatcher watcher;
private volatile boolean running = true;
-
-
+
@Activate
- public void activate(Configuration config) {
- String subAgentName = config.subAgentName();
- watcher = new PackageStatusWatcher(messagingProvider, topics, subAgentName);
- LOG.info("Activated Staging Precondition for subAgentName {}", subAgentName);
+ public void activate() {
+ watcher = new PackageStatusWatcher(messagingProvider, topics);
+ LOG.info("Activated Staging Precondition");
}
@Deactivate
@@ -69,50 +67,34 @@ public class StagingPrecondition implements Precondition {
running = false;
}
-
@Override
- public boolean canProcess(long pkgOffset, int timeoutSeconds) {
-
+ public synchronized boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException {
if (timeoutSeconds < 1) {
throw new IllegalArgumentException();
}
- // clear all offsets less than the required one as they are not needed anymore.
- // this works OK only if pkgOffset are always queried in increasing order.
- watcher.clear(pkgOffset);
-
// try to get the status for timeoutSeconds and then throw
- for(int i=0; running && i < timeoutSeconds; i++) {
- Status status = watcher.getStatus(pkgOffset);
+ for(int i=0; i < timeoutSeconds * 10; i++) {
+ Status status = watcher.getStatus(subAgentName, pkgOffset);
if (status != null) {
return status == Status.IMPORTED;
} else {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException("Precondition evaluation has been interrupted");
- }
+ Thread.sleep(100);
+ }
+
+ if (!running) {
+ throw new InterruptedException("Staging precondition is shutting down");
}
}
throw new IllegalStateException("Timeout waiting for package offset " + pkgOffset + " on status topic.");
}
-
-
- @ObjectClassDefinition(name = "Apache Sling Journal based Distribution - Staged Distribution Precondition",
- description = "Apache Sling Content Distribution Sub Agent precondition for staged distribution")
- public @interface Configuration {
-
- @AttributeDefinition(name = "Precondition name", description = "The name of the staging precondition")
- String name() default "staging";
-
- @AttributeDefinition(name = "Subscriber agent name", description = "The name of the subscriber agent to watch")
- String subAgentName() default "";
-
- @AttributeDefinition
- String webconsole_configurationFactory_nameHint() default "Precondition name: {name}";
-
+
+ public synchronized void run() {
+ LOG.info("Purging StagingPrecondition cache");
+ IOUtils.closeQuietly(watcher);
+ watcher = new PackageStatusWatcher(messagingProvider, topics);
}
+
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
index 41c9e58..937c30a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
@@ -37,7 +37,7 @@ public @interface SubscriberConfiguration {
@AttributeDefinition(name = "DistributionPackageBuilder target",
description = "The target reference for the DistributionPackageBuilder used to build/install content packages, e.g. use target=(name=...) to bind to a service by name.")
- String packageBuilder_target() default "(name=...)";
+ String packageBuilder_target() default "(name=journal_filevault)";
@AttributeDefinition(name = "Precondition target",
description = "The target reference for the Precondition used to validate packages, e.g. use target=(name=...) to bind to a service by name.")
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPreconditionTest.java
similarity index 72%
copy from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java
copy to src/test/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPreconditionTest.java
index 33b10c5..da2ce66 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPreconditionTest.java
@@ -18,12 +18,15 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
-import org.osgi.service.component.annotations.Component;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
-@Component(immediate = true, service = Precondition.class, property = { "name=default" })
-public class DefaultPrecondition implements Precondition {
- @Override
- public boolean canProcess(long pkgOffset, int timeoutSeconds) {
- return true;
+import org.junit.Test;
+
+public class DefaultPreconditionTest {
+ @Test
+ public void testAlwaysTrue() {
+ boolean canProcess = new DefaultPrecondition().canProcess("any", 100, 10);
+ assertThat(canProcess, equalTo(true));
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcherTest.java
index 32b5b7b..8ab19f3 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcherTest.java
@@ -71,7 +71,7 @@ public class PackageStatusWatcherTest {
adapterCaptor.capture()))
.thenReturn(mock(Closeable.class));
- statusWatcher = new PackageStatusWatcher(provider, topics, SUB1_AGENT_NAME);
+ statusWatcher = new PackageStatusWatcher(provider, topics);
}
@@ -81,33 +81,8 @@ public class PackageStatusWatcherTest {
generateMessages(10, 50);
-
assertPackageStatus(1000, null);
assertPackageStatus(1010, Status.REMOVED_FAILED);
-
-
- statusWatcher.clear(1011);
-
- assertPackageStatus(1010, null);
- assertPackageStatus(1011, Status.REMOVED_FAILED);
- assertPackageStatus(1049, Status.REMOVED_FAILED);
- assertPackageStatus(1050, null);
-
-
- generateMessages(50, 60);
-
- assertPackageStatus(1011, Status.REMOVED_FAILED);
- assertPackageStatus(1050, Status.REMOVED_FAILED);
- assertPackageStatus(1060, null);
-
- statusWatcher.clear(1100);
-
- assertPackageStatus(1050, null);
-
-
-
-
-
}
@@ -134,11 +109,9 @@ public class PackageStatusWatcherTest {
void assertPackageStatus(long pkgOffset, Status status) {
if (status == null) {
- assertEquals(null, statusWatcher.getStatus(pkgOffset));
- assertEquals(null, statusWatcher.getStatusOffset(pkgOffset));
+ assertEquals(null, statusWatcher.getStatus(SUB1_AGENT_NAME, pkgOffset));
} else {
- assertEquals(status, statusWatcher.getStatus(pkgOffset));
- assertEquals(pkgOffset-1000, (long) statusWatcher.getStatusOffset(pkgOffset));
+ assertEquals(status, statusWatcher.getStatus(SUB1_AGENT_NAME, pkgOffset));
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPreconditionTest.java
index 8eeeff0..b3d8cf3 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPreconditionTest.java
@@ -18,16 +18,25 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import com.google.common.collect.ImmutableMap;
-import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Before;
@@ -39,114 +48,109 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
-import org.osgi.util.converter.Converters;
-
-import java.io.Closeable;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class StagingPreconditionTest {
+ private static final String OTHER_AGENT = "other agent";
private static final String SUB1_SLING_ID = "sub1sling";
private static final String GP_SUB1_AGENT_NAME = "gpsub1agent";
private static final String PUB1_AGENT_NAME = "pub1agent";
+ private static final Long OFFSET_NOT_PRESENT = 111111l;
@Mock
- MessagingProvider clientProvider;
-
- @Spy
- Topics topics = new Topics();
-
+ private MessagingProvider clientProvider;
@Spy
- private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
-
+ private Topics topics = new Topics();
@Captor
private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>> statusCaptor;
@InjectMocks
- StagingPrecondition precondition;
+ private StagingPrecondition precondition;
private MessageHandler<PackageStatusMessage> statusHandler;
-
@Before
public void before() {
Awaitility.setDefaultPollDelay(Duration.ZERO);
Awaitility.setDefaultPollInterval(Duration.ONE_HUNDRED_MILLISECONDS);
MockitoAnnotations.initMocks(this);
- StagingPrecondition.Configuration config = Converters.standardConverter()
- .convert(ImmutableMap.of("subAgentName", GP_SUB1_AGENT_NAME)).to(StagingPrecondition.Configuration.class);
when(clientProvider.createPoller(
Mockito.anyString(),
Mockito.eq(Reset.earliest),
statusCaptor.capture()))
.thenReturn(mock(Closeable.class));
- precondition.activate(config);
+ precondition.activate();
statusHandler = statusCaptor.getValue().getHandler();
}
-
- @Test
- public void testStatus() {
- statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
- createMessage(1000, PackageStatusMessage.Status.REMOVED_FAILED));
-
- statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
- createMessage(1001, PackageStatusMessage.Status.REMOVED));
-
- statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
- createMessage(1002, PackageStatusMessage.Status.IMPORTED));
-
- assertFalse(precondition.canProcess(1000, 1));
- assertFalse(precondition.canProcess(1001, 1));
- assertTrue(precondition.canProcess(1002, 1));
- assertTrue(precondition.canProcess(1002, 1));
- assertTrue(precondition.canProcess(1002,1 ));
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIllegalTimeout() throws InterruptedException {
+ precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, -1);
}
-
+
+ @Test(expected = IllegalStateException.class)
+ public void testNotYetProcessed() throws InterruptedException {
+ simulateMessage(OTHER_AGENT, 1002, PackageStatusMessage.Status.IMPORTED);
+ boolean res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT, 1);
+ assertThat(res, equalTo(true));
+
+ // We got no package for this agent. So this should time out
+ precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 1);
+ }
+
@Test
- public void testClearCache() {
- statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
- createMessage(1000, PackageStatusMessage.Status.REMOVED_FAILED));
-
- statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
- createMessage(1001, PackageStatusMessage.Status.REMOVED));
-
- assertFalse(precondition.canProcess(1000, 1));
- assertFalse(precondition.canProcess(1001,1 ));
- assertThrows(1000);
- statusHandler = statusCaptor.getValue().getHandler();
-
- statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
- createMessage(1000, PackageStatusMessage.Status.REMOVED_FAILED));
-
- assertFalse(precondition.canProcess(1000, 1));
+ public void testDeactivateDuringCanProcess() {
+ AtomicReference<Throwable> exHolder = new AtomicReference<>();
+ Thread th = new Thread(() -> {
+ try {
+ precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 2);
+ } catch (Throwable t) {
+ exHolder.set(t);
+ }
+ });
+ th.start();
+ precondition.deactivate();
+ Throwable ex = Awaitility.await().until(() -> exHolder.get(), notNullValue());
+ assertThat(ex, instanceOf(InterruptedException.class));
}
-
-
- void assertThrows(long offset) {
- try {
- precondition.canProcess(offset,1 );
- fail("it must throw");
- } catch (IllegalStateException e) {
-
- }
+
+ @Test(expected = IllegalStateException.class)
+ public void testCleanup() throws InterruptedException {
+ simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED);
+ assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
+
+ // Cleanup
+ precondition.run();
+
+ // Should time out because after cleanup message is not present anymore
+ precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1);
+ }
+
+ @Test
+ public void testStatus() throws InterruptedException {
+ simulateMessage(GP_SUB1_AGENT_NAME, 1000, PackageStatusMessage.Status.REMOVED_FAILED);
+ simulateMessage(GP_SUB1_AGENT_NAME, 1001, PackageStatusMessage.Status.REMOVED);
+ simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED);
+
+ assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000, 1));
+ assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001, 1));
+ assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
}
- PackageStatusMessage createMessage(long offset, PackageStatusMessage.Status status) {
- return PackageStatusMessage.newBuilder()
+ private void simulateMessage(String subAgentName, long pkgOffset, PackageStatusMessage.Status status) {
+ PackageStatusMessage message = PackageStatusMessage.newBuilder()
.setSubSlingId(SUB1_SLING_ID)
- .setSubAgentName(GP_SUB1_AGENT_NAME)
+ .setSubAgentName(subAgentName)
.setPubAgentName(PUB1_AGENT_NAME)
- .setOffset(offset)
+ .setOffset(pkgOffset)
.setStatus(status)
.build();
+
+ TestMessageInfo offset0 = new TestMessageInfo("", 1, 0, 0);
+ statusHandler.handle(offset0, message);
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index dc8263a..fcdacd9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -325,7 +325,7 @@ public class SubscriberTest {
packageHandler.handle(info, message);
waitSubscriber(RUNNING);
- when(precondition.canProcess(eq(11), anyInt())).thenReturn(false);
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(false);
try {
waitSubscriber(IDLE);
@@ -334,13 +334,13 @@ public class SubscriberTest {
}
- when(precondition.canProcess(eq(11), anyInt())).thenReturn(true);
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(true);
waitSubscriber(IDLE);
}
@Test
- public void testReadyWhenWatingForPrecondition() {
+ public void testReadyWhenWatingForPrecondition() throws InterruptedException {
Semaphore sem = new Semaphore(0);
assumeWaitingForPrecondition(sem);
initSubscriber();
@@ -402,12 +402,12 @@ public class SubscriberTest {
.thenReturn(timer);
}
- private void assumeNoPrecondition() {
- when(precondition.canProcess(anyLong(), anyInt())).thenReturn(true);
+ private void assumeNoPrecondition() throws InterruptedException {
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt())).thenReturn(true);
}
- private void assumeWaitingForPrecondition(Semaphore sem) {
- when(precondition.canProcess(anyLong(), anyInt()))
+ private void assumeWaitingForPrecondition(Semaphore sem) throws InterruptedException {
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt()))
.thenAnswer(invocation -> sem.tryAcquire(10000, TimeUnit.SECONDS));
}