You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/02/18 13:42:57 UTC
[sling-org-apache-sling-distribution-journal] branch master
updated: SLING-9075 - Support multiple agents with one StagingPrecondition
(#24)
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 070ab61 SLING-9075 - Support multiple agents with one StagingPrecondition (#24)
070ab61 is described below
commit 070ab61bf94f8a59fd09036b121c6d3bce0b6052
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Tue Feb 18 14:42:49 2020 +0100
SLING-9075 - Support multiple agents with one StagingPrecondition (#24)
* SLING-9075 - Support multiple agents with one StagingPrecondition
---
.../impl/subscriber/DefaultPrecondition.java | 2 +-
.../impl/subscriber/DistributionSubscriber.java | 9 +-
.../impl/subscriber/PackageStatusWatcher.java | 83 ++++-------
.../journal/impl/subscriber/Precondition.java | 7 +-
.../impl/subscriber/StagingPrecondition.java | 85 +++++-------
.../impl/subscriber/SubscriberConfiguration.java | 2 +-
.../impl/subscriber/DefaultPreconditionTest.java} | 15 +-
.../impl/subscriber/PackageStatusWatcherTest.java | 33 +----
.../impl/subscriber/StagingPreconditionTest.java | 153 +++++++++++----------
.../journal/impl/subscriber/SubscriberTest.java | 29 ++--
10 files changed, 184 insertions(+), 234 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java
index 33b10c5..e335ffe 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java
@@ -23,7 +23,7 @@ import org.osgi.service.component.annotations.Component;
@Component(immediate = true, service = Precondition.class, property = { "name=default" })
public class DefaultPrecondition implements Precondition {
@Override
- public boolean canProcess(long pkgOffset, int timeoutSeconds) {
+ public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) {
return true;
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index c0579c5..ce215cf 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -38,6 +38,7 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
@@ -372,7 +373,7 @@ public class DistributionSubscriber implements DistributionAgent {
subscriberIdle.idle();
}
- } catch (IllegalStateException e) {
+ } catch (TimeoutException e) {
/**
* Precondition timed out. We only log this on info level as it is no error
*/
@@ -398,7 +399,7 @@ public class DistributionSubscriber implements DistributionAgent {
}
}
- private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException {
+ private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, InterruptedException, TimeoutException {
long offset = queueItem.get(RECORD_OFFSET, Long.class);
PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
boolean skip = shouldSkip(offset);
@@ -413,8 +414,8 @@ public class DistributionSubscriber implements DistributionAgent {
distributionMetricsService.getItemsBufferSize().decrement();
}
- private boolean shouldSkip(long offset) throws IllegalStateException {
- return commandPoller.isCleared(offset) || !precondition.canProcess(offset, PRECONDITION_TIMEOUT);
+ private boolean shouldSkip(long offset) throws InterruptedException, TimeoutException {
+ return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcher.java
index 81063e3..9b64177 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcher.java
@@ -19,33 +19,29 @@
package org.apache.sling.distribution.journal.impl.subscriber;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.FullMessage;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
import java.io.Closeable;
import java.io.IOException;
-import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
public class PackageStatusWatcher implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(PackageStatusWatcher.class);
-
private final Closeable poller;
- private final String subAgentName;
- private final NavigableMap<Long, FullMessage<PackageStatusMessage>> cache = new ConcurrentSkipListMap<>();
+
+ // subAgentName -> pkgOffset -> Status
+ private final Map<String, Map<Long, Status>> pkgStatusPerSubAgent = new ConcurrentHashMap<>();
- public PackageStatusWatcher(MessagingProvider messagingProvider, Topics topics, String subAgentName) {
+ public PackageStatusWatcher(MessagingProvider messagingProvider, Topics topics) {
String topicName = topics.getStatusTopic();
- this.subAgentName = subAgentName;
poller = messagingProvider.createPoller(
topicName,
@@ -54,39 +50,22 @@ public class PackageStatusWatcher implements Closeable {
);
}
-
/**
* Gets the status that confirms the package at offset pkgOffset
* @param pkgOffset the offset of the package
* @return the status confirming the package; or null if it has not been confirmed yet
*/
- public PackageStatusMessage.Status getStatus(long pkgOffset) {
- FullMessage<PackageStatusMessage> msg = cache.get(pkgOffset);
-
- return msg != null ? msg.getMessage().getStatus() : null;
+ public PackageStatusMessage.Status getStatus(String subAgentName, long pkgOffset) {
+ Map<Long, Status> statusPerAgent = getAgentStatus(subAgentName);
+ return statusPerAgent.get(pkgOffset);
}
- /**
- * Gets the status offset that confirms the packages at offset pkgOffset
- * @param pkgOffset the offset of the package
- * @return the offset of the confirming package; or null if has not been confirmed yet
- */
- public Long getStatusOffset(long pkgOffset) {
- FullMessage<PackageStatusMessage> msg = cache.get(pkgOffset);
-
- return msg != null ? msg.getInfo().getOffset() : null;
+ private Map<Long, Status> getAgentStatus(String subAgentName) {
+ return pkgStatusPerSubAgent.computeIfAbsent(subAgentName, this::newMap);
}
-
- /**
- * Clear all offsets in the cache smaller to the given pkgOffset.
- * @param pkgOffset package offset
- */
- public void clear(long pkgOffset) {
- NavigableMap<Long, FullMessage<PackageStatusMessage>> removed = cache.headMap(pkgOffset, false);
- if (! removed.isEmpty()) {
- LOG.info("Remove package offsets {} from status cache", removed.keySet());
- }
- removed.clear();
+
+ private Map<Long, Status> newMap(String subAgentName) {
+ return new ConcurrentHashMap<>();
}
@Override
@@ -94,21 +73,9 @@ public class PackageStatusWatcher implements Closeable {
poller.close();
}
- public void handle(MessageInfo info, Messages.PackageStatusMessage msg) {
+ private void handle(MessageInfo info, Messages.PackageStatusMessage pkgStatusMsg) {
// TODO: check revision
-
- Long pkgOffset = msg.getOffset();
- FullMessage<PackageStatusMessage> message = new FullMessage<>(info, msg);
-
- // cache only messages that are from the given subAgentName
- if (!subAgentName.equals(msg.getSubAgentName())) {
- return;
- }
-
- if (cache.containsKey(pkgOffset)) {
- LOG.warn("Package offset {} already exists", pkgOffset);
- }
-
- cache.put(pkgOffset, message);
+ Map<Long, Status> agentStatus = getAgentStatus(pkgStatusMsg.getSubAgentName());
+ agentStatus.put(pkgStatusMsg.getOffset(), pkgStatusMsg.getStatus());
}
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Precondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Precondition.java
index 423fe7a..c219466 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Precondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Precondition.java
@@ -18,6 +18,8 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
+import java.util.concurrent.TimeoutException;
+
/**
* Extension point for checking if a package can be processed by a subscriber.
*/
@@ -27,9 +29,10 @@ public interface Precondition {
* Checks if a package can be processed
* @param pkgOffset the offset of the package
* @param timeoutSeconds max seconds to wait until returning
- * @throws IllegalStateException if the timeout expired without being able to determine status
+ * @throws TimeoutException if the timeout expired without being able to determine status
+ * @throws InterruptedException if the thread was interrupted and should shut down
* @return true if the package can be processed; otherwise it returns false.
*/
- boolean canProcess(long pkgOffset, int timeoutSeconds);
+ boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException;
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPrecondition.java
index 353f7aa..ef1c595 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPrecondition.java
@@ -18,18 +18,19 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.commons.io.IOUtils;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.metatype.annotations.AttributeDefinition;
-import org.osgi.service.metatype.annotations.Designate;
-import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,14 +38,16 @@ import org.slf4j.LoggerFactory;
* This is a precondition that watches status messages from other instances in order to confirm that a package can be processed.
* The check will block until a status is found. If no status is received in 60 seconds it will throw an exception.
*/
-@Component(immediate = true, service = Precondition.class,
- configurationPolicy = ConfigurationPolicy.REQUIRE)
-@Designate(ocd = StagingPrecondition.Configuration.class, factory = true)
-public class StagingPrecondition implements Precondition {
+@Component(
+ property = {
+ "name=staging",
+ PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
+ PROPERTY_SCHEDULER_PERIOD + ":Long=" + 24 * 60 * 60, // 1 day
+ })
+public class StagingPrecondition implements Precondition, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(StagingPrecondition.class);
-
@Reference
private MessagingProvider messagingProvider;
@@ -54,65 +57,51 @@ public class StagingPrecondition implements Precondition {
private volatile PackageStatusWatcher watcher;
private volatile boolean running = true;
-
-
+
@Activate
- public void activate(Configuration config) {
- String subAgentName = config.subAgentName();
- watcher = new PackageStatusWatcher(messagingProvider, topics, subAgentName);
- LOG.info("Activated Staging Precondition for subAgentName {}", subAgentName);
+ public void activate() {
+ watcher = new PackageStatusWatcher(messagingProvider, topics);
+ LOG.info("Activated Staging Precondition");
}
@Deactivate
- public void deactivate() {
+ public synchronized void deactivate() {
IOUtils.closeQuietly(watcher);
running = false;
}
-
@Override
- public boolean canProcess(long pkgOffset, int timeoutSeconds) {
-
+ public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException {
if (timeoutSeconds < 1) {
throw new IllegalArgumentException();
}
- // clear all offsets less than the required one as they are not needed anymore.
- // this works OK only if pkgOffset are always queried in increasing order.
- watcher.clear(pkgOffset);
-
// try to get the status for timeoutSeconds and then throw
- for(int i=0; running && i < timeoutSeconds; i++) {
- Status status = watcher.getStatus(pkgOffset);
+ for(int i=0; i < timeoutSeconds * 10; i++) {
+ Status status = getStatus(subAgentName, pkgOffset);
if (status != null) {
return status == Status.IMPORTED;
} else {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException("Precondition evaluation has been interrupted");
- }
+ Thread.sleep(100);
+ }
+
+ if (!running) {
+ throw new InterruptedException("Staging precondition is shutting down");
}
}
- throw new IllegalStateException("Timeout waiting for package offset " + pkgOffset + " on status topic.");
+ throw new TimeoutException("Timeout waiting for package offset " + pkgOffset + " on status topic.");
}
-
- @ObjectClassDefinition(name = "Apache Sling Journal based Distribution - Staged Distribution Precondition",
- description = "Apache Sling Content Distribution Sub Agent precondition for staged distribution")
- public @interface Configuration {
-
- @AttributeDefinition(name = "Precondition name", description = "The name of the staging precondition")
- String name() default "staging";
-
- @AttributeDefinition(name = "Subscriber agent name", description = "The name of the subscriber agent to watch")
- String subAgentName() default "";
-
- @AttributeDefinition
- String webconsole_configurationFactory_nameHint() default "Precondition name: {name}";
-
+ private synchronized Status getStatus(String subAgentName, long pkgOffset) {
+ return watcher.getStatus(subAgentName, pkgOffset);
}
+
+ public synchronized void run() {
+ LOG.info("Purging StagingPrecondition cache");
+ IOUtils.closeQuietly(watcher);
+ watcher = new PackageStatusWatcher(messagingProvider, topics);
+ }
+
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
index 41c9e58..937c30a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
@@ -37,7 +37,7 @@ public @interface SubscriberConfiguration {
@AttributeDefinition(name = "DistributionPackageBuilder target",
description = "The target reference for the DistributionPackageBuilder used to build/install content packages, e.g. use target=(name=...) to bind to a service by name.")
- String packageBuilder_target() default "(name=...)";
+ String packageBuilder_target() default "(name=journal_filevault)";
@AttributeDefinition(name = "Precondition target",
description = "The target reference for the Precondition used to validate packages, e.g. use target=(name=...) to bind to a service by name.")
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPreconditionTest.java
similarity index 72%
copy from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java
copy to src/test/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPreconditionTest.java
index 33b10c5..da2ce66 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPreconditionTest.java
@@ -18,12 +18,15 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
-import org.osgi.service.component.annotations.Component;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
-@Component(immediate = true, service = Precondition.class, property = { "name=default" })
-public class DefaultPrecondition implements Precondition {
- @Override
- public boolean canProcess(long pkgOffset, int timeoutSeconds) {
- return true;
+import org.junit.Test;
+
+public class DefaultPreconditionTest {
+ @Test
+ public void testAlwaysTrue() {
+ boolean canProcess = new DefaultPrecondition().canProcess("any", 100, 10);
+ assertThat(canProcess, equalTo(true));
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcherTest.java
index 32b5b7b..8ab19f3 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcherTest.java
@@ -71,7 +71,7 @@ public class PackageStatusWatcherTest {
adapterCaptor.capture()))
.thenReturn(mock(Closeable.class));
- statusWatcher = new PackageStatusWatcher(provider, topics, SUB1_AGENT_NAME);
+ statusWatcher = new PackageStatusWatcher(provider, topics);
}
@@ -81,33 +81,8 @@ public class PackageStatusWatcherTest {
generateMessages(10, 50);
-
assertPackageStatus(1000, null);
assertPackageStatus(1010, Status.REMOVED_FAILED);
-
-
- statusWatcher.clear(1011);
-
- assertPackageStatus(1010, null);
- assertPackageStatus(1011, Status.REMOVED_FAILED);
- assertPackageStatus(1049, Status.REMOVED_FAILED);
- assertPackageStatus(1050, null);
-
-
- generateMessages(50, 60);
-
- assertPackageStatus(1011, Status.REMOVED_FAILED);
- assertPackageStatus(1050, Status.REMOVED_FAILED);
- assertPackageStatus(1060, null);
-
- statusWatcher.clear(1100);
-
- assertPackageStatus(1050, null);
-
-
-
-
-
}
@@ -134,11 +109,9 @@ public class PackageStatusWatcherTest {
void assertPackageStatus(long pkgOffset, Status status) {
if (status == null) {
- assertEquals(null, statusWatcher.getStatus(pkgOffset));
- assertEquals(null, statusWatcher.getStatusOffset(pkgOffset));
+ assertEquals(null, statusWatcher.getStatus(SUB1_AGENT_NAME, pkgOffset));
} else {
- assertEquals(status, statusWatcher.getStatus(pkgOffset));
- assertEquals(pkgOffset-1000, (long) statusWatcher.getStatusOffset(pkgOffset));
+ assertEquals(status, statusWatcher.getStatus(SUB1_AGENT_NAME, pkgOffset));
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPreconditionTest.java
index 8eeeff0..b050526 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPreconditionTest.java
@@ -18,16 +18,26 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import com.google.common.collect.ImmutableMap;
-import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Before;
@@ -39,114 +49,109 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
-import org.osgi.util.converter.Converters;
-
-import java.io.Closeable;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class StagingPreconditionTest {
+ private static final String OTHER_AGENT = "other agent";
private static final String SUB1_SLING_ID = "sub1sling";
private static final String GP_SUB1_AGENT_NAME = "gpsub1agent";
private static final String PUB1_AGENT_NAME = "pub1agent";
+ private static final Long OFFSET_NOT_PRESENT = 111111l;
@Mock
- MessagingProvider clientProvider;
-
- @Spy
- Topics topics = new Topics();
-
+ private MessagingProvider clientProvider;
@Spy
- private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
-
+ private Topics topics = new Topics();
@Captor
private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>> statusCaptor;
@InjectMocks
- StagingPrecondition precondition;
+ private StagingPrecondition precondition;
private MessageHandler<PackageStatusMessage> statusHandler;
-
@Before
public void before() {
Awaitility.setDefaultPollDelay(Duration.ZERO);
Awaitility.setDefaultPollInterval(Duration.ONE_HUNDRED_MILLISECONDS);
MockitoAnnotations.initMocks(this);
- StagingPrecondition.Configuration config = Converters.standardConverter()
- .convert(ImmutableMap.of("subAgentName", GP_SUB1_AGENT_NAME)).to(StagingPrecondition.Configuration.class);
when(clientProvider.createPoller(
Mockito.anyString(),
Mockito.eq(Reset.earliest),
statusCaptor.capture()))
.thenReturn(mock(Closeable.class));
- precondition.activate(config);
+ precondition.activate();
statusHandler = statusCaptor.getValue().getHandler();
}
-
- @Test
- public void testStatus() {
- statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
- createMessage(1000, PackageStatusMessage.Status.REMOVED_FAILED));
-
- statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
- createMessage(1001, PackageStatusMessage.Status.REMOVED));
-
- statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
- createMessage(1002, PackageStatusMessage.Status.IMPORTED));
-
- assertFalse(precondition.canProcess(1000, 1));
- assertFalse(precondition.canProcess(1001, 1));
- assertTrue(precondition.canProcess(1002, 1));
- assertTrue(precondition.canProcess(1002, 1));
- assertTrue(precondition.canProcess(1002,1 ));
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIllegalTimeout() throws InterruptedException, TimeoutException {
+ precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, -1);
}
-
+
+ @Test(expected = TimeoutException.class)
+ public void testNotYetProcessed() throws InterruptedException, TimeoutException {
+ simulateMessage(OTHER_AGENT, 1002, PackageStatusMessage.Status.IMPORTED);
+ boolean res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT, 1);
+ assertThat(res, equalTo(true));
+
+ // We got no package for this agent. So this should time out
+ precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 1);
+ }
+
@Test
- public void testClearCache() {
- statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
- createMessage(1000, PackageStatusMessage.Status.REMOVED_FAILED));
-
- statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
- createMessage(1001, PackageStatusMessage.Status.REMOVED));
-
- assertFalse(precondition.canProcess(1000, 1));
- assertFalse(precondition.canProcess(1001,1 ));
- assertThrows(1000);
- statusHandler = statusCaptor.getValue().getHandler();
-
- statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
- createMessage(1000, PackageStatusMessage.Status.REMOVED_FAILED));
-
- assertFalse(precondition.canProcess(1000, 1));
+ public void testDeactivateDuringCanProcess() {
+ AtomicReference<Throwable> exHolder = new AtomicReference<>();
+ Thread th = new Thread(() -> {
+ try {
+ precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 2);
+ } catch (Throwable t) {
+ exHolder.set(t);
+ }
+ });
+ th.start();
+ precondition.deactivate();
+ Throwable ex = Awaitility.await().until(() -> exHolder.get(), notNullValue());
+ assertThat(ex, instanceOf(InterruptedException.class));
}
-
-
- void assertThrows(long offset) {
- try {
- precondition.canProcess(offset,1 );
- fail("it must throw");
- } catch (IllegalStateException e) {
-
- }
+
+ @Test(expected = TimeoutException.class)
+ public void testCleanup() throws InterruptedException, TimeoutException {
+ simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED);
+ assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
+
+ // Cleanup
+ precondition.run();
+
+ // Should time out because after cleanup message is not present anymore
+ precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1);
+ }
+
+ @Test
+ public void testStatus() throws InterruptedException, TimeoutException {
+ simulateMessage(GP_SUB1_AGENT_NAME, 1000, PackageStatusMessage.Status.REMOVED_FAILED);
+ simulateMessage(GP_SUB1_AGENT_NAME, 1001, PackageStatusMessage.Status.REMOVED);
+ simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED);
+
+ assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000, 1));
+ assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001, 1));
+ assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
}
- PackageStatusMessage createMessage(long offset, PackageStatusMessage.Status status) {
- return PackageStatusMessage.newBuilder()
+ private void simulateMessage(String subAgentName, long pkgOffset, PackageStatusMessage.Status status) {
+ PackageStatusMessage message = PackageStatusMessage.newBuilder()
.setSubSlingId(SUB1_SLING_ID)
- .setSubAgentName(GP_SUB1_AGENT_NAME)
+ .setSubAgentName(subAgentName)
.setPubAgentName(PUB1_AGENT_NAME)
- .setOffset(offset)
+ .setOffset(pkgOffset)
.setStatus(status)
.build();
+
+ TestMessageInfo offset0 = new TestMessageInfo("", 1, 0, 0);
+ statusHandler.handle(offset0, message);
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index dc8263a..ba8ea9a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -46,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
@@ -221,7 +222,7 @@ public class SubscriberTest {
}
@Test
- public void testReceive() throws DistributionException, InterruptedException {
+ public void testReceive() throws DistributionException {
assumeNoPrecondition();
initSubscriber();
@@ -254,7 +255,7 @@ public class SubscriberTest {
}
@Test
- public void testReceiveDelete() throws DistributionException, InterruptedException, LoginException, PersistenceException {
+ public void testReceiveDelete() throws DistributionException, LoginException, PersistenceException {
assumeNoPrecondition();
initSubscriber();
@@ -274,7 +275,7 @@ public class SubscriberTest {
}
@Test
- public void testExecuteNotSupported() throws InterruptedException, DistributionException {
+ public void testExecuteNotSupported() throws DistributionException {
assumeNoPrecondition();
initSubscriber();
@@ -285,7 +286,7 @@ public class SubscriberTest {
@Test
- public void testSendFailedStatus() throws DistributionException, InterruptedException {
+ public void testSendFailedStatus() throws DistributionException {
assumeNoPrecondition();
initSubscriber(ImmutableMap.of("maxRetries", "1"));
@@ -317,7 +318,7 @@ public class SubscriberTest {
}
@Test
- public void testSkipOnRemovedStatus() throws DistributionException, InterruptedException {
+ public void testSkipOnRemovedStatus() throws DistributionException, InterruptedException, TimeoutException {
assumeNoPrecondition();
initSubscriber();
MessageInfo info = new TestMessageInfo("", 1, 11, 0);
@@ -325,7 +326,7 @@ public class SubscriberTest {
packageHandler.handle(info, message);
waitSubscriber(RUNNING);
- when(precondition.canProcess(eq(11), anyInt())).thenReturn(false);
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(false);
try {
waitSubscriber(IDLE);
@@ -334,7 +335,7 @@ public class SubscriberTest {
}
- when(precondition.canProcess(eq(11), anyInt())).thenReturn(true);
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(true);
waitSubscriber(IDLE);
}
@@ -403,12 +404,20 @@ public class SubscriberTest {
}
private void assumeNoPrecondition() {
- when(precondition.canProcess(anyLong(), anyInt())).thenReturn(true);
+ try {
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt())).thenReturn(true);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
private void assumeWaitingForPrecondition(Semaphore sem) {
- when(precondition.canProcess(anyLong(), anyInt()))
- .thenAnswer(invocation -> sem.tryAcquire(10000, TimeUnit.SECONDS));
+ try {
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt()))
+ .thenAnswer(invocation -> sem.tryAcquire(10000, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
private final class WaitFor implements Answer<DistributionPackageInfo> {