You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/02/16 10:24:54 UTC

[sling-org-apache-sling-distribution-journal] 01/01: SLING-9075 - Support multiple agents with one StagingPrecondition

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-9075
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit fb447a53a7c2d678520fb5a3c6ef654eccd10e6c
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Sun Feb 16 11:24:31 2020 +0100

    SLING-9075 - Support multiple agents with one StagingPrecondition
---
 .../impl/subscriber/DefaultPrecondition.java       |   2 +-
 .../impl/subscriber/DistributionSubscriber.java    |   6 +-
 .../impl/subscriber/PackageStatusWatcher.java      |  81 +++--------
 .../journal/impl/subscriber/Precondition.java      |   2 +-
 .../impl/subscriber/StagingPrecondition.java       |  76 ++++-------
 .../impl/subscriber/SubscriberConfiguration.java   |   2 +-
 .../impl/subscriber/DefaultPreconditionTest.java}  |  15 +-
 .../impl/subscriber/PackageStatusWatcherTest.java  |  33 +----
 .../impl/subscriber/StagingPreconditionTest.java   | 152 +++++++++++----------
 .../journal/impl/subscriber/SubscriberTest.java    |  14 +-
 10 files changed, 154 insertions(+), 229 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java
index 33b10c5..e335ffe 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java
@@ -23,7 +23,7 @@ import org.osgi.service.component.annotations.Component;
 @Component(immediate = true, service = Precondition.class, property = { "name=default" })
 public class DefaultPrecondition implements Precondition {
     @Override
-    public boolean canProcess(long pkgOffset, int timeoutSeconds) {
+    public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) {
         return true;
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index c0579c5..880a304 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -398,7 +398,7 @@ public class DistributionSubscriber implements DistributionAgent {
         }
     }
 
-    private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException {
+    private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, IllegalStateException, InterruptedException {
         long offset = queueItem.get(RECORD_OFFSET, Long.class);
         PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
         boolean skip = shouldSkip(offset);
@@ -413,8 +413,8 @@ public class DistributionSubscriber implements DistributionAgent {
         distributionMetricsService.getItemsBufferSize().decrement();
     }
 
-    private boolean shouldSkip(long offset) throws IllegalStateException {
-        return commandPoller.isCleared(offset) || !precondition.canProcess(offset, PRECONDITION_TIMEOUT);
+    private boolean shouldSkip(long offset) throws IllegalStateException, InterruptedException {
+        return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
     }
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcher.java
index 81063e3..1b3cc32 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcher.java
@@ -19,33 +19,29 @@
 package org.apache.sling.distribution.journal.impl.subscriber;
 
 
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.FullMessage;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.HashMap;
+import java.util.Map;
 
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
 
 public class PackageStatusWatcher implements Closeable {
-    private static final Logger LOG = LoggerFactory.getLogger(PackageStatusWatcher.class);
-
     private final Closeable poller;
-    private final String subAgentName;
-    private final NavigableMap<Long, FullMessage<PackageStatusMessage>> cache = new ConcurrentSkipListMap<>();
+    
+    // subAgentName -> pkgOffset -> Status
+    private final Map<String, Map<Long, Status>> pkgStatusPerSubAgent = new HashMap<>();
 
-    public PackageStatusWatcher(MessagingProvider messagingProvider, Topics topics, String subAgentName) {
+    public PackageStatusWatcher(MessagingProvider messagingProvider, Topics topics) {
         String topicName = topics.getStatusTopic();
-        this.subAgentName = subAgentName;
 
         poller = messagingProvider.createPoller(
                 topicName,
@@ -54,39 +50,18 @@ public class PackageStatusWatcher implements Closeable {
         );
     }
 
-
     /**
      * Gets the status that confirms the package at offset pkgOffset
      * @param pkgOffset the offset of the package
      * @return the status confirming the package; or null if it has not been confirmed yet
      */
-    public PackageStatusMessage.Status getStatus(long pkgOffset) {
-        FullMessage<PackageStatusMessage> msg = cache.get(pkgOffset);
-
-        return msg != null ? msg.getMessage().getStatus() : null;
-    }
-
-    /**
-     * Gets the status offset that confirms the packages at offset pkgOffset
-     * @param pkgOffset the offset of the package
-     * @return the offset of the confirming package; or null if has not been confirmed yet
-     */
-    public Long getStatusOffset(long pkgOffset) {
-        FullMessage<PackageStatusMessage> msg = cache.get(pkgOffset);
-
-        return msg != null ? msg.getInfo().getOffset() : null;
+    public PackageStatusMessage.Status getStatus(String subAgentName, long pkgOffset) {
+        Map<Long, Status> statusPerAgent = getAgentStatus(subAgentName);
+        return statusPerAgent.get(pkgOffset);
     }
 
-    /**
-     * Clear all offsets in the cache smaller to the given pkgOffset.
-     * @param pkgOffset package offset
-     */
-    public void clear(long pkgOffset) {
-        NavigableMap<Long, FullMessage<PackageStatusMessage>> removed = cache.headMap(pkgOffset, false);
-        if (! removed.isEmpty()) {
-            LOG.info("Remove package offsets {} from status cache", removed.keySet());
-        }
-        removed.clear();
+    private Map<Long, Status> getAgentStatus(String subAgentName) {
+        return pkgStatusPerSubAgent.computeIfAbsent(subAgentName, offset -> new HashMap<Long, Messages.PackageStatusMessage.Status>());
     }
 
     @Override
@@ -94,21 +69,9 @@ public class PackageStatusWatcher implements Closeable {
         poller.close();
     }
 
-    public void handle(MessageInfo info, Messages.PackageStatusMessage msg) {
+    private void handle(MessageInfo info, Messages.PackageStatusMessage pkgStatusMsg) {
         // TODO: check revision
-
-        Long pkgOffset = msg.getOffset();
-        FullMessage<PackageStatusMessage> message = new FullMessage<>(info, msg);
-
-        // cache only messages that are from the given subAgentName
-        if (!subAgentName.equals(msg.getSubAgentName())) {
-            return;
-        }
-
-        if (cache.containsKey(pkgOffset)) {
-            LOG.warn("Package offset {} already exists", pkgOffset);
-        }
-
-        cache.put(pkgOffset, message);
+        Map<Long, Status> agentStatus = getAgentStatus(pkgStatusMsg.getSubAgentName());
+        agentStatus.put(pkgStatusMsg.getOffset(), pkgStatusMsg.getStatus());
     }
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Precondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Precondition.java
index 423fe7a..e41fda0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Precondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Precondition.java
@@ -30,6 +30,6 @@ public interface Precondition {
      * @throws IllegalStateException if the timeout expired without being able to determine status
      * @return true if the package can be processed; otherwise it returns false.
      */
-    boolean canProcess(long pkgOffset, int timeoutSeconds);
+    boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException;
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPrecondition.java
index 353f7aa..27ef46f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPrecondition.java
@@ -18,18 +18,17 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.commons.io.IOUtils;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.ConfigurationPolicy;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.metatype.annotations.AttributeDefinition;
-import org.osgi.service.metatype.annotations.Designate;
-import org.osgi.service.metatype.annotations.ObjectClassDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,14 +36,15 @@ import org.slf4j.LoggerFactory;
  * This is a precondition that watches status messages from other instances in order to confirm that a package can be processed.
  * The check will block until a status is found. If no status is received in 60 seconds it will throw an exception.
  */
-@Component(immediate = true, service = Precondition.class,
-        configurationPolicy = ConfigurationPolicy.REQUIRE)
-@Designate(ocd = StagingPrecondition.Configuration.class, factory = true)
-public class StagingPrecondition implements Precondition {
+@Component(
+        property = {
+                PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
+                PROPERTY_SCHEDULER_PERIOD + ":Long=" + 24 * 60 * 60, // 1 day
+        })
+public class StagingPrecondition implements Precondition, Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(StagingPrecondition.class);
 
-
     @Reference
     private MessagingProvider messagingProvider;
 
@@ -54,13 +54,11 @@ public class StagingPrecondition implements Precondition {
     private volatile PackageStatusWatcher watcher;
 
     private volatile boolean running = true;
-
-
+    
     @Activate
-    public void activate(Configuration config) {
-        String subAgentName = config.subAgentName();
-        watcher = new PackageStatusWatcher(messagingProvider, topics, subAgentName);
-        LOG.info("Activated Staging Precondition for subAgentName {}", subAgentName);
+    public void activate() {
+        watcher = new PackageStatusWatcher(messagingProvider, topics);
+        LOG.info("Activated Staging Precondition");
     }
 
     @Deactivate
@@ -69,50 +67,34 @@ public class StagingPrecondition implements Precondition {
         running = false;
     }
 
-
     @Override
-    public boolean canProcess(long pkgOffset, int timeoutSeconds) {
-
+    public synchronized boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException {
         if (timeoutSeconds < 1) {
             throw new IllegalArgumentException();
         }
 
-        // clear all offsets less than the required one as they are not needed anymore.
-        // this works OK only if pkgOffset are always queried in increasing order.
-        watcher.clear(pkgOffset);
-
         // try to get the status for timeoutSeconds and then throw
-        for(int i=0; running && i < timeoutSeconds; i++) {
-            Status status = watcher.getStatus(pkgOffset);
+        for(int i=0; i < timeoutSeconds * 10; i++) {
+            Status status = watcher.getStatus(subAgentName, pkgOffset);
 
             if (status != null) {
                 return status == Status.IMPORTED;
             } else {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    throw new IllegalStateException("Precondition evaluation has been interrupted");
-                }
+                Thread.sleep(100);
+            }
+            
+            if (!running) {
+                throw new InterruptedException("Staging precondition is shutting down");
             }
         }
 
         throw new IllegalStateException("Timeout waiting for package offset " + pkgOffset + " on status topic.");
     }
-
-
-    @ObjectClassDefinition(name = "Apache Sling Journal based Distribution - Staged Distribution Precondition",
-            description = "Apache Sling Content Distribution Sub Agent precondition for staged distribution")
-    public @interface Configuration {
-
-        @AttributeDefinition(name = "Precondition name", description = "The name of the staging precondition")
-        String name() default "staging";
-
-        @AttributeDefinition(name = "Subscriber agent name", description = "The name of the subscriber agent to watch")
-        String subAgentName() default "";
-
-        @AttributeDefinition
-        String webconsole_configurationFactory_nameHint() default "Precondition name: {name}";
-
+    
+    public synchronized void run() {
+        LOG.info("Purging StagingPrecondition cache");
+        IOUtils.closeQuietly(watcher);
+        watcher = new PackageStatusWatcher(messagingProvider, topics);
     }
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
index 41c9e58..937c30a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
@@ -37,7 +37,7 @@ public @interface SubscriberConfiguration {
 
     @AttributeDefinition(name = "DistributionPackageBuilder target",
             description = "The target reference for the DistributionPackageBuilder used to build/install content packages, e.g. use target=(name=...) to bind to a service by name.")
-    String packageBuilder_target() default "(name=...)";
+    String packageBuilder_target() default "(name=journal_filevault)";
 
     @AttributeDefinition(name = "Precondition target",
             description = "The target reference for the Precondition used to validate packages, e.g. use target=(name=...) to bind to a service by name.")
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPreconditionTest.java
similarity index 72%
copy from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java
copy to src/test/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPreconditionTest.java
index 33b10c5..da2ce66 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPrecondition.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/DefaultPreconditionTest.java
@@ -18,12 +18,15 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
-import org.osgi.service.component.annotations.Component;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
 
-@Component(immediate = true, service = Precondition.class, property = { "name=default" })
-public class DefaultPrecondition implements Precondition {
-    @Override
-    public boolean canProcess(long pkgOffset, int timeoutSeconds) {
-        return true;
+import org.junit.Test;
+
+public class DefaultPreconditionTest {
+    @Test
+    public void testAlwaysTrue() {
+        boolean canProcess = new DefaultPrecondition().canProcess("any", 100, 10);
+        assertThat(canProcess, equalTo(true));
     }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcherTest.java
index 32b5b7b..8ab19f3 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcherTest.java
@@ -71,7 +71,7 @@ public class PackageStatusWatcherTest {
                 adapterCaptor.capture()))
                 .thenReturn(mock(Closeable.class));
 
-        statusWatcher = new PackageStatusWatcher(provider, topics, SUB1_AGENT_NAME);
+        statusWatcher = new PackageStatusWatcher(provider, topics);
 
     }
 
@@ -81,33 +81,8 @@ public class PackageStatusWatcherTest {
 
         generateMessages(10, 50);
 
-
         assertPackageStatus(1000, null);
         assertPackageStatus(1010, Status.REMOVED_FAILED);
-
-
-        statusWatcher.clear(1011);
-
-        assertPackageStatus(1010, null);
-        assertPackageStatus(1011, Status.REMOVED_FAILED);
-        assertPackageStatus(1049, Status.REMOVED_FAILED);
-        assertPackageStatus(1050, null);
-
-
-        generateMessages(50, 60);
-
-        assertPackageStatus(1011, Status.REMOVED_FAILED);
-        assertPackageStatus(1050, Status.REMOVED_FAILED);
-        assertPackageStatus(1060, null);
-
-        statusWatcher.clear(1100);
-
-        assertPackageStatus(1050, null);
-
-
-
-
-
     }
 
 
@@ -134,11 +109,9 @@ public class PackageStatusWatcherTest {
 
     void assertPackageStatus(long pkgOffset, Status status) {
         if (status == null) {
-            assertEquals(null, statusWatcher.getStatus(pkgOffset));
-            assertEquals(null, statusWatcher.getStatusOffset(pkgOffset));
+            assertEquals(null, statusWatcher.getStatus(SUB1_AGENT_NAME, pkgOffset));
         } else {
-            assertEquals(status, statusWatcher.getStatus(pkgOffset));
-            assertEquals(pkgOffset-1000, (long) statusWatcher.getStatusOffset(pkgOffset));
+            assertEquals(status, statusWatcher.getStatus(SUB1_AGENT_NAME, pkgOffset));
         }
     }
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPreconditionTest.java
index 8eeeff0..b3d8cf3 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/StagingPreconditionTest.java
@@ -18,16 +18,25 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import com.google.common.collect.ImmutableMap;
-import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.Before;
@@ -39,114 +48,109 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.mockito.Spy;
-import org.osgi.util.converter.Converters;
-
-import java.io.Closeable;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class StagingPreconditionTest {
 
+    private static final String OTHER_AGENT = "other agent";
     private static final String SUB1_SLING_ID = "sub1sling";
     private static final String GP_SUB1_AGENT_NAME = "gpsub1agent";
     private static final String PUB1_AGENT_NAME = "pub1agent";
+    private static final Long OFFSET_NOT_PRESENT = 111111l;
 
     @Mock
-    MessagingProvider clientProvider;
-
-    @Spy
-    Topics topics = new Topics();
-
+    private MessagingProvider clientProvider;
 
     @Spy
-    private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
-
+    private Topics topics = new Topics();
 
     @Captor
     private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>> statusCaptor;
 
     @InjectMocks
-    StagingPrecondition precondition;
+    private StagingPrecondition precondition;
 
     private MessageHandler<PackageStatusMessage> statusHandler;
 
-
     @Before
     public void before() {
         Awaitility.setDefaultPollDelay(Duration.ZERO);
         Awaitility.setDefaultPollInterval(Duration.ONE_HUNDRED_MILLISECONDS);
         MockitoAnnotations.initMocks(this);
 
-        StagingPrecondition.Configuration config = Converters.standardConverter()
-                .convert(ImmutableMap.of("subAgentName", GP_SUB1_AGENT_NAME)).to(StagingPrecondition.Configuration.class);
         when(clientProvider.createPoller(
                 Mockito.anyString(),
                 Mockito.eq(Reset.earliest),
                 statusCaptor.capture()))
                 .thenReturn(mock(Closeable.class));
 
-        precondition.activate(config);
+        precondition.activate();
         statusHandler = statusCaptor.getValue().getHandler();
     }
-
-    @Test
-    public void testStatus() {
-        statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
-                createMessage(1000, PackageStatusMessage.Status.REMOVED_FAILED));
-
-        statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
-                createMessage(1001, PackageStatusMessage.Status.REMOVED));
-
-        statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
-                createMessage(1002, PackageStatusMessage.Status.IMPORTED));
-
-        assertFalse(precondition.canProcess(1000, 1));
-        assertFalse(precondition.canProcess(1001, 1));
-        assertTrue(precondition.canProcess(1002, 1));
-        assertTrue(precondition.canProcess(1002, 1));
-        assertTrue(precondition.canProcess(1002,1 ));
+    
+    @Test(expected = IllegalArgumentException.class)
+    public void testIllegalTimeout() throws InterruptedException {
+        precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, -1);
     }
-
+    
+    @Test(expected = IllegalStateException.class)
+    public void testNotYetProcessed() throws InterruptedException {
+        simulateMessage(OTHER_AGENT, 1002, PackageStatusMessage.Status.IMPORTED);
+        boolean res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT, 1);
+        assertThat(res, equalTo(true));
+
+        // We got no package for this agent. So this should time out
+        precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 1);
+    }
+    
     @Test
-    public void testClearCache() {
-        statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
-                createMessage(1000, PackageStatusMessage.Status.REMOVED_FAILED));
-
-        statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
-                createMessage(1001, PackageStatusMessage.Status.REMOVED));
-
-        assertFalse(precondition.canProcess(1000, 1));
-        assertFalse(precondition.canProcess(1001,1 ));
-        assertThrows(1000);
-        statusHandler = statusCaptor.getValue().getHandler();
-
-        statusHandler.handle(new TestMessageInfo("", 1, 0, 0),
-                createMessage(1000, PackageStatusMessage.Status.REMOVED_FAILED));
-
-        assertFalse(precondition.canProcess(1000, 1));
+    public void testDeactivateDuringCanProcess() {
+        AtomicReference<Throwable> exHolder = new AtomicReference<>();
+        Thread th = new Thread(() -> {
+            try {
+                precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 2);
+            } catch (Throwable t) {
+                exHolder.set(t);
+            }
+        });
+        th.start();
+        precondition.deactivate();
+        Throwable ex = Awaitility.await().until(() -> exHolder.get(), notNullValue());
+        assertThat(ex, instanceOf(InterruptedException.class));
     }
-
-
-    void assertThrows(long offset) {
-        try {
-            precondition.canProcess(offset,1 );
-            fail("it must throw");
-        } catch (IllegalStateException e) {
-
-        }
+    
+    @Test(expected = IllegalStateException.class)
+    public void testCleanup() throws InterruptedException {
+        simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED);
+        assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
+        
+        // Cleanup
+        precondition.run();
+        
+        // Should time out because after cleanup message is not present anymore
+        precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1);
+    }
+    
+    @Test
+    public void testStatus() throws InterruptedException {
+        simulateMessage(GP_SUB1_AGENT_NAME, 1000, PackageStatusMessage.Status.REMOVED_FAILED);
+        simulateMessage(GP_SUB1_AGENT_NAME, 1001, PackageStatusMessage.Status.REMOVED);
+        simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED);
+
+        assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000, 1));
+        assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001, 1));
+        assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
     }
 
-    PackageStatusMessage createMessage(long offset, PackageStatusMessage.Status status) {
-        return PackageStatusMessage.newBuilder()
+    private void simulateMessage(String subAgentName, long pkgOffset, PackageStatusMessage.Status status) {
+        PackageStatusMessage message = PackageStatusMessage.newBuilder()
                 .setSubSlingId(SUB1_SLING_ID)
-                .setSubAgentName(GP_SUB1_AGENT_NAME)
+                .setSubAgentName(subAgentName)
                 .setPubAgentName(PUB1_AGENT_NAME)
-                .setOffset(offset)
+                .setOffset(pkgOffset)
                 .setStatus(status)
                 .build();
+        
+        TestMessageInfo offset0 = new TestMessageInfo("", 1, 0, 0);
+        statusHandler.handle(offset0, message);
     }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index dc8263a..fcdacd9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -325,7 +325,7 @@ public class SubscriberTest {
 
         packageHandler.handle(info, message);
         waitSubscriber(RUNNING);
-        when(precondition.canProcess(eq(11), anyInt())).thenReturn(false);
+        when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(false);
 
         try {
             waitSubscriber(IDLE);
@@ -334,13 +334,13 @@ public class SubscriberTest {
 
         }
 
-        when(precondition.canProcess(eq(11), anyInt())).thenReturn(true);
+        when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(true);
         waitSubscriber(IDLE);
 
     }
     
     @Test
-    public void testReadyWhenWatingForPrecondition() {
+    public void testReadyWhenWatingForPrecondition() throws InterruptedException {
         Semaphore sem = new Semaphore(0);
         assumeWaitingForPrecondition(sem);
         initSubscriber();
@@ -402,12 +402,12 @@ public class SubscriberTest {
                 .thenReturn(timer);
     }
 
-    private void assumeNoPrecondition() {
-        when(precondition.canProcess(anyLong(), anyInt())).thenReturn(true);
+    private void assumeNoPrecondition() throws InterruptedException {
+        when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt())).thenReturn(true);
     }
 
-    private void assumeWaitingForPrecondition(Semaphore sem) {
-        when(precondition.canProcess(anyLong(), anyInt()))
+    private void assumeWaitingForPrecondition(Semaphore sem) throws InterruptedException {
+        when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt()))
             .thenAnswer(invocation -> sem.tryAcquire(10000, TimeUnit.SECONDS));
     }