You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/04/07 12:51:16 UTC

[sling-org-apache-sling-distribution-journal] 06/06: Revert "SLING-9259 - Extract service subscriber code into separate package (#25)"

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 03d8900203f71ba55759a4523c79587d65c60062
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 14:41:28 2020 +0200

    Revert "SLING-9259 - Extract service subscriber code into separate package (#25)"
    
    This reverts commit 9f2175a3
---
 .../impl/precondition/DefaultPrecondition.java     |   4 +-
 .../impl/precondition/PackageStatusWatcher.java    |   2 +-
 .../journal/impl/precondition/Precondition.java    |   3 +-
 .../impl/precondition/StagingPrecondition.java     |  32 ++-
 .../journal/impl/publisher/DiscoveryService.java   |   2 +-
 .../impl/publisher/DistributionPublisher.java      |  15 +-
 .../impl/publisher/PackageDistributedNotifier.java |   2 +-
 .../journal/impl/publisher/PackageRepo.java        |   4 +-
 .../{shared => impl/queue/impl}/EntryUtil.java     |   2 +-
 .../queue/impl}/PackageRetries.java                |   4 +-
 .../journal/impl/queue/impl/PubErrQueue.java       |   2 -
 .../journal/impl/queue/impl/PubQueue.java          |   2 -
 .../journal/impl/queue/impl/PubQueueCache.java     |   8 +-
 .../impl/queue/impl/PubQueueCacheService.java      |   4 +-
 .../impl/queue/impl/PubQueueProviderImpl.java      |   2 +-
 .../queue/impl}/QueueEntryFactory.java             |   2 +-
 .../queue/impl/{PubErrQueue.java => SubQueue.java} | 103 +++++---
 .../journal/{ => impl}/shared/AgentState.java      |   2 +-
 .../{ => impl}/shared/DefaultDistributionLog.java  |   2 +-
 .../shared/DistributionMetricsService.java         | 114 ++++++++-
 .../{ => impl}/shared/ExponentialBackOff.java      |   2 +-
 .../journal/{ => impl}/shared/JMXRegistration.java |   2 +-
 .../{ => impl}/shared/JournalAvailableChecker.java |   4 +-
 .../shared/JournalAvailableServiceMarker.java      |   2 +-
 .../journal/{ => impl}/shared/LimitPoller.java     |   2 +-
 .../journal/{ => impl}/shared/PackageBrowser.java  |   2 +-
 .../{ => impl}/shared/PackageViewerPlugin.java     |   2 +-
 .../shared/SimpleDistributionResponse.java         |   2 +-
 .../journal/{ => impl}/shared/Topics.java          |   2 +-
 .../journal/impl/subscriber/Announcer.java         |   1 -
 .../{service => impl}/subscriber/BookKeeper.java   |  85 +++----
 .../journal/impl/subscriber/CommandPoller.java     |   2 +-
 .../subscriber/ContentPackageExtractor.java        |   4 +-
 .../impl/subscriber/DistributionSubscriber.java    | 269 ++++++++++++++-------
 .../{service => impl}/subscriber/LocalStore.java   |   4 +-
 .../subscriber/PackageHandler.java                 |  45 +---
 .../subscriber/PackageHandling.java                |   2 +-
 .../subscriber/PreConditionTimeoutException.java   |  28 ---
 .../impl/subscriber/SubscriberConfiguration.java   |   1 -
 .../subscriber/SubscriberIdle.java                 |   9 +-
 .../service/subscriber/BookKeeperFactory.java      |  90 -------
 .../service/subscriber/ImportedEventFactory.java   |  55 -----
 .../journal/service/subscriber/NoopMetric.java     |  99 --------
 .../service/subscriber/SubscriberMetrics.java      | 239 ------------------
 .../impl/precondition/DefaultPreconditionTest.java |   5 +-
 .../precondition/PackageStatusWatcherTest.java     |   4 +-
 .../impl/precondition/StagingPreconditionTest.java |  56 +++--
 .../impl/publisher/DiscoveryServiceTest.java       |   4 +-
 .../impl/publisher/DistPublisherJMXTest.java       |   3 +-
 .../impl/publisher/DistributionPublisherTest.java  |   4 +-
 .../publisher/PackageDistributedNotifierTest.java  |   2 +-
 .../journal/impl/publisher/PackageRepoTest.java    |   4 +-
 .../journal/impl/queue/QueueItemFactoryTest.java   |   2 +-
 .../journal/impl/queue/impl/EntryUtilTest.java     |   1 -
 .../impl/queue/impl/OffsetQueueImplJMXTest.java    |   2 +-
 .../queue/impl}/PackageRetriesTest.java            |   4 +-
 .../journal/impl/queue/impl/PubQueueCacheTest.java |   4 +-
 .../impl/queue/impl/PubQueueProviderTest.java      |   2 +-
 .../journal/impl/queue/impl/PubQueueTest.java      |   1 -
 .../journal/impl/queue/impl/RangePollerTest.java   |   2 +-
 .../journal/impl/queue/impl/SubQueueTest.java      |  72 ++++++
 .../shared/DistributionMetricsServiceTest.java     |  14 +-
 .../{ => impl}/shared/ExponentialBackoffTest.java  |   3 +-
 .../shared/JournalAvailableCheckerTest.java        |  11 +-
 .../journal/{ => impl}/shared/LimitPollerTest.java |   4 +-
 .../{ => impl}/shared/PackageBrowserTest.java      |   3 +-
 .../{ => impl}/shared/PackageViewerPluginTest.java |   5 +-
 .../shared/SimpleDistributionResponseTest.java     |   3 +-
 .../journal/{ => impl}/shared/TestMessageInfo.java |   2 +-
 .../journal/impl/subscriber/AnnouncerTest.java     |   2 -
 .../subscriber/BookKeeperTest.java                 |  46 +---
 .../journal/impl/subscriber/CommandPollerTest.java |   4 +-
 .../subscriber/ContentPackageExtractorTest.java    |   3 +-
 .../subscriber/LocalStoreTest.java                 |   3 +-
 .../subscriber/SubscriberIdleTest.java             |   3 +-
 .../journal/impl/subscriber/SubscriberTest.java    | 170 +++++++++----
 76 files changed, 760 insertions(+), 952 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
index 5144378..dceed02 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
@@ -23,7 +23,7 @@ import org.osgi.service.component.annotations.Component;
 @Component(immediate = true, service = Precondition.class, property = { "name=default" })
 public class DefaultPrecondition implements Precondition {
     @Override
-    public Decision canProcess(String subAgentName, long pkgOffset) {
-        return Decision.ACCEPT;
+    public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) {
+        return true;
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index 506cf55..0408e0d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -29,10 +29,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.Topics;
 
 public class PackageStatusWatcher implements Closeable {
     private final Closeable poller;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
index d934431..3730475 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
@@ -33,7 +33,6 @@ public interface Precondition {
      * @throws InterruptedException if the thread was interrupted and should shut down
      * @return true if the package can be processed; otherwise it returns false.
      */
-    Decision canProcess(String subAgentName, long pkgOffset);
+    boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException;
 
-    enum Decision { ACCEPT, SKIP, WAIT};
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index ad6be9c..2272888 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -21,10 +21,12 @@ package org.apache.sling.distribution.journal.impl.precondition;
 import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
 import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
 
+import java.util.concurrent.TimeoutException;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -54,6 +56,8 @@ public class StagingPrecondition implements Precondition, Runnable {
 
     private volatile PackageStatusWatcher watcher;
 
+    private volatile boolean running = true;
+    
     @Activate
     public void activate() {
         watcher = new PackageStatusWatcher(messagingProvider, topics);
@@ -63,15 +67,31 @@ public class StagingPrecondition implements Precondition, Runnable {
     @Deactivate
     public synchronized void deactivate() {
         IOUtils.closeQuietly(watcher);
+        running = false;
     }
 
     @Override
-    public Decision canProcess(String subAgentName, long pkgOffset) {
-        Status status = getStatus(subAgentName, pkgOffset);
-        if (status == null) {
-            return Decision.WAIT;
+    public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException {
+        if (timeoutSeconds < 1) {
+            throw new IllegalArgumentException();
+        }
+
+        // try to get the status for timeoutSeconds and then throw
+        for(int i=0; i < timeoutSeconds * 10; i++) {
+            Status status = getStatus(subAgentName, pkgOffset);
+
+            if (status != null) {
+                return status == Status.IMPORTED;
+            } else {
+                Thread.sleep(100);
+            }
+            
+            if (!running) {
+                throw new InterruptedException("Staging precondition is shutting down");
+            }
         }
-        return status == Status.IMPORTED ? Decision.ACCEPT : Decision.SKIP;
+
+        throw new TimeoutException("Timeout waiting for package offset " + pkgOffset + " on status topic.");
     }
 
     private synchronized Status getStatus(String subAgentName, long pkgOffset) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
index c201a42..af052a3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
@@ -28,9 +28,9 @@ import java.util.Hashtable;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.commons.io.IOUtils;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index c1c75d0..55787dc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -20,12 +20,12 @@ package org.apache.sling.distribution.journal.impl.publisher;
 
 
 import static java.util.stream.StreamSupport.stream;
+import static org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.timed;
 import static java.util.Objects.requireNonNull;
 import static org.apache.sling.distribution.DistributionRequestState.ACCEPTED;
 import static org.apache.sling.distribution.DistributionRequestType.ADD;
 import static org.apache.sling.distribution.DistributionRequestType.DELETE;
 import static org.apache.sling.distribution.DistributionRequestType.TEST;
-import static org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -47,6 +47,11 @@ import javax.management.NotCompliantMBeanException;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.impl.shared.AgentState;
+import org.apache.sling.distribution.journal.impl.shared.DefaultDistributionLog;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequest;
@@ -68,13 +73,9 @@ import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.service.metatype.annotations.Designate;
+
+import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.shared.AgentState;
-import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.JMXRegistration;
-import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.JournalAvailable;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index e3dace3..591a0cf 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -29,8 +29,8 @@ import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.JsonMessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
index 81a4fba..aee0c52 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
@@ -25,6 +25,8 @@ import javax.jcr.Node;
 import javax.jcr.Property;
 import javax.jcr.nodetype.NodeType;
 
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.jackrabbit.api.ReferenceBinary;
 import org.apache.jackrabbit.commons.JcrUtils;
 import org.apache.sling.api.resource.LoginException;
@@ -45,8 +47,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.Topics;
 
 import static java.util.Collections.singletonMap;
 import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
index ed2d461..529fc52 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.queue.impl;
 
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
similarity index 94%
rename from src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
index 1466d85..23f9463 100644
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.queue.impl;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -27,7 +27,7 @@ import javax.annotation.ParametersAreNonnullByDefault;
  * Holds package retries by agent name
  */
 @ParametersAreNonnullByDefault
-class PackageRetries {
+public class PackageRetries {
 
     // (pubAgentName x retries)
     private final Map<String, Integer> pubAgentNameToRetries = new ConcurrentHashMap<>();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
index db27714..aaa7b1c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
@@ -26,8 +26,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.shared.EntryUtil;
-import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
index 197d0b1..9bfd4fd 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
@@ -40,8 +40,6 @@ import javax.annotation.Nullable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.shared.EntryUtil;
-import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueItemState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index b794116..189513c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -41,6 +41,8 @@ import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.osgi.service.event.Event;
@@ -51,8 +53,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.JMXRegistration;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessageSender;
@@ -176,9 +176,7 @@ public class PubQueueCache {
                 sender.send(topic, pkgMsg);
                 sleep(seedingDelayMs);
             } catch (MessagingException e) {
-                if (!(e.getCause() instanceof InterruptedException)) {
-                    LOG.warn(e.getMessage(), e);
-                }
+                LOG.warn(e.getMessage(), e);
                 sleep(seedingDelayMs * 10);
             }
         }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 5709493..c5eeb8c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -22,9 +22,9 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.JournalAvailable;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index a72a358..cc9e839 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -30,11 +30,11 @@ import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
index 58a0717..47a9f25 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.queue.impl;
 
 import static org.apache.sling.distribution.queue.DistributionQueueItemState.ERROR;
 import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
similarity index 50%
copy from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
copy to src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
index db27714..5d34e6d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
@@ -18,46 +18,56 @@
  */
 package org.apache.sling.distribution.journal.impl.queue.impl;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
+import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.shared.EntryUtil;
-import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueItemState;
 import org.apache.sling.distribution.queue.DistributionQueueState;
 import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.apache.sling.distribution.queue.DistributionQueueType;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.util.Objects.requireNonNull;
+import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
+import static org.apache.sling.distribution.queue.DistributionQueueState.BLOCKED;
+import static org.apache.sling.distribution.queue.DistributionQueueState.IDLE;
+import static org.apache.sling.distribution.queue.DistributionQueueState.RUNNING;
+import static org.apache.sling.distribution.queue.DistributionQueueType.ORDERED;
 
 @ParametersAreNonnullByDefault
-public class PubErrQueue implements DistributionQueue {
+public class SubQueue implements DistributionQueue {
 
     private static final String UNSUPPORTED_CLEAR_OPERATION = "Unsupported clear operation";
-    private static final Logger LOG = LoggerFactory.getLogger(PubErrQueue.class);
 
-    private final OffsetQueue<DistributionQueueItem> agentQueue;
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(SubQueue.class);
 
-    private final  QueueEntryFactory entryFactory;
+    private final DistributionQueueItem headItem;
 
-    private final OffsetQueue<Long> errorQueue;
+    private final PackageRetries packageRetries;
 
     private final String queueName;
 
-    public PubErrQueue(String queueName, OffsetQueue<DistributionQueueItem> agentQueue, OffsetQueue<Long> errorQueue) {
-        this.queueName = requireNonNull(queueName);
-        this.agentQueue = requireNonNull(agentQueue);
-        this.errorQueue = requireNonNull(errorQueue);
-        this.entryFactory = new QueueEntryFactory(queueName, queueItem -> 0);
+	private final  QueueEntryFactory entryFactory;
+
+    public SubQueue(String queueName,
+                    @Nullable
+                    DistributionQueueItem headItem,
+                    PackageRetries packageRetries) {
+        this.headItem = headItem;
+        this.queueName = Objects.requireNonNull(queueName);
+        this.packageRetries = Objects.requireNonNull(packageRetries);
+        this.entryFactory = new QueueEntryFactory(queueName, this::attempts);
     }
 
     @Nonnull
@@ -67,45 +77,37 @@ public class PubErrQueue implements DistributionQueue {
     }
 
     @Override
-    public DistributionQueueEntry add(@Nonnull DistributionQueueItem distributionQueueItem) {
+    public DistributionQueueEntry add(DistributionQueueItem queueItem) {
         throw new UnsupportedOperationException("Unsupported add operation");
     }
 
     @Override
+    @CheckForNull
     public DistributionQueueEntry getHead() {
-        Long refOffset = errorQueue.getHeadItem();
-        if (refOffset != null) {
-            DistributionQueueItem queueItem = agentQueue.getItem(refOffset);
-            return entryFactory.create(queueItem);
-        }
-        return null;
+        return entryFactory.create(headItem);
     }
 
     @Nonnull
     @Override
     public Iterable<DistributionQueueEntry> getEntries(int skip, int limit) {
-        List<DistributionQueueEntry> entries = new ArrayList<>();
-        for (long refOffset : errorQueue.getHeadItems(skip, limit)) {
-            DistributionQueueItem queueItem = agentQueue.getItem(refOffset);
-            if (queueItem != null) {
-                entries.add(entryFactory.create(queueItem));
-            } else {
-                LOG.warn("queueItem at offset {} not found", refOffset);
-            }
+        final List<DistributionQueueEntry> entries;
+        if (skip == 0 && (limit == -1 || limit > 0) && headItem != null) {
+            entries = Collections.singletonList(entryFactory.create(headItem));
+        } else {
+            entries = Collections.emptyList();
         }
-        return entries;
+        return Collections.unmodifiableList(entries);
     }
 
     @Override
-    public DistributionQueueEntry getEntry(@Nonnull String entryId) {
-        DistributionQueueItem queueItem = agentQueue.getItem(EntryUtil.entryOffset(entryId));
-        return (queueItem != null)
-                ? entryFactory.create(queueItem)
+    public DistributionQueueEntry getEntry(String entryId) {
+        return (entryId.equals(EntryUtil.entryId(headItem)))
+                ? entryFactory.create(headItem)
                 : null;
     }
 
     @Override
-    public DistributionQueueEntry remove(@Nonnull String entryId) {
+    public DistributionQueueEntry remove(String entryId) {
         throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
     }
 
@@ -124,13 +126,29 @@ public class PubErrQueue implements DistributionQueue {
     @Nonnull
     @Override
     public DistributionQueueStatus getStatus() {
-        return new DistributionQueueStatus(errorQueue.getSize(), DistributionQueueState.PASSIVE);
+        final DistributionQueueState queueState;
+        final int itemsCount;
+        DistributionQueueEntry headEntry = getHead();
+        if (headEntry != null) {
+            itemsCount = 1;
+            DistributionQueueItemState itemState = headEntry.getStatus().getItemState();
+            if (itemState == QUEUED) {
+                queueState = RUNNING;
+            } else {
+                queueState = BLOCKED;
+            }
+        } else {
+            itemsCount = 0;
+            queueState = IDLE;
+        }
+
+        return new DistributionQueueStatus(itemsCount, queueState);
     }
 
-    @Nonnull
     @Override
+    @Nonnull
     public DistributionQueueType getType() {
-        return DistributionQueueType.ORDERED;
+        return ORDERED;
     }
 
     @Override
@@ -138,4 +156,9 @@ public class PubErrQueue implements DistributionQueue {
         return false;
     }
 
+    private int attempts(DistributionQueueItem queueItem) {
+        String entryId = EntryUtil.entryId(queueItem);
+        return packageRetries.get(entryId);
+    }
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
index 51d5466..858e8dc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java
index 80c3747..bb49514 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
similarity index 68%
rename from src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
index 4dd1167..f18c04c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import static java.lang.String.format;
 
@@ -48,6 +48,8 @@ public class DistributionMetricsService {
 
     public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
 
+    public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
+    
     private final Logger log = LoggerFactory.getLogger(this.getClass());
 
     @Reference
@@ -57,12 +59,30 @@ public class DistributionMetricsService {
 
     private Timer cleanupPackageDuration;
 
+    private Histogram importedPackageSize;
+
     private Histogram exportedPackageSize;
 
     private Meter acceptedRequests;
 
     private Meter droppedRequests;
 
+    private Counter itemsBufferSize;
+
+    private Timer removedPackageDuration;
+
+    private Timer removedFailedPackageDuration;
+
+    private Timer importedPackageDuration;
+
+    private Meter failedPackageImports;
+
+    private Timer sendStoredStatusDuration;
+
+    private Timer processQueueItemDuration;
+
+    private Timer packageDistributedDuration;
+
     private Timer buildPackageDuration;
 
     private Timer enqueuePackageDuration;
@@ -82,6 +102,16 @@ public class DistributionMetricsService {
         buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "build_package_duration"));
         enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, "enqueue_package_duration"));
         queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, "queue_cache_fetch_count"));
+
+        importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, "imported_package_size"));
+        itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, "items_buffer_size"));
+        importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "imported_package_duration"));
+        removedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_package_duration"));
+        removedFailedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_failed_package_duration"));
+        failedPackageImports = getMeter(getMetricName(SUB_COMPONENT, "failed_package_imports"));
+        sendStoredStatusDuration = getTimer(getMetricName(SUB_COMPONENT, "send_stored_status_duration"));
+        processQueueItemDuration = getTimer(getMetricName(SUB_COMPONENT, "process_queue_item_duration"));
+        packageDistributedDuration = getTimer(getMetricName(SUB_COMPONENT, "request_distributed_duration"));
     }
 
     /**
@@ -135,6 +165,15 @@ public class DistributionMetricsService {
     }
 
     /**
+     * Histogram of the imported content package size in Byte.
+     *
+     * @return a Sling Metrics histogram
+     */
+    public Histogram getImportedPackageSize() {
+        return importedPackageSize;
+    }
+
+    /**
      * Histogram of the exported content package size in Bytes.
      *
      * @return a Sling Metrics histogram
@@ -162,6 +201,79 @@ public class DistributionMetricsService {
     }
 
     /**
+     * Counter of the package buffer size on the subscriber.
+     *
+     * @return a Sling Metrics counter
+     */
+    public Counter getItemsBufferSize() {
+        return itemsBufferSize;
+    }
+
+    /**
+     * Timer capturing the duration in ms of successful packages import operations.
+     *
+     * @return a Sling Metrics timer
+     */
+    public Timer getImportedPackageDuration() {
+        return importedPackageDuration;
+    }
+
+    /**
+     * Timer capturing the duration in ms of packages successfully removed from an editable subscriber.
+     *
+     * @return a Sling Metrics timer
+     */
+    public Timer getRemovedPackageDuration() {
+        return removedPackageDuration;
+    }
+
+    /**
+     * Timer capturing the duration in ms of packages successfully removed automatically from a subscriber supporting error queue.
+     *
+     * @return a Sling Metrics timer
+     */
+    public Timer getRemovedFailedPackageDuration() {
+        return removedFailedPackageDuration;
+    }
+
+    /**
+     * Meter of failures to import packages.
+     *
+     * @return a Sling Metrics meter
+     */
+    public Meter getFailedPackageImports() {
+        return failedPackageImports;
+    }
+
+    /**
+     * Timer capturing the duration in ms of sending a stored package status.
+     *
+     * @return a Sling Metric timer
+     */
+    public Timer getSendStoredStatusDuration() {
+        return sendStoredStatusDuration;
+    }
+
+    /**
+     * Timer capturing the duration in ms of processing a queue item.
+     *
+     * @return a Sling Metric timer
+     */
+    public Timer getProcessQueueItemDuration() {
+        return processQueueItemDuration;
+    }
+
+    /**
+     * Timer capturing the duration in ms of distributing a distribution package.
+     * The timer starts when the package is enqueued and stops when the package is successfully imported.
+     *
+     * @return a Sling Metric timer
+     */
+    public Timer getPackageDistributedDuration() {
+        return packageDistributedDuration;
+    }
+
+    /**
      * Timer capturing the duration in ms of building a content package
      *
      * @return a Sling Metric timer
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
index 48727a7..28f5aa8 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java
index a1e3924..e4f36c9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import static java.lang.management.ManagementFactory.getPlatformMBeanServer;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
index 6cb5170..3582b36 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
@@ -16,14 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import static java.util.Objects.requireNonNull;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
index 9478372..287f3fa 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import java.util.Objects;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
index 18cf976..2bb30b5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
index 72eb177..b9d1d2d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import static java.util.Collections.singletonMap;
 import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
similarity index 99%
rename from src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
index 8ac9ba5..30f8836 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import java.io.IOException;
 import java.io.PrintWriter;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java
index eb0510c..e49700e 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
index c9ce5e2..1f4d44a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index beeb62d..7ecd03b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -32,7 +32,6 @@ import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
 import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
 import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
-import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
similarity index 82%
rename from src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 001fafc..53a7245 100644
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
 
 import static java.lang.String.format;
 import static java.lang.System.currentTimeMillis;
@@ -31,7 +31,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -42,13 +41,14 @@ import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.commons.metrics.Timer;
-import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
+import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics.GaugeService;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
@@ -85,11 +85,9 @@ public class BookKeeper implements Closeable {
 
     private final Logger log = LoggerFactory.getLogger(this.getClass());
     private final ResourceResolverFactory resolverFactory;
-    private final SubscriberMetrics subscriberMetrics;
+    private final DistributionMetricsService distributionMetricsService;
     private final PackageHandler packageHandler;
-    public final SubscriberIdle subscriberIdle;
     private final EventAdmin eventAdmin;
-    
     private final Consumer<PackageStatusMessage> sender;
     private final boolean editable;
     private final int maxRetries;
@@ -103,10 +101,9 @@ public class BookKeeper implements Closeable {
     private GaugeService<Integer> retriesGauge;
     private int skippedCounter = 0;
 
-    BookKeeper(ResourceResolverFactory resolverFactory, 
-            SubscriberMetrics subscriberMetrics,
+    public BookKeeper(ResourceResolverFactory resolverFactory, 
+            DistributionMetricsService distributionMetricsService,
             PackageHandler packageHandler,
-            SubscriberIdle subscriberIdle,
             EventAdmin eventAdmin,
             Consumer<PackageStatusMessage> sender,
             String subAgentName,
@@ -114,12 +111,11 @@ public class BookKeeper implements Closeable {
             boolean editable, 
             int maxRetries) { 
         this.packageHandler = packageHandler;
-        this.subscriberIdle = subscriberIdle;
         this.eventAdmin = eventAdmin;
-        String nameRetries = SubscriberMetrics.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
-        this.subscriberMetrics = subscriberMetrics;
-        this.retriesGauge = subscriberMetrics.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
+        String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
+        this.retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
         this.resolverFactory = resolverFactory;
+        this.distributionMetricsService = distributionMetricsService;
         this.sender = sender;
         this.subAgentName = subAgentName;
         this.subSlingId = subSlingId;
@@ -132,31 +128,6 @@ public class BookKeeper implements Closeable {
         this.processedOffsets = new LocalStore(resolverFactory, "packages", subAgentName);
     }
     
-    public void processPackage(MessageInfo info, PackageMessage pkgMsg, BiPredicate<Long, PackageMessage> shouldRemove) throws Exception {
-        try {
-            sendStoredStatus();
-            long offset = info.getOffset();
-            boolean remove = shouldRemove.test(offset, pkgMsg);
-            subscriberIdle.busy();
-            if (remove) {
-                removePackage(pkgMsg, offset);
-            } else {
-                long createdTime = info.getCreateTime();
-                importPackage(pkgMsg, offset, createdTime);
-            }
-        } finally {
-            subscriberIdle.idle();
-            sendStoredStatus();
-        }
-    }
-    
-    public DistributionAgentState getState() {
-        if (subscriberIdle.isIdle()) {
-            return DistributionAgentState.IDLE;
-        }
-        return packageRetries.getSum() > 0 ? DistributionAgentState.BLOCKED : DistributionAgentState.RUNNING;
-    }
-
     /**
      * We aim at processing the packages exactly once. Processing the packages
      * exactly once is possible with the following conditions
@@ -173,11 +144,11 @@ public class BookKeeper implements Closeable {
      * failing. For those packages importers, we aim at processing packages at least
      * once, thanks to the order in which the content updates are applied.
      */
-    private void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
+    public void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
         log.info("Importing distribution package {} of type {} at offset {}", 
                 pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
         addPackageMDC(pkgMsg);
-        try (Timer.Context context = subscriberMetrics.getImportedPackageDuration().time();
+        try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
                 ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
             packageHandler.apply(importerResolver, pkgMsg);
             if (editable) {
@@ -185,10 +156,10 @@ public class BookKeeper implements Closeable {
             }
             storeOffset(importerResolver, offset);
             importerResolver.commit();
-            subscriberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength());
-            subscriberMetrics.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
+            distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
+            distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
             packageRetries.clear(pkgMsg.getPubAgentName());
-            Event event = ImportedEventFactory.create(pkgMsg, subAgentName);
+            Event event = DistributionEvent.eventImporterImported(pkgMsg, subAgentName);
             eventAdmin.postEvent(event);
         } catch (LoginException | IOException | RuntimeException e) {
             failure(pkgMsg, offset, e);
@@ -224,7 +195,7 @@ public class BookKeeper implements Closeable {
      * @throws DistributionException if the package should be retried
      */
     private void failure(PackageMessage pkgMsg, long offset, Exception e) throws DistributionException {
-        subscriberMetrics.getFailedPackageImports().mark();
+        distributionMetricsService.getFailedPackageImports().mark();
 
         String pubAgentName = pkgMsg.getPubAgentName();
         int retries = packageRetries.get(pubAgentName);
@@ -239,10 +210,10 @@ public class BookKeeper implements Closeable {
         }
     }
 
-    private void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
+    public void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
         log.info("Removing distribution package {} of type {} at offset {}", 
                 pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
-        Timer.Context context = subscriberMetrics.getRemovedPackageDuration().time();
+        Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             if (editable) {
                 storeStatus(resolver, new PackageStatus(REMOVED, offset, pkgMsg.getPubAgentName()));
@@ -254,19 +225,17 @@ public class BookKeeper implements Closeable {
         context.stop();
     }
     
-    public void skipPackage(long offset) {
+    public void skipPackage(long offset) throws LoginException, PersistenceException {
         log.info("Skipping package at offset {}", offset);
         if (shouldCommitSkipped()) {
             try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
                 storeOffset(resolver, offset);
                 resolver.commit();
-            } catch (Exception e) {
-                log.warn("");
             }
         }
     }
 
-    private synchronized boolean shouldCommitSkipped() {
+    public synchronized boolean shouldCommitSkipped() {
         skippedCounter ++;
         if (skippedCounter > COMMIT_AFTER_NUM_SKIPPED) {
             skippedCounter = 1;
@@ -280,8 +249,8 @@ public class BookKeeper implements Closeable {
      * Send status stored in a previous run if exists
      * @throws InterruptedException
      */
-    private void sendStoredStatus() throws InterruptedException {
-        try (Timer.Context context = subscriberMetrics.getSendStoredStatusDuration().time()) {
+    public void sendStoredStatus() throws InterruptedException {
+        try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
             PackageStatus status = new PackageStatus(statusStore.load());
             boolean sent = status.sent;
             int retry = 0;
@@ -317,7 +286,7 @@ public class BookKeeper implements Closeable {
         log.info("Sent status message {}",  pkgStatMsg);
     }
 
-    private void markStatusSent() {
+    public void markStatusSent() {
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             statusStore.store(resolver, "sent", true);
             resolver.commit();
@@ -334,6 +303,10 @@ public class BookKeeper implements Closeable {
         return packageRetries.get(pubAgentName);
     }
 
+    public PackageRetries getPackageRetries() {
+        return packageRetries;
+    }
+
     @Override
     public void close() throws IOException {
         IOUtils.closeQuietly(retriesGauge);
@@ -342,7 +315,7 @@ public class BookKeeper implements Closeable {
     private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
         log.info("Removing failed distribution package {} of type {} at offset {}", 
                 pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
-        Timer.Context context = subscriberMetrics.getRemovedFailedPackageDuration().time();
+        Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
             storeOffset(resolver, offset);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index 3a1df97..f9d047b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -27,8 +27,8 @@ import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java
index c626550..998f929 100644
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
 
 import static java.util.Objects.requireNonNull;
 
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
  * Each distribution package is inspected for possible content packages in /etc/packages.
  * Such content packages are installed via the Packaging service.
  */
-class ContentPackageExtractor {
+public class ContentPackageExtractor {
     private static final String PACKAGE_BASE_PATH = "/etc/packages/";
 
     private final Logger log = LoggerFactory.getLogger(this.getClass());
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index a86cbc5..74fc5e2 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -24,14 +24,21 @@ import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
 
 import java.io.Closeable;
 import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 
 import javax.annotation.Nonnull;
@@ -39,25 +46,37 @@ import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.jackrabbit.vault.packaging.Packaging;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionRequestState;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.DistributionResponse;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
-import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
-import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
+import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
+import org.apache.sling.distribution.journal.impl.shared.AgentState;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
-import org.apache.sling.distribution.journal.service.subscriber.BookKeeperFactory;
-import org.apache.sling.distribution.journal.service.subscriber.PackageHandling;
-import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics;
-import org.apache.sling.distribution.journal.shared.Topics;
+import org.apache.sling.distribution.log.spi.DistributionLog;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.settings.SlingSettingsService;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
@@ -65,6 +84,7 @@ import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
 import org.osgi.service.metatype.annotations.Designate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,18 +95,28 @@ import com.google.protobuf.GeneratedMessage;
  * A Subscriber SCD agent which consumes messages produced by a
  * {@code DistributionPublisher} agent.
  */
-@Component(service = DistributionSubscriber.class, immediate = true, 
-    property = { "announceDelay=10000" }, 
-    configurationPid = "org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
+@Component(service = {}, immediate = true, property = {
+        "announceDelay=10000" }, configurationPid = "org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
 @Designate(ocd = SubscriberConfiguration.class, factory = true)
 @ParametersAreNonnullByDefault
-public class DistributionSubscriber {
+public class DistributionSubscriber implements DistributionAgent {
     private static final int PRECONDITION_TIMEOUT = 60;
     static int RETRY_DELAY = 5000;
     static int QUEUE_FETCH_DELAY = 1000;
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
 
+    private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = Collections.emptySet();
+
+    @Reference(name = "packageBuilder")
+    private DistributionPackageBuilder packageBuilder;
+
+    @Reference
+    private SlingSettingsService slingSettings;
+
+    @Reference
+    private ResourceResolverFactory resolverFactory;
+
     @Reference
     private MessagingProvider messagingProvider;
 
@@ -94,34 +124,33 @@ public class DistributionSubscriber {
     private Topics topics;
 
     @Reference
+    private EventAdmin eventAdmin;
+
+    @Reference
     private JournalAvailable journalAvailable;
 
     @Reference(name = "precondition")
     private Precondition precondition;
 
-    @Reference(name = "packageBuilder")
-    private DistributionPackageBuilder packageBuilder;
+    @Reference
+    private DistributionMetricsService distributionMetricsService;
 
     @Reference
-    private SubscriberMetrics subscriberMetrics;
+    private Packaging packaging;
     
-    @Reference
-    private SlingSettingsService slingSettings;
+    SubscriberIdle subscriberIdle;
     
-    @Reference
-    BookKeeperFactory bookKeeperFactory;
-
     private ServiceRegistration<DistributionAgent> componentReg;
 
     private Closeable packagePoller;
 
     private CommandPoller commandPoller;
 
-    BookKeeper bookKeeper;
-    
+    private BookKeeper bookKeeper;
+
     // Use a bounded internal buffer to allow reading further packages while working
     // on one at a time
-    private final BlockingQueue<FullMessage<PackageMessage>> queueItemsBuffer = new LinkedBlockingQueue<>(8);
+    private final BlockingQueue<DistributionQueueItem> queueItemsBuffer = new LinkedBlockingQueue<>(8);
 
     private Set<String> queueNames = Collections.emptySet();
 
@@ -129,30 +158,39 @@ public class DistributionSubscriber {
 
     private String subAgentName;
 
+    private String pkgType;
+
     private volatile boolean running = true;
 
     private volatile Thread queueProcessor;
 
     @Activate
     public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
+        String subSlingId = requireNonNull(slingSettings.getSlingId());
         subAgentName = requireNonNull(config.name());
         requireNonNull(config);
         requireNonNull(context);
-
         requireNonNull(packageBuilder);
-        requireNonNull(subscriberMetrics);
         requireNonNull(slingSettings);
+        requireNonNull(resolverFactory);
         requireNonNull(messagingProvider);
         requireNonNull(topics);
+        requireNonNull(eventAdmin);
         requireNonNull(precondition);
-        requireNonNull(bookKeeperFactory);
 
+        // Unofficial config (currently just for test)
+        Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
+        subscriberIdle = new SubscriberIdle(context, idleMillies);
+        
         queueNames = getNotEmpty(config.agentNames());
-        String subSlingId = requireNonNull(slingSettings.getSlingId());
         int maxRetries = config.maxRetries();
         boolean editable = config.editable();
-        PackageHandling packageHandling = config.packageHandling();
-        bookKeeper = bookKeeperFactory.create(packageBuilder, subAgentName, subSlingId, maxRetries, editable, packageHandling, sender(topics.getStatusTopic()));
+
+        ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.packageHandling());
+        PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
+        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin,
+                sender(topics.getStatusTopic()), subAgentName, subSlingId, editable, maxRetries);
+        
         long startOffset = bookKeeper.loadOffset() + 1;
         String assign = messagingProvider.assignTo(startOffset);
 
@@ -168,11 +206,14 @@ public class DistributionSubscriber {
         announcer = new Announcer(subSlingId, subAgentName, queueNames, sender(topics.getDiscoveryTopic()), bookKeeper,
                 maxRetries, config.editable(), announceDelay);
 
+        pkgType = requireNonNull(packageBuilder.getType());
         boolean errorQueueEnabled = (maxRetries >= 0);
         String msg = format(
-                "Started Subscriber agent %s at offset %s, subscribed to agent names %s editable %s maxRetries %s errorQueueEnabled %s",
-                subAgentName, startOffset, queueNames, config.editable(), maxRetries, errorQueueEnabled);
+                "Started Subscriber agent %s at offset %s, subscribed to agent names %s with package builder %s editable %s maxRetries %s errorQueueEnabled %s",
+                subAgentName, startOffset, queueNames, pkgType, config.editable(), maxRetries, errorQueueEnabled);
         LOG.info(msg);
+        Dictionary<String, Object> props = createServiceProps(config);
+        componentReg = context.registerService(DistributionAgent.class, this, props);
     }
 
     private <T extends GeneratedMessage> Consumer<T> sender(String topic) {
@@ -184,10 +225,24 @@ public class DistributionSubscriber {
         return asList(agentNames).stream().filter(StringUtils::isNotBlank).collect(toSet());
     }
 
+    private Dictionary<String, Object> createServiceProps(SubscriberConfiguration config) {
+        Dictionary<String, Object> props = new Hashtable<>();
+        props.put("name", config.name());
+        props.put("title", config.name());
+        props.put("details", config.name());
+        props.put("agentNames", config.agentNames());
+        props.put("editable", config.editable());
+        props.put("maxRetries", config.maxRetries());
+        props.put("packageBuilder.target", config.packageBuilder_target());
+        props.put("precondition.target", config.precondition_target());
+        props.put("webconsole.configurationFactory.nameHint", config.webconsole_configurationFactory_nameHint());
+        return props;
+    }
+
     @Deactivate
     public void deactivate() {
         componentReg.unregister();
-        IOUtils.closeQuietly(announcer, bookKeeper, 
+        IOUtils.closeQuietly(subscriberIdle, announcer, bookKeeper, 
                 packagePoller, commandPoller);
         running = false;
         Thread interrupter = this.queueProcessor;
@@ -195,22 +250,71 @@ public class DistributionSubscriber {
             interrupter.interrupt();
         }
         String msg = String.format(
-                "Stopped Subscriber agent %s, subscribed to Publisher agent names %s",
-                subAgentName, queueNames);
+                "Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s",
+                subAgentName, queueNames, pkgType);
         LOG.info(msg);
     }
-    
+
+    @Nonnull
+    @Override
+    public Iterable<String> getQueueNames() {
+        return queueNames;
+    }
+
+    @Override
+    public DistributionQueue getQueue(@Nonnull String queueName) {
+        DistributionQueueItem head = queueItemsBuffer.stream()
+                .filter(item -> isIn(queueName, item))
+                .findFirst()
+                .orElse(null);
+        return new SubQueue(queueName, head, bookKeeper.getPackageRetries());
+    }
+
+    private boolean isIn(String queueName, DistributionQueueItem queueItem) {
+        PackageMessage packageMsg = queueItem.get(QueueItemFactory.PACKAGE_MSG, PackageMessage.class);
+        return queueName.equals(packageMsg.getPubAgentName());
+    }
+
+    @Nonnull
+    @Override
+    public DistributionLog getLog() {
+        return this::emptyDistributionLog;
+    }
+
+    private List<String> emptyDistributionLog() {
+        return Collections.emptyList();
+    }
+
     @Nonnull
+    @Override
     public DistributionAgentState getState() {
-        return bookKeeper.getState();
+        return AgentState.getState(this);
     }
-    
+
+    @Nonnull
+    @Override
+    public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest request) {
+        return executeUnsupported(request);
+    }
+
+    @Nonnull
+    private DistributionResponse executeUnsupported(DistributionRequest request) {
+        String msg = format("Request type %s is not supported by this agent, expected one of %s",
+                request.getRequestType(), SUPPORTED_REQ_TYPES);
+        LOG.info(msg);
+        return new SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
+    }
+
     private void handlePackageMessage(MessageInfo info, PackageMessage message) {
         if (shouldEnqueue(info, message)) {
-            FullMessage<PackageMessage> queueItem = new FullMessage<PackageMessage>(info, message);
+            DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
             enqueue(queueItem);
         } else {
-            bookKeeper.skipPackage(info.getOffset());
+            try {
+                bookKeeper.skipPackage(info.getOffset());
+            } catch (PersistenceException | LoginException e) {
+                LOG.info("Error marking message at offset {} as skipped", info.getOffset(), e);
+            }
         }
     }
 
@@ -219,19 +323,23 @@ public class DistributionSubscriber {
             LOG.info("Skipping package for Publisher agent {} at offset {} (not subscribed)", message.getPubAgentName(), info.getOffset());
             return false;
         }
+        if (!pkgType.equals(message.getPkgType())) {
+            LOG.warn("Skipping package with type {} at offset {}", message.getPkgType(), info.getOffset());
+            return false;
+        }
         return true;
     }
-    
+
     /**
      * We block here if the buffer is full in order to limit the number of binary
      * packages fetched in memory. Note that each queued item contains the binary
      * package to be imported.
      */
-    private void enqueue(FullMessage<PackageMessage> queueItem) {
+    private void enqueue(DistributionQueueItem queueItem) {
         try {
             while (running) {
                 if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) {
-                    subscriberMetrics.getItemsBufferSize().increment();
+                    distributionMetricsService.getItemsBufferSize().increment();
                     return;
                 }
             }
@@ -241,7 +349,7 @@ public class DistributionSubscriber {
             throw new RuntimeException();
         }
     }
-    
+
     private void processQueue() {
         LOG.info("Started Queue processor");
         while (!Thread.interrupted()) {
@@ -253,27 +361,37 @@ public class DistributionSubscriber {
         }
         LOG.info("Stopped Queue processor");
     }
-    
+
     private void fetchAndProcessQueueItem() throws InterruptedException {
         try {
-            FullMessage<PackageMessage> item = blockingPeekQueueItem();
-            try (Timer.Context context = subscriberMetrics.getProcessQueueItemDuration().time()) {
-                bookKeeper.processPackage(item.getInfo(), item.getMessage(), this::shouldRemove);
+            
+            bookKeeper.sendStoredStatus();
+            DistributionQueueItem item = blockingPeekQueueItem();
+
+            try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
+                processQueueItem(item);
+            } finally {
+                subscriberIdle.idle();
             }
-            queueItemsBuffer.remove();
-        } catch (PreConditionTimeoutException e) {
+
+        } catch (TimeoutException e) {
+            /**
+             * Precondition timed out. We only log this on info level as it is no error
+             */
             LOG.info(e.getMessage());
-            retryDelay();
+            Thread.sleep(RETRY_DELAY);
+        } catch (InterruptedException e) {
+            throw e;
         } catch (Exception e) {
             // Catch all to prevent processing from stopping
-            LOG.warn("Error processing queue item", e);
-            retryDelay();
+            LOG.error("Error processing queue item", e);
+            Thread.sleep(RETRY_DELAY);
         }
     }
-    
-    private FullMessage<PackageMessage> blockingPeekQueueItem() throws InterruptedException {
+
+    private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
         while (true) {
-            FullMessage<PackageMessage> queueItem = queueItemsBuffer.peek();
+            DistributionQueueItem queueItem = queueItemsBuffer.peek();
             if (queueItem != null) {
                 return queueItem;
             } else {
@@ -282,38 +400,23 @@ public class DistributionSubscriber {
         }
     }
 
-    private void retryDelay() {
-        try {
-            Thread.sleep(RETRY_DELAY);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
-    
-    private boolean shouldRemove(long offset, PackageMessage message) {
-        if (commandPoller.isCleared(offset)) {
-            return true;
+    private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, InterruptedException, TimeoutException {
+        long offset = queueItem.get(RECORD_OFFSET, Long.class);
+        PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
+        boolean skip = shouldSkip(offset);
+        subscriberIdle.busy();
+        if (skip) {
+            bookKeeper.removePackage(pkgMsg, offset);
+        } else {
+            long createdTime = queueItem.get(RECORD_TIMESTAMP, Long.class);
+            bookKeeper.importPackage(pkgMsg, offset, createdTime);
         }
-       return waitPrecondition(offset);
+        queueItemsBuffer.remove();
+        distributionMetricsService.getItemsBufferSize().decrement();
     }
 
-    private boolean waitPrecondition(long offset) {
-        Decision decision = Decision.WAIT;
-        long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT * 1000;
-        while (decision == Decision.WAIT && System.currentTimeMillis() < endTime) {
-            decision = precondition.canProcess(subAgentName, offset);
-            if (decision == Decision.WAIT) {
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    return false;
-                }
-            } else {
-                return decision == Decision.SKIP ? true : false;
-            }
-        }
-        throw new PreConditionTimeoutException("Timeout waiting for package offset " + offset + " on status topic.");
+    private boolean shouldSkip(long offset) throws InterruptedException, TimeoutException {
+        return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
     }
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
index 0018641..c619d79 100644
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
 
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.ModifiableValueMap;
@@ -44,7 +44,7 @@ import static java.util.Objects.requireNonNull;
 import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
 
 @ParametersAreNonnullByDefault
-class LocalStore {
+public class LocalStore {
 
     private static final String ROOT_PATH = "/var/sling/distribution/journal/stores";
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
similarity index 64%
rename from src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
index ef832f3..a7148c4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
@@ -16,32 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
 
 import static java.lang.String.format;
 
-import java.io.ByteArrayInputStream;
 import java.io.InputStream;
-import java.util.Objects;
-
-import javax.annotation.Nonnull;
-import javax.jcr.Binary;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-import javax.jcr.ValueFactory;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class PackageHandler {
+public class PackageHandler {
     private static final Logger LOG = LoggerFactory.getLogger(PackageHandler.class);
     
     private DistributionPackageBuilder packageBuilder;
@@ -74,38 +66,13 @@ class PackageHandler {
         LOG.info("Importing paths {}",pkgMsg.getPathsList());
         InputStream pkgStream = null;
         try {
-            pkgStream = pkgStream(resolver, pkgMsg);
-            if (canHandlePackage(pkgMsg)) {
-                packageBuilder.installPackage(resolver, pkgStream);
-                extractor.handle(resolver, pkgMsg.getPathsList());
-            }
+            pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
+            packageBuilder.installPackage(resolver, pkgStream);
+            extractor.handle(resolver, pkgMsg.getPathsList());
         } finally {
             IOUtils.closeQuietly(pkgStream);
         }
-    }
 
-    private boolean canHandlePackage(PackageMessage pkgMsg) {
-        return Objects.equals(packageBuilder.getType(), pkgMsg.getPkgType());
-    }
-    
-    @Nonnull
-    public static InputStream pkgStream(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException {
-        if (pkgMsg.hasPkgBinary()) {
-            return new ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
-        } else {
-            String pkgBinRef = pkgMsg.getPkgBinaryRef();
-            try {
-                Session session = resolver.adaptTo(Session.class);
-                if (session == null) {
-                    throw new DistributionException("Unable to get Oak session");
-                }
-                ValueFactory factory = session.getValueFactory();
-                Binary binary = factory.createValue(new SimpleReferenceBinary(pkgBinRef)).getBinary();
-                return binary.getStream();
-            } catch (RepositoryException e) {
-                throw new DistributionException(e.getMessage(), e);
-            }
-        }
     }
 
     private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
similarity index 92%
rename from src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
index a1130fc..55dc55e 100644
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
 
 public enum PackageHandling {
     Off, Extract, Install
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
deleted file mode 100644
index 684ab9d..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.subscriber;
-
-public class PreConditionTimeoutException extends RuntimeException {
-    public PreConditionTimeoutException(String msg) {
-        super(msg);
-    }
-
-    private static final long serialVersionUID = 6286011641627241560L;
-
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
index 625a62b..937c30a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
@@ -18,7 +18,6 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
-import org.apache.sling.distribution.journal.service.subscriber.PackageHandling;
 import org.osgi.service.metatype.annotations.AttributeDefinition;
 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
similarity index 93%
rename from src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
index e72378d..a05b80c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
 
 import java.io.Closeable;
 import java.util.Hashtable;
@@ -42,7 +42,6 @@ public class SubscriberIdle implements SystemReadyCheck, Closeable {
 
     private final int idleMillis;
     private final AtomicBoolean isReady = new AtomicBoolean();
-    private final AtomicBoolean idle = new AtomicBoolean();
     private final ScheduledExecutorService executor;
     private ScheduledFuture<?> schedule;
 
@@ -70,23 +69,17 @@ public class SubscriberIdle implements SystemReadyCheck, Closeable {
      * Called when processing of a message starts
      */
     public synchronized void busy() {
-        idle.set(false);
         cancelSchedule();
     }
     
     public boolean isReady() {
         return isReady.get();
     }
-    
-    public boolean isIdle() {
-        return idle.get();
-    }
 
     /**
      * Called when processing of a message has finished
      */
     public synchronized void idle() {
-        idle.set(true);
         if (!isReady.get()) {
             cancelSchedule();
             schedule = executor.schedule(this::ready, idleMillis, TimeUnit.MILLISECONDS);
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperFactory.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperFactory.java
deleted file mode 100644
index b0cdbcb..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperFactory.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.service.subscriber;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.Map;
-import java.util.function.Consumer;
-
-import org.apache.jackrabbit.vault.packaging.Packaging;
-import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
-import org.osgi.framework.BundleContext;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.event.EventAdmin;
-
-@Component(service = BookKeeperFactory.class, immediate = true)
-public class BookKeeperFactory {
-    
-
-
-    @Reference
-    private ResourceResolverFactory resolverFactory;
-    
-    @Reference
-    private EventAdmin eventAdmin;
-    
-    @Reference
-    private SubscriberMetrics subscriberMetrics;
-    
-    @Reference
-    private Packaging packaging;
-
-    private BundleContext context;
-
-    private Integer idleMillies;
-    
-    public BookKeeperFactory() {
-    }
-    
-    public BookKeeperFactory(ResourceResolverFactory resolverFactory,
-            EventAdmin eventAdmin, SubscriberMetrics subscriberMetrics, Packaging packaging) {
-        this.resolverFactory = resolverFactory;
-        this.eventAdmin = eventAdmin;
-        this.subscriberMetrics = subscriberMetrics;
-        this.packaging = packaging;
-    }
-
-    public void activate(BundleContext context, Map<String, Object> properties) {
-        this.context = context;
-        this.idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
-        requireNonNull(resolverFactory);
-        requireNonNull(eventAdmin);
-        requireNonNull(subscriberMetrics);
-    }
-    
-    public BookKeeper create(
-            DistributionPackageBuilder packageBuilder,
-            String subAgentName, 
-            String subSlingId, 
-            int maxRetries, 
-            boolean editable, 
-            PackageHandling packageHandling, 
-            Consumer<PackageStatusMessage> sender
-            ) {
-        ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, packageHandling);
-        PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
-        SubscriberIdle subscriberIdle = new SubscriberIdle(context, this.idleMillies);
-        return new BookKeeper(resolverFactory, subscriberMetrics, packageHandler, subscriberIdle, eventAdmin,
-                sender, subAgentName, subSlingId, editable, maxRetries);
-    }
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java
deleted file mode 100644
index 37a4734..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.service.subscriber;
-
-import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_COMPONENT_KIND;
-import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_COMPONENT_NAME;
-import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_PATHS;
-import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_TYPE;
-import static org.apache.sling.distribution.event.DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.osgi.service.event.Event;
-
-@ParametersAreNonnullByDefault
-class ImportedEventFactory {
-
-    public static final String PACKAGE_ID = "distribution.package.id";
-    private static final String KIND_IMPORTER = "importer";
-
-    private ImportedEventFactory() {
-    }
-    
-    public static Event create(Messages.PackageMessage pkgMsg, String agentName) {
-        String[] pathsList = pkgMsg.getPathsList().toArray(new String[0]);
-        Map<String, Object> props = new HashMap<>();
-        props.put(DISTRIBUTION_COMPONENT_KIND, KIND_IMPORTER);
-        props.put(DISTRIBUTION_COMPONENT_NAME, agentName);
-        props.put(DISTRIBUTION_TYPE, pkgMsg.getReqType().name());
-        props.put(DISTRIBUTION_PATHS, pathsList);
-        props.put(PACKAGE_ID, pkgMsg.getPkgId());
-        return new Event(IMPORTER_PACKAGE_IMPORTED, props);
-    }
-
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/NoopMetric.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/NoopMetric.java
deleted file mode 100644
index 375a792..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/NoopMetric.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.service.subscriber;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.sling.commons.metrics.Counter;
-import org.apache.sling.commons.metrics.Histogram;
-import org.apache.sling.commons.metrics.Meter;
-import org.apache.sling.commons.metrics.Timer;
-
-enum NoopMetric implements Counter, Histogram, Timer, Meter{
-    INSTANCE;
-
-    @Override
-    public long getCount() {
-        return 0;
-    }
-
-    @Override
-    public void increment() {
-
-    }
-
-    @Override
-    public void decrement() {
-
-    }
-
-    @Override
-    public void increment(long n) {
-
-    }
-
-    @Override
-    public void decrement(long n) {
-
-    }
-
-    @Override
-    public void mark() {
-
-    }
-
-    @Override
-    public void mark(long n) {
-
-    }
-
-    @Override
-    public void update(long duration, TimeUnit unit) {
-
-    }
-
-    @Override
-    public Context time() {
-        return NoopContext.INSTANCE;
-    }
-
-    @Override
-    public void update(long value) {
-
-    }
-
-    @Override
-    public <AdapterType> AdapterType adaptTo(Class<AdapterType> type) {
-        return null;
-    }
-
-    private enum NoopContext implements Context {
-        INSTANCE;
-
-        @Override
-        public long stop() {
-            return 0;
-        }
-
-        @Override
-        public void close() {
-
-        }
-    }
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
deleted file mode 100644
index f5bcae0..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.service.subscriber;
-
-import static java.lang.String.format;
-
-import java.io.Closeable;
-import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.function.Supplier;
-
-import org.apache.sling.commons.metrics.Counter;
-import org.apache.sling.commons.metrics.Gauge;
-import org.apache.sling.commons.metrics.Histogram;
-import org.apache.sling.commons.metrics.Meter;
-import org.apache.sling.commons.metrics.MetricsService;
-import org.apache.sling.commons.metrics.Timer;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Component(service = SubscriberMetrics.class)
-public class SubscriberMetrics {
-
-    public static final String BASE_COMPONENT = "distribution.journal";
-
-    public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
-    
-    private final Logger log = LoggerFactory.getLogger(this.getClass());
-
-    @Reference
-    private MetricsService metricsService;
-
-    private Histogram importedPackageSize;
-
-    private Counter itemsBufferSize;
-
-    private Timer removedPackageDuration;
-
-    private Timer removedFailedPackageDuration;
-
-    private Timer importedPackageDuration;
-
-    private Meter failedPackageImports;
-
-    private Timer sendStoredStatusDuration;
-
-    private Timer processQueueItemDuration;
-
-    private Timer packageDistributedDuration;
-
-    private BundleContext context;
-    
-    public SubscriberMetrics() {
-        importedPackageSize = NoopMetric.INSTANCE;
-        itemsBufferSize = NoopMetric.INSTANCE;
-        importedPackageDuration = NoopMetric.INSTANCE;
-        removedPackageDuration = NoopMetric.INSTANCE;
-        removedFailedPackageDuration = NoopMetric.INSTANCE;
-        failedPackageImports = NoopMetric.INSTANCE;
-        sendStoredStatusDuration = NoopMetric.INSTANCE;
-        processQueueItemDuration = NoopMetric.INSTANCE;
-        packageDistributedDuration = NoopMetric.INSTANCE;
-    }
-
-    @Activate
-    public void activate(BundleContext context) {
-        this.context = context;
-        importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, "imported_package_size"));
-        itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, "items_buffer_size"));
-        importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "imported_package_duration"));
-        removedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_package_duration"));
-        removedFailedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_failed_package_duration"));
-        failedPackageImports = getMeter(getMetricName(SUB_COMPONENT, "failed_package_imports"));
-        sendStoredStatusDuration = getTimer(getMetricName(SUB_COMPONENT, "send_stored_status_duration"));
-        processQueueItemDuration = getTimer(getMetricName(SUB_COMPONENT, "process_queue_item_duration"));
-        packageDistributedDuration = getTimer(getMetricName(SUB_COMPONENT, "request_distributed_duration"));
-    }
-
-    /**
-     * Histogram of the imported content package size in Byte.
-     *
-     * @return a Sling Metrics histogram
-     */
-    public Histogram getImportedPackageSize() {
-        return importedPackageSize;
-    }
-
-    /**
-     * Counter of the package buffer size on the subscriber.
-     *
-     * @return a Sling Metrics counter
-     */
-    public Counter getItemsBufferSize() {
-        return itemsBufferSize;
-    }
-
-    /**
-     * Timer capturing the duration in ms of successful packages import operations.
-     *
-     * @return a Sling Metrics timer
-     */
-    public Timer getImportedPackageDuration() {
-        return importedPackageDuration;
-    }
-
-    /**
-     * Timer capturing the duration in ms of packages successfully removed from an editable subscriber.
-     *
-     * @return a Sling Metrics timer
-     */
-    public Timer getRemovedPackageDuration() {
-        return removedPackageDuration;
-    }
-
-    /**
-     * Timer capturing the duration in ms of packages successfully removed automatically from a subscriber supporting error queue.
-     *
-     * @return a Sling Metrics timer
-     */
-    public Timer getRemovedFailedPackageDuration() {
-        return removedFailedPackageDuration;
-    }
-
-    /**
-     * Meter of failures to import packages.
-     *
-     * @return a Sling Metrics meter
-     */
-    public Meter getFailedPackageImports() {
-        return failedPackageImports;
-    }
-
-    /**
-     * Timer capturing the duration in ms of sending a stored package status.
-     *
-     * @return a Sling Metric timer
-     */
-    public Timer getSendStoredStatusDuration() {
-        return sendStoredStatusDuration;
-    }
-
-    /**
-     * Timer capturing the duration in ms of processing a queue item.
-     *
-     * @return a Sling Metric timer
-     */
-    public Timer getProcessQueueItemDuration() {
-        return processQueueItemDuration;
-    }
-
-    /**
-     * Timer capturing the duration in ms of distributing a distribution package.
-     * The timer starts when the package is enqueued and stops when the package is successfully imported.
-     *
-     * @return a Sling Metric timer
-     */
-    public Timer getPackageDistributedDuration() {
-        return packageDistributedDuration;
-    }
-
-    public <T> GaugeService<T> createGauge(String name, String description, Supplier<T> supplier) {
-        return new GaugeService<>(name, description, supplier);
-    }
-
-    private String getMetricName(String component, String name) {
-        return format("%s.%s", component, name);
-    }
-
-    private Counter getCounter(String metricName) {
-        return metricsService.counter(metricName);
-    }
-
-    private Timer getTimer(String metricName) {
-        return metricsService.timer(metricName);
-    }
-
-    private Histogram getHistogram(String metricName) {
-        return metricsService.histogram(metricName);
-    }
-
-    private Meter getMeter(String metricName) {
-        return metricsService.meter(metricName);
-    }
-
-    public class GaugeService<T> implements Gauge<T>, Closeable {
-        
-        @SuppressWarnings("rawtypes")
-        private ServiceRegistration<Gauge> reg;
-        private final Supplier<T> supplier;
-
-        private GaugeService(String name, String description, Supplier<T> supplier) {
-            this.supplier = supplier;
-            Dictionary<String, String> props = new Hashtable<>();
-            props.put(Constants.SERVICE_DESCRIPTION, description);
-            props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
-            props.put(Gauge.NAME, name);
-            if (context != null) {
-                reg = context.registerService(Gauge.class, this, props);
-            }
-        }
-
-        @Override
-        public T getValue() {
-            return supplier.get();
-        }
-        
-        @Override
-        public void close() {
-            try {
-                reg.unregister();
-            } catch (Exception e) {
-                log.warn("Error unregistering service", e);
-            }
-        }
-    }
-    
-}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
index 7808b12..f279265 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
@@ -22,13 +22,12 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 import org.apache.sling.distribution.journal.impl.precondition.DefaultPrecondition;
-import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
 import org.junit.Test;
 
 public class DefaultPreconditionTest {
     @Test
     public void testAlwaysTrue() {
-        Decision decision = new DefaultPrecondition().canProcess("any", 100);
-        assertThat(decision, equalTo(Decision.ACCEPT));
+        boolean canProcess = new DefaultPrecondition().canProcess("any", 100, 10);
+        assertThat(canProcess, equalTo(true));
     }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index 1484a9e..571d98a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -19,10 +19,10 @@
 package org.apache.sling.distribution.journal.impl.precondition;
 
 import org.apache.sling.distribution.journal.impl.precondition.PackageStatusWatcher;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessagingProvider;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index a5e969b..eab0479 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -19,21 +19,26 @@
 package org.apache.sling.distribution.journal.impl.precondition;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
+import org.apache.sling.distribution.journal.impl.precondition.StagingPrecondition;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.Before;
@@ -84,28 +89,47 @@ public class StagingPreconditionTest {
         statusHandler = statusCaptor.getValue().getHandler();
     }
     
-    @Test
+    @Test(expected = IllegalArgumentException.class)
+    public void testIllegalTimeout() throws InterruptedException, TimeoutException {
+        precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, -1);
+    }
+    
+    @Test(expected = TimeoutException.class)
     public void testNotYetProcessed() throws InterruptedException, TimeoutException {
         simulateMessage(OTHER_AGENT, 1002, PackageStatusMessage.Status.IMPORTED);
-        Decision res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT);
-        assertThat(res, equalTo(Decision.WAIT));
-
-        Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT);
-        assertThat(res2, equalTo(Decision.WAIT));
+        boolean res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT, 1);
+        assertThat(res, equalTo(true));
 
+        // We got no package for this agent. So this should time out
+        precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 1);
     }
     
     @Test
+    public void testDeactivateDuringCanProcess() {
+        AtomicReference<Throwable> exHolder = new AtomicReference<>();
+        Thread th = new Thread(() -> {
+            try {
+                precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 2);
+            } catch (Throwable t) {
+                exHolder.set(t);
+            }
+        });
+        th.start();
+        precondition.deactivate();
+        Throwable ex = Awaitility.await().until(() -> exHolder.get(), notNullValue());
+        assertThat(ex, instanceOf(InterruptedException.class));
+    }
+    
+    @Test(expected = TimeoutException.class)
     public void testCleanup() throws InterruptedException, TimeoutException {
         simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED);
-        Decision res = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002);
-        assertThat(res, equalTo(Decision.ACCEPT));
+        assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
         
         // Cleanup
         precondition.run();
         
-        Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002);
-        assertThat(res2, equalTo(Decision.WAIT));
+        // Should time out because after cleanup message is not present anymore
+        precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1);
     }
     
     @Test
@@ -114,9 +138,9 @@ public class StagingPreconditionTest {
         simulateMessage(GP_SUB1_AGENT_NAME, 1001, PackageStatusMessage.Status.REMOVED);
         simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED);
 
-        assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000), equalTo(Decision.SKIP));
-        assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001), equalTo(Decision.SKIP));
-        assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002), equalTo(Decision.ACCEPT));
+        assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000, 1));
+        assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001, 1));
+        assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
     }
 
     private void simulateMessage(String subAgentName, long pkgOffset, PackageStatusMessage.Status status) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
index 2b82486..5e7b76c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
@@ -30,6 +30,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.UUID;
 
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,8 +40,6 @@ import org.mockito.Mockito;
 
 import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
 import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageInfo;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
index f52026c..2a41a2f 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
@@ -41,7 +41,6 @@ import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 
-import org.apache.sling.distribution.journal.shared.JMXRegistration;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueItemState;
@@ -54,6 +53,8 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
+import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
+
 @SuppressWarnings("rawtypes")
 public class DistPublisherJMXTest {
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 434c561..7ab2bdf 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -38,9 +38,9 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import com.google.common.collect.ImmutableMap;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
index ab5d17e..acc91b0 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
@@ -21,9 +21,9 @@ package org.apache.sling.distribution.journal.impl.publisher;
 import java.util.Collections;
 import java.util.HashSet;
 
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessagingProvider;
 
 import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
index 9b41463..a8fb467 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
@@ -30,6 +30,8 @@ import java.util.List;
 import java.util.UUID;
 import java.util.function.Function;
 
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -55,8 +57,6 @@ import org.mockito.Spy;
 import org.osgi.framework.BundleContext;
 
 import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
index e3b5582..6aeb1ba 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
@@ -31,13 +31,13 @@ import static org.hamcrest.Matchers.hasItemInArray;
 import static org.junit.Assert.assertThat;
 
 import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.hamcrest.Matcher;
 import org.junit.Test;
 
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.MessageInfo;
 
 public class QueueItemFactoryTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
index 5aededc..11a52f9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
@@ -20,7 +20,6 @@ package org.apache.sling.distribution.journal.impl.queue.impl;
 
 import java.util.HashMap;
 
-import org.apache.sling.distribution.journal.shared.EntryUtil;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.junit.Test;
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
index e7f9639..18cc957 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
@@ -34,7 +34,7 @@ import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.shared.JMXRegistration;
+import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
 import org.junit.Test;
 
 public class OffsetQueueImplJMXTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java
similarity index 90%
rename from src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java
index 4f2f0a4..6c33412 100644
--- a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java
@@ -16,14 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.queue.impl;
 
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.sling.distribution.journal.service.subscriber.PackageRetries;
-
 public class PackageRetriesTest {
 
     @Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 5fca8a3..1113319 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -57,10 +57,10 @@ import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 166ad1a..61c1389 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -38,6 +38,7 @@ import javax.management.ObjectInstance;
 import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.MessageSender;
 import com.google.protobuf.GeneratedMessage;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
@@ -57,7 +58,6 @@ import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageInfo;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
index 211b9fc..20dccea 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
@@ -37,7 +37,6 @@ import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.shared.EntryUtil;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueType;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
index 3cef383..778bbaa 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
@@ -28,6 +28,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,7 +41,6 @@ import org.mockito.MockitoAnnotations;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
new file mode 100644
index 0000000..d2784e9
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
+import com.google.common.collect.Lists;
+
+public class SubQueueTest {
+
+    @Test
+    public void testGetName() throws Exception {
+        String queueName = "someQueue";
+        SubQueue queue = new SubQueue(queueName, null, new PackageRetries());
+        Assert.assertEquals(queueName, queue.getName());
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAdd() throws Exception {
+        SubQueue queue = new SubQueue("someQueue", null, new PackageRetries());
+        queue.add(buildQueueItem("package-1"));
+    }
+
+    @Test
+    public void testGetHead() throws Exception {
+        SubQueue emptyQueue = new SubQueue("emptyQueue", null, new PackageRetries());
+        Assert.assertNull(emptyQueue.getHead());
+        SubQueue oneQueue = new SubQueue("oneQueue", buildQueueItem("1"), new PackageRetries());
+        Assert.assertNotNull(oneQueue.getHead());
+    }
+
+    @Test
+    public void testGetItems() throws Exception {
+        SubQueue oneQueue = new SubQueue("oneQueue", null, new PackageRetries());
+        Assert.assertNotNull(oneQueue.getEntries(0, 10));
+        SubQueue tenQueue = new SubQueue("tenQueue", buildQueueItem("1"), new PackageRetries());
+        Assert.assertEquals(1, Lists.newArrayList(tenQueue.getEntries(0, 10)).size());
+        Assert.assertEquals(1, Lists.newArrayList(tenQueue.getEntries(0, -1)).size());
+        Assert.assertEquals(0, Lists.newArrayList(tenQueue.getEntries(1, 10)).size());
+    }
+
+    private DistributionQueueItem buildQueueItem(String packageId) {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put(QueueItemFactory.RECORD_TOPIC, "topic");
+        properties.put(QueueItemFactory.RECORD_OFFSET, 0);
+        properties.put(QueueItemFactory.RECORD_PARTITION, 0);
+        properties.put(QueueItemFactory.RECORD_TIMESTAMP, System.currentTimeMillis());
+        return new DistributionQueueItem(packageId, properties);
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
similarity index 85%
rename from src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
index 50d5507..1049ef5 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertNotNull;
@@ -32,8 +32,7 @@ import org.apache.sling.commons.metrics.Meter;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.commons.metrics.Timer.Context;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -95,7 +94,16 @@ public class DistributionMetricsServiceTest {
         assertNotNull(metrics.getDroppedRequests());
         assertNotNull(metrics.getEnqueuePackageDuration());
         assertNotNull(metrics.getExportedPackageSize());
+        assertNotNull(metrics.getFailedPackageImports());
+        assertNotNull(metrics.getImportedPackageDuration());
+        assertNotNull(metrics.getImportedPackageSize());
+        assertNotNull(metrics.getItemsBufferSize());
+        assertNotNull(metrics.getPackageDistributedDuration());
+        assertNotNull(metrics.getProcessQueueItemDuration());
         assertNotNull(metrics.getQueueCacheFetchCount());
+        assertNotNull(metrics.getRemovedFailedPackageDuration());
+        assertNotNull(metrics.getRemovedPackageDuration());
+        assertNotNull(metrics.getSendStoredStatusDuration());
     }
     
     @Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
index 8fe7ad9..bd44057 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import static java.time.Duration.of;
 import static java.time.temporal.ChronoUnit.MILLIS;
@@ -27,7 +27,6 @@ import java.time.temporal.ChronoUnit;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.sling.distribution.journal.shared.ExponentialBackOff;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.runners.MockitoJUnitRunner;
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
similarity index 90%
rename from src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
index 0214b83..ce7f279 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
@@ -35,12 +35,9 @@ import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.JournalAvailableChecker;
-import org.apache.sling.distribution.journal.shared.Topics;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.shared.JournalAvailableChecker.JournalCheckerConfiguration;
-import org.apache.sling.distribution.journal.shared.Topics.TopicsConfiguration;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.impl.shared.JournalAvailableChecker.JournalCheckerConfiguration;
+import org.apache.sling.distribution.journal.impl.shared.Topics.TopicsConfiguration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
similarity index 97%
rename from src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
index 7fd0207..fed9fef 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.samePropertyValuesAs;
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
 
+import org.apache.sling.distribution.journal.impl.shared.LimitPoller;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,7 +42,6 @@ import org.mockito.MockitoAnnotations;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.LimitPoller;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
index ede1b69..14dcc19 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
@@ -36,7 +36,6 @@ import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.PackageBrowser;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
similarity index 95%
rename from src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
index f483c86..aa2947a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import static java.util.Collections.emptyList;
 import static org.hamcrest.Matchers.containsString;
@@ -43,9 +43,6 @@ import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.PackageBrowser;
-import org.apache.sling.distribution.journal.shared.PackageViewerPlugin;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java
similarity index 93%
rename from src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java
index 7202b6b..5d31bf6 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java
@@ -16,14 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.sling.distribution.DistributionRequestState;
-import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
 import org.junit.Test;
 
 public class SimpleDistributionResponseTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
index 9145dfe..a2105e3 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
 
 import org.apache.sling.distribution.journal.MessageInfo;
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
index e669a82..05f8a88 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
@@ -27,9 +27,7 @@ import static org.mockito.Mockito.when;
 import java.util.Collections;
 import java.util.function.Consumer;
 
-import org.apache.sling.distribution.journal.impl.subscriber.Announcer;
 import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
diff --git a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
similarity index 61%
rename from src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
index 2ae2803..fd4c761 100644
--- a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
@@ -16,32 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
 
-import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.impl.subscriber.SubscriberTest;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
-import org.osgi.framework.BundleContext;
 import org.osgi.service.event.EventAdmin;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -51,7 +43,8 @@ public class BookKeeperTest {
 
     private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
 
-    private SubscriberMetrics subscriberMetrics = new SubscriberMetrics();
+    @Mock
+    private DistributionMetricsService distributionMetricsService;
 
     @Mock
     private PackageHandler packageHandler;
@@ -61,18 +54,13 @@ public class BookKeeperTest {
 
     @Mock
     private Consumer<PackageStatusMessage> sender;
-    
-    @Mock
-    BundleContext context;
 
     private BookKeeper bookKeeper;
 
     @Before
     public void before() {
-        SubscriberIdle subscriberIdle = new SubscriberIdle(context, 300);
-        bookKeeper = new BookKeeper(resolverFactory, subscriberMetrics,  packageHandler, subscriberIdle, eventAdmin, sender,
+        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender,
                 "subAgentName", "subSlingId", true, 10);
-        sem = new Semaphore(0);
     }
 
     @Test
@@ -90,29 +78,5 @@ public class BookKeeperTest {
             assertThat(bookKeeper.loadOffset(), equalTo(20l));
         }
     }
-    
-    @Test
-    public void testReadyWhenWatingForPrecondition() {
-        
-        MessageInfo info = new TestMessageInfo("", 1, 0, 0);
-        PackageMessage message = SubscriberTest.BASIC_ADD_PACKAGE;
-        Executors.newSingleThreadExecutor().execute(() -> {
-            try {
-                bookKeeper.processPackage(info, message, this::waitForSemaphore);
-            } catch (Exception e) {
-            }
-        });
-        await("Should report ready").until(bookKeeper.subscriberIdle::isReady);
-        sem.release();
-    }
-
-    public boolean waitForSemaphore(long offset, PackageMessage msg) {
-        try {
-            return sem.tryAcquire(10, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            return false;
-        }
-    }
 
-    private Semaphore sem;
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index 0de3d2e..d6c774e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -33,10 +33,10 @@ import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.ClearCommand;
 import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java
similarity index 97%
rename from src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java
index 096b632..0868cc8 100644
--- a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
 
 import static java.util.Collections.singletonList;
 import static org.mockito.Mockito.verify;
@@ -37,7 +37,6 @@ import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.distribution.journal.service.subscriber.ContentPackageExtractor;
 import org.apache.sling.testing.mock.osgi.MockOsgi;
 import org.apache.sling.testing.mock.sling.MockSling;
 import org.apache.sling.testing.mock.sling.ResourceResolverType;
diff --git a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
index 1a03199..11b346a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -29,7 +29,6 @@ import static org.junit.Assert.assertEquals;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.distribution.journal.service.subscriber.LocalStore;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.Test;
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
similarity index 94%
rename from src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
index 6c40034..ef5f0d7 100644
--- a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -16,13 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 import org.apache.felix.systemready.CheckStatus.State;
-import org.apache.sling.distribution.journal.service.subscriber.SubscriberIdle;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.osgi.framework.BundleContext;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 4d0710c..1d47520 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -21,9 +21,12 @@ package org.apache.sling.distribution.journal.impl.subscriber;
 import static org.apache.sling.distribution.agent.DistributionAgentState.IDLE;
 import static org.apache.sling.distribution.agent.DistributionAgentState.RUNNING;
 import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
@@ -37,39 +40,42 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Dictionary;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.jackrabbit.vault.packaging.Packaging;
+import org.apache.sling.distribution.journal.impl.precondition.Precondition;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.commons.metrics.Counter;
+import org.apache.sling.commons.metrics.Histogram;
+import org.apache.sling.commons.metrics.Meter;
+import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionRequestState;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.DistributionResponse;
+import org.apache.sling.distribution.SimpleDistributionRequest;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.precondition.Precondition;
-import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.service.subscriber.BookKeeperFactory;
-import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.packaging.DistributionPackageInfo;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItemState;
+import org.apache.sling.distribution.queue.DistributionQueueState;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.settings.SlingSettingsService;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.awaitility.Awaitility;
@@ -77,26 +83,34 @@ import org.awaitility.Duration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 import org.mockito.Spy;
 import org.mockito.invocation.InvocationOnMock;
-import org.mockito.runners.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.util.converter.Converters;
 
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
 import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.ByteString;
 
 @SuppressWarnings("unchecked")
-@RunWith(MockitoJUnitRunner.class)
 public class SubscriberTest {
 
     private static final String SUB1_SLING_ID = "sub1sling";
@@ -105,7 +119,7 @@ public class SubscriberTest {
     private static final String PUB1_SLING_ID = "pub1sling";
     private static final String PUB1_AGENT_NAME = "pub1agent";
 
-    public static final PackageMessage BASIC_ADD_PACKAGE = PackageMessage.newBuilder()
+    private static final PackageMessage BASIC_ADD_PACKAGE = PackageMessage.newBuilder()
             .setPkgId("myid")
             .setPubSlingId(PUB1_SLING_ID)
             .setPubAgentName(PUB1_AGENT_NAME)
@@ -124,8 +138,6 @@ public class SubscriberTest {
             .addAllPaths(Arrays.asList("/test"))
             .build();
 
-    @Mock
-    Packaging packaging;
     
     @Mock
     private BundleContext context;
@@ -160,10 +172,8 @@ public class SubscriberTest {
     @Mock
     private MessageSender<PackageStatusMessage> statusSender;
 
-    @Spy
-    private SubscriberMetrics subscriberMetrics = new SubscriberMetrics();
-    
-    BookKeeperFactory bookKeeperFactory;
+    @Mock
+    private DistributionMetricsService distributionMetricsService;
 
     @InjectMocks
     DistributionSubscriber subscriber;
@@ -188,14 +198,12 @@ public class SubscriberTest {
         
         Awaitility.setDefaultPollDelay(Duration.ZERO);
         Awaitility.setDefaultPollInterval(Duration.ONE_HUNDRED_MILLISECONDS);
-        
-        bookKeeperFactory = new BookKeeperFactory(resolverFactory, eventAdmin, subscriberMetrics, packaging);
-        Map<String, Object> props = new HashMap<String, Object>();
-        bookKeeperFactory.activate(context, props);
-        subscriber.bookKeeperFactory = bookKeeperFactory;
+        MockitoAnnotations.initMocks(this);
         when(packageBuilder.getType()).thenReturn("journal");
         when(slingSettings.getSlingId()).thenReturn(SUB1_SLING_ID);
 
+        mockMetrics();
+
         when(clientProvider.<PackageStatusMessage>createSender()).thenReturn(statusSender, (MessageSender) discoverySender);
         when(clientProvider.createPoller(
                 Mockito.anyString(),
@@ -203,7 +211,8 @@ public class SubscriberTest {
                 Mockito.anyString(),
                 packageCaptor.capture()))
             .thenReturn(poller);
-        
+        when(context.registerService(Mockito.any(Class.class), (DistributionAgent) eq(subscriber), Mockito.any(Dictionary.class))).thenReturn(reg);
+
         // you should call initSubscriber in each test method
     }
 
@@ -218,6 +227,10 @@ public class SubscriberTest {
         assumeNoPrecondition();
         initSubscriber();
 
+        assertThat(subscriber.getQueueNames(), contains(PUB1_AGENT_NAME));
+        assertThat(subscriber.getQueue(PUB1_AGENT_NAME).getStatus().getState(), equalTo(DistributionQueueState.IDLE));
+        assertThat(subscriber.getState(), equalTo(DistributionAgentState.IDLE));
+        
         MessageInfo info = new TestMessageInfo("", 1, 0, 0);
 
         PackageMessage message = BASIC_ADD_PACKAGE;
@@ -226,13 +239,20 @@ public class SubscriberTest {
         when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class), 
                 Mockito.any(ByteArrayInputStream.class))
                 ).thenAnswer(new WaitFor(sem));
-        packageHandler.handle(info, message); 
+        packageHandler.handle(info, message);
+        
         waitSubscriber(RUNNING);
+        DistributionQueue queue = subscriber.getQueue(PUB1_AGENT_NAME);
+        DistributionQueueEntry item = queue.getHead();
+        assertThat(item.getStatus().getItemState(), equalTo(DistributionQueueItemState.QUEUED));
         
         sem.release();
         waitSubscriber(IDLE);
         verify(statusSender, times(0)).send(eq(topics.getStatusTopic()),
                 anyObject());
+        List<String> log = subscriber.getLog().getLines();
+        // We do not use the DistributionLog anymore
+        assertThat(log.size(), equalTo(0));
     }
 
 	@Test
@@ -248,10 +268,25 @@ public class SubscriberTest {
         PackageMessage message = BASIC_DEL_PACKAGE;
 
         packageHandler.handle(info, message);
-        await().until(() -> getResource("/test"), nullValue());
+        waitSubscriber(RUNNING);
+        waitSubscriber(IDLE);
+        try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null)) {
+            assertThat(resolver.getResource("/test"), nullValue());
+        }
     }
 
     @Test
+    public void testExecuteNotSupported() throws DistributionException {
+        assumeNoPrecondition();
+        initSubscriber();
+
+        DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "test");
+        DistributionResponse response = subscriber.execute(resourceResolver, request);
+        assertThat(response.getState(), equalTo(DistributionRequestState.DROPPED));
+    }
+
+
+    @Test
     public void testSendFailedStatus() throws DistributionException {
         assumeNoPrecondition();
         initSubscriber(ImmutableMap.of("maxRetries", "1"));
@@ -289,17 +324,37 @@ public class SubscriberTest {
         initSubscriber();
         MessageInfo info = new TestMessageInfo("", 1, 11, 0);
         PackageMessage message = BASIC_ADD_PACKAGE;
-        when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11))).thenReturn(Decision.SKIP);
 
         packageHandler.handle(info, message);
+        waitSubscriber(RUNNING);
+        when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(false);
+
         try {
             waitSubscriber(IDLE);
             fail("Cannot be IDLE without a validation status");
         } catch (Throwable t) {
 
         }
+
+        when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(true);
+        waitSubscriber(IDLE);
+
     }
     
+    @Test
+    public void testReadyWhenWatingForPrecondition() {
+        Semaphore sem = new Semaphore(0);
+        assumeWaitingForPrecondition(sem);
+        initSubscriber();
+        MessageInfo info = new TestMessageInfo("", 1, 0, 0);
+        PackageMessage message = BASIC_ADD_PACKAGE;
+
+        packageHandler.handle(info, message);
+        waitSubscriber(RUNNING);
+        await("Should report ready").until(subscriber.subscriberIdle::isReady);
+        sem.release();
+    }
+
     private void initSubscriber() {
         initSubscriber(Collections.emptyMap());
     }
@@ -321,20 +376,51 @@ public class SubscriberTest {
         await().until(subscriber::getState, equalTo(expectedState));
     }
 
+    private void mockMetrics() {
+        Histogram histogram = Mockito.mock(Histogram.class);
+        Counter counter = Mockito.mock(Counter.class);
+        Meter meter = Mockito.mock(Meter.class);
+        Timer timer = Mockito.mock(Timer.class);
+        Timer.Context timerContext = Mockito.mock(Timer.Context.class);
+        when(timer.time())
+            .thenReturn(timerContext);
+        when(distributionMetricsService.getImportedPackageSize())
+                .thenReturn(histogram);
+        when(distributionMetricsService.getItemsBufferSize())
+                .thenReturn(counter);
+        when(distributionMetricsService.getFailedPackageImports())
+                .thenReturn(meter);
+        when(distributionMetricsService.getRemovedFailedPackageDuration())
+                .thenReturn(timer);
+        when(distributionMetricsService.getRemovedPackageDuration())
+                .thenReturn(timer);
+        when(distributionMetricsService.getImportedPackageDuration())
+                .thenReturn(timer);
+        when(distributionMetricsService.getSendStoredStatusDuration())
+                .thenReturn(timer);
+        when(distributionMetricsService.getProcessQueueItemDuration())
+                .thenReturn(timer);
+        when(distributionMetricsService.getPackageDistributedDuration())
+                .thenReturn(timer);
+    }
+
     private void assumeNoPrecondition() {
         try {
-            when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong())).thenReturn(Decision.ACCEPT);
+            when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt())).thenReturn(true);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
-    private Resource getResource(String path) throws LoginException {
-        try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null)) {
-            return resolver.getResource(path);
+    private void assumeWaitingForPrecondition(Semaphore sem) {
+        try {
+            when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt()))
+                .thenAnswer(invocation -> sem.tryAcquire(10000, TimeUnit.SECONDS));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
     }
-
+    
     private final class WaitFor implements Answer<DistributionPackageInfo> {
         private final Semaphore sem;