You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/03/30 16:34:11 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-9259 - Extract service subscriber code into separate package (#25)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f2175a  SLING-9259 - Extract service subscriber code into separate package (#25)
9f2175a is described below

commit 9f2175a3084c66862f7172fceae483a4a159d7b2
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Mon Mar 30 18:34:04 2020 +0200

    SLING-9259 - Extract service subscriber code into separate package (#25)
    
    * SLING-9259 - Extract service subscriber code into separate package
    
    * SLING-9259 - Remove SubQueue, add factory for BookKeeper, simplify PreCondition
    
    * SLING-9259 - Provide simpler facade for BookKeeper
---
 .../impl/precondition/DefaultPrecondition.java     |   4 +-
 .../impl/precondition/PackageStatusWatcher.java    |   2 +-
 .../journal/impl/precondition/Precondition.java    |   3 +-
 .../impl/precondition/StagingPrecondition.java     |  32 +--
 .../journal/impl/publisher/DiscoveryService.java   |   2 +-
 .../impl/publisher/DistributionPublisher.java      |  15 +-
 .../impl/publisher/PackageDistributedNotifier.java |   2 +-
 .../journal/impl/publisher/PackageRepo.java        |   4 +-
 .../journal/impl/queue/impl/PubErrQueue.java       |   2 +
 .../journal/impl/queue/impl/PubQueue.java          |   2 +
 .../journal/impl/queue/impl/PubQueueCache.java     |   8 +-
 .../impl/queue/impl/PubQueueCacheService.java      |   4 +-
 .../impl/queue/impl/PubQueueProviderImpl.java      |   2 +-
 .../journal/impl/queue/impl/SubQueue.java          | 164 -------------
 .../journal/impl/subscriber/Announcer.java         |   1 +
 .../journal/impl/subscriber/CommandPoller.java     |   2 +-
 .../impl/subscriber/DistributionSubscriber.java    | 269 +++++++--------------
 ...ling.java => PreConditionTimeoutException.java} |   9 +-
 .../impl/subscriber/SubscriberConfiguration.java   |   1 +
 .../{impl => service}/subscriber/BookKeeper.java   |  85 ++++---
 .../service/subscriber/BookKeeperFactory.java      |  90 +++++++
 .../subscriber/ContentPackageExtractor.java        |   4 +-
 .../service/subscriber/ImportedEventFactory.java   |  55 +++++
 .../{impl => service}/subscriber/LocalStore.java   |   4 +-
 .../journal/service/subscriber/NoopMetric.java     |  99 ++++++++
 .../subscriber/PackageHandler.java                 |  45 +++-
 .../subscriber/PackageHandling.java                |   2 +-
 .../subscriber}/PackageRetries.java                |   4 +-
 .../subscriber/SubscriberIdle.java                 |   9 +-
 .../subscriber/SubscriberMetrics.java}             | 156 ++----------
 .../journal/{impl => }/shared/AgentState.java      |   2 +-
 .../{impl => }/shared/DefaultDistributionLog.java  |   2 +-
 .../shared/DistributionMetricsService.java         | 114 +--------
 .../{impl/queue/impl => shared}/EntryUtil.java     |   2 +-
 .../{impl => }/shared/ExponentialBackOff.java      |   2 +-
 .../journal/{impl => }/shared/JMXRegistration.java |   2 +-
 .../{impl => }/shared/JournalAvailableChecker.java |   4 +-
 .../shared/JournalAvailableServiceMarker.java      |   2 +-
 .../journal/{impl => }/shared/LimitPoller.java     |   2 +-
 .../journal/{impl => }/shared/PackageBrowser.java  |   2 +-
 .../{impl => }/shared/PackageViewerPlugin.java     |   2 +-
 .../queue/impl => shared}/QueueEntryFactory.java   |   2 +-
 .../shared/SimpleDistributionResponse.java         |   2 +-
 .../journal/{impl => }/shared/Topics.java          |   2 +-
 .../impl/precondition/DefaultPreconditionTest.java |   5 +-
 .../precondition/PackageStatusWatcherTest.java     |   4 +-
 .../impl/precondition/StagingPreconditionTest.java |  56 ++---
 .../impl/publisher/DiscoveryServiceTest.java       |   4 +-
 .../impl/publisher/DistPublisherJMXTest.java       |   3 +-
 .../impl/publisher/DistributionPublisherTest.java  |   4 +-
 .../publisher/PackageDistributedNotifierTest.java  |   2 +-
 .../journal/impl/publisher/PackageRepoTest.java    |   4 +-
 .../journal/impl/queue/QueueItemFactoryTest.java   |   2 +-
 .../journal/impl/queue/impl/EntryUtilTest.java     |   1 +
 .../impl/queue/impl/OffsetQueueImplJMXTest.java    |   2 +-
 .../journal/impl/queue/impl/PubQueueCacheTest.java |   4 +-
 .../impl/queue/impl/PubQueueProviderTest.java      |   2 +-
 .../journal/impl/queue/impl/PubQueueTest.java      |   1 +
 .../journal/impl/queue/impl/RangePollerTest.java   |   2 +-
 .../journal/impl/queue/impl/SubQueueTest.java      |  72 ------
 .../journal/impl/subscriber/AnnouncerTest.java     |   2 +
 .../journal/impl/subscriber/CommandPollerTest.java |   4 +-
 .../journal/impl/subscriber/SubscriberTest.java    | 170 ++++---------
 .../subscriber/BookKeeperTest.java                 |  46 +++-
 .../subscriber/ContentPackageExtractorTest.java    |   3 +-
 .../subscriber/LocalStoreTest.java                 |   3 +-
 .../subscriber}/PackageRetriesTest.java            |   4 +-
 .../subscriber/SubscriberIdleTest.java             |   3 +-
 .../shared/DistributionMetricsServiceTest.java     |  14 +-
 .../{impl => }/shared/ExponentialBackoffTest.java  |   3 +-
 .../shared/JournalAvailableCheckerTest.java        |  11 +-
 .../journal/{impl => }/shared/LimitPollerTest.java |   4 +-
 .../{impl => }/shared/PackageBrowserTest.java      |   3 +-
 .../{impl => }/shared/PackageViewerPluginTest.java |   5 +-
 .../shared/SimpleDistributionResponseTest.java     |   3 +-
 .../journal/{impl => }/shared/TestMessageInfo.java |   2 +-
 76 files changed, 671 insertions(+), 1000 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
index dceed02..5144378 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
@@ -23,7 +23,7 @@ import org.osgi.service.component.annotations.Component;
 @Component(immediate = true, service = Precondition.class, property = { "name=default" })
 public class DefaultPrecondition implements Precondition {
     @Override
-    public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) {
-        return true;
+    public Decision canProcess(String subAgentName, long pkgOffset) {
+        return Decision.ACCEPT;
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index 0408e0d..506cf55 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -29,10 +29,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.Topics;
 
 public class PackageStatusWatcher implements Closeable {
     private final Closeable poller;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
index 3730475..d934431 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
@@ -33,6 +33,7 @@ public interface Precondition {
      * @throws InterruptedException if the thread was interrupted and should shut down
      * @return true if the package can be processed; otherwise it returns false.
      */
-    boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException;
+    Decision canProcess(String subAgentName, long pkgOffset);
 
+    enum Decision { ACCEPT, SKIP, WAIT};
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index 2272888..ad6be9c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -21,12 +21,10 @@ package org.apache.sling.distribution.journal.impl.precondition;
 import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
 import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
 
-import java.util.concurrent.TimeoutException;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -56,8 +54,6 @@ public class StagingPrecondition implements Precondition, Runnable {
 
     private volatile PackageStatusWatcher watcher;
 
-    private volatile boolean running = true;
-    
     @Activate
     public void activate() {
         watcher = new PackageStatusWatcher(messagingProvider, topics);
@@ -67,31 +63,15 @@ public class StagingPrecondition implements Precondition, Runnable {
     @Deactivate
     public synchronized void deactivate() {
         IOUtils.closeQuietly(watcher);
-        running = false;
     }
 
     @Override
-    public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException {
-        if (timeoutSeconds < 1) {
-            throw new IllegalArgumentException();
-        }
-
-        // try to get the status for timeoutSeconds and then throw
-        for(int i=0; i < timeoutSeconds * 10; i++) {
-            Status status = getStatus(subAgentName, pkgOffset);
-
-            if (status != null) {
-                return status == Status.IMPORTED;
-            } else {
-                Thread.sleep(100);
-            }
-            
-            if (!running) {
-                throw new InterruptedException("Staging precondition is shutting down");
-            }
+    public Decision canProcess(String subAgentName, long pkgOffset) {
+        Status status = getStatus(subAgentName, pkgOffset);
+        if (status == null) {
+            return Decision.WAIT;
         }
-
-        throw new TimeoutException("Timeout waiting for package offset " + pkgOffset + " on status topic.");
+        return status == Status.IMPORTED ? Decision.ACCEPT : Decision.SKIP;
     }
 
     private synchronized Status getStatus(String subAgentName, long pkgOffset) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
index af052a3..c201a42 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
@@ -28,9 +28,9 @@ import java.util.Hashtable;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.commons.io.IOUtils;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 55787dc..c1c75d0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -20,12 +20,12 @@ package org.apache.sling.distribution.journal.impl.publisher;
 
 
 import static java.util.stream.StreamSupport.stream;
-import static org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.timed;
 import static java.util.Objects.requireNonNull;
 import static org.apache.sling.distribution.DistributionRequestState.ACCEPTED;
 import static org.apache.sling.distribution.DistributionRequestType.ADD;
 import static org.apache.sling.distribution.DistributionRequestType.DELETE;
 import static org.apache.sling.distribution.DistributionRequestType.TEST;
+import static org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -47,11 +47,6 @@ import javax.management.NotCompliantMBeanException;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.impl.shared.AgentState;
-import org.apache.sling.distribution.journal.impl.shared.DefaultDistributionLog;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequest;
@@ -73,9 +68,13 @@ import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.service.metatype.annotations.Designate;
-
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.shared.AgentState;
+import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
+import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.JournalAvailable;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index 591a0cf..e3dace3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -29,8 +29,8 @@ import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.JsonMessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
index aee0c52..81a4fba 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
@@ -25,8 +25,6 @@ import javax.jcr.Node;
 import javax.jcr.Property;
 import javax.jcr.nodetype.NodeType;
 
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.jackrabbit.api.ReferenceBinary;
 import org.apache.jackrabbit.commons.JcrUtils;
 import org.apache.sling.api.resource.LoginException;
@@ -47,6 +45,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
 
 import static java.util.Collections.singletonMap;
 import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
index aaa7b1c..db27714 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
@@ -26,6 +26,8 @@ import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
+import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
index 9bfd4fd..197d0b1 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
@@ -40,6 +40,8 @@ import javax.annotation.Nullable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
+import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueItemState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 189513c..b794116 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -41,8 +41,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.osgi.service.event.Event;
@@ -53,6 +51,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessageSender;
@@ -176,7 +176,9 @@ public class PubQueueCache {
                 sender.send(topic, pkgMsg);
                 sleep(seedingDelayMs);
             } catch (MessagingException e) {
-                LOG.warn(e.getMessage(), e);
+                if (!(e.getCause() instanceof InterruptedException)) {
+                    LOG.warn(e.getMessage(), e);
+                }
                 sleep(seedingDelayMs * 10);
             }
         }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index c5eeb8c..5709493 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -22,9 +22,9 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.JournalAvailable;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index cc9e839..a72a358 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -30,11 +30,11 @@ import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
deleted file mode 100644
index 5d34e6d..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-
-import javax.annotation.CheckForNull;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
-import org.apache.sling.distribution.queue.DistributionQueueEntry;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.DistributionQueueItemState;
-import org.apache.sling.distribution.queue.DistributionQueueState;
-import org.apache.sling.distribution.queue.DistributionQueueStatus;
-import org.apache.sling.distribution.queue.DistributionQueueType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
-import static org.apache.sling.distribution.queue.DistributionQueueState.BLOCKED;
-import static org.apache.sling.distribution.queue.DistributionQueueState.IDLE;
-import static org.apache.sling.distribution.queue.DistributionQueueState.RUNNING;
-import static org.apache.sling.distribution.queue.DistributionQueueType.ORDERED;
-
-@ParametersAreNonnullByDefault
-public class SubQueue implements DistributionQueue {
-
-    private static final String UNSUPPORTED_CLEAR_OPERATION = "Unsupported clear operation";
-
-    @SuppressWarnings("unused")
-    private static final Logger LOG = LoggerFactory.getLogger(SubQueue.class);
-
-    private final DistributionQueueItem headItem;
-
-    private final PackageRetries packageRetries;
-
-    private final String queueName;
-
-	private final  QueueEntryFactory entryFactory;
-
-    public SubQueue(String queueName,
-                    @Nullable
-                    DistributionQueueItem headItem,
-                    PackageRetries packageRetries) {
-        this.headItem = headItem;
-        this.queueName = Objects.requireNonNull(queueName);
-        this.packageRetries = Objects.requireNonNull(packageRetries);
-        this.entryFactory = new QueueEntryFactory(queueName, this::attempts);
-    }
-
-    @Nonnull
-    @Override
-    public String getName() {
-        return queueName;
-    }
-
-    @Override
-    public DistributionQueueEntry add(DistributionQueueItem queueItem) {
-        throw new UnsupportedOperationException("Unsupported add operation");
-    }
-
-    @Override
-    @CheckForNull
-    public DistributionQueueEntry getHead() {
-        return entryFactory.create(headItem);
-    }
-
-    @Nonnull
-    @Override
-    public Iterable<DistributionQueueEntry> getEntries(int skip, int limit) {
-        final List<DistributionQueueEntry> entries;
-        if (skip == 0 && (limit == -1 || limit > 0) && headItem != null) {
-            entries = Collections.singletonList(entryFactory.create(headItem));
-        } else {
-            entries = Collections.emptyList();
-        }
-        return Collections.unmodifiableList(entries);
-    }
-
-    @Override
-    public DistributionQueueEntry getEntry(String entryId) {
-        return (entryId.equals(EntryUtil.entryId(headItem)))
-                ? entryFactory.create(headItem)
-                : null;
-    }
-
-    @Override
-    public DistributionQueueEntry remove(String entryId) {
-        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
-    }
-
-    @Nonnull
-    @Override
-    public Iterable<DistributionQueueEntry> remove(Set<String> entryIds) {
-        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
-    }
-
-    @Nonnull
-    @Override
-    public Iterable<DistributionQueueEntry> clear(int limit) {
-        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
-    }
-
-    @Nonnull
-    @Override
-    public DistributionQueueStatus getStatus() {
-        final DistributionQueueState queueState;
-        final int itemsCount;
-        DistributionQueueEntry headEntry = getHead();
-        if (headEntry != null) {
-            itemsCount = 1;
-            DistributionQueueItemState itemState = headEntry.getStatus().getItemState();
-            if (itemState == QUEUED) {
-                queueState = RUNNING;
-            } else {
-                queueState = BLOCKED;
-            }
-        } else {
-            itemsCount = 0;
-            queueState = IDLE;
-        }
-
-        return new DistributionQueueStatus(itemsCount, queueState);
-    }
-
-    @Override
-    @Nonnull
-    public DistributionQueueType getType() {
-        return ORDERED;
-    }
-
-    @Override
-    public boolean hasCapability(String capability) {
-        return false;
-    }
-
-    private int attempts(DistributionQueueItem queueItem) {
-        String entryId = EntryUtil.entryId(queueItem);
-        return packageRetries.get(entryId);
-    }
-
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index 7ecd03b..beeb62d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -32,6 +32,7 @@ import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
 import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
 import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index f9d047b..3a1df97 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -27,8 +27,8 @@ import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 74fc5e2..a86cbc5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -24,21 +24,14 @@ import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
 
 import java.io.Closeable;
 import java.util.Collections;
-import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 
 import javax.annotation.Nonnull;
@@ -46,37 +39,25 @@ import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.jackrabbit.vault.packaging.Packaging;
-import org.apache.sling.api.resource.LoginException;
-import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.distribution.DistributionRequest;
-import org.apache.sling.distribution.DistributionRequestState;
-import org.apache.sling.distribution.DistributionRequestType;
-import org.apache.sling.distribution.DistributionResponse;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
-import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
-import org.apache.sling.distribution.journal.impl.shared.AgentState;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.log.spi.DistributionLog;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeperFactory;
+import org.apache.sling.distribution.journal.service.subscriber.PackageHandling;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.settings.SlingSettingsService;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
@@ -84,7 +65,6 @@ import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.event.EventAdmin;
 import org.osgi.service.metatype.annotations.Designate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,28 +75,18 @@ import com.google.protobuf.GeneratedMessage;
  * A Subscriber SCD agent which consumes messages produced by a
  * {@code DistributionPublisher} agent.
  */
-@Component(service = {}, immediate = true, property = {
-        "announceDelay=10000" }, configurationPid = "org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
+@Component(service = DistributionSubscriber.class, immediate = true, 
+    property = { "announceDelay=10000" }, 
+    configurationPid = "org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
 @Designate(ocd = SubscriberConfiguration.class, factory = true)
 @ParametersAreNonnullByDefault
-public class DistributionSubscriber implements DistributionAgent {
+public class DistributionSubscriber {
     private static final int PRECONDITION_TIMEOUT = 60;
     static int RETRY_DELAY = 5000;
     static int QUEUE_FETCH_DELAY = 1000;
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
 
-    private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = Collections.emptySet();
-
-    @Reference(name = "packageBuilder")
-    private DistributionPackageBuilder packageBuilder;
-
-    @Reference
-    private SlingSettingsService slingSettings;
-
-    @Reference
-    private ResourceResolverFactory resolverFactory;
-
     @Reference
     private MessagingProvider messagingProvider;
 
@@ -124,33 +94,34 @@ public class DistributionSubscriber implements DistributionAgent {
     private Topics topics;
 
     @Reference
-    private EventAdmin eventAdmin;
-
-    @Reference
     private JournalAvailable journalAvailable;
 
     @Reference(name = "precondition")
     private Precondition precondition;
 
-    @Reference
-    private DistributionMetricsService distributionMetricsService;
+    @Reference(name = "packageBuilder")
+    private DistributionPackageBuilder packageBuilder;
 
     @Reference
-    private Packaging packaging;
+    private SubscriberMetrics subscriberMetrics;
     
-    SubscriberIdle subscriberIdle;
+    @Reference
+    private SlingSettingsService slingSettings;
     
+    @Reference
+    BookKeeperFactory bookKeeperFactory;
+
     private ServiceRegistration<DistributionAgent> componentReg;
 
     private Closeable packagePoller;
 
     private CommandPoller commandPoller;
 
-    private BookKeeper bookKeeper;
-
+    BookKeeper bookKeeper;
+    
     // Use a bounded internal buffer to allow reading further packages while working
     // on one at a time
-    private final BlockingQueue<DistributionQueueItem> queueItemsBuffer = new LinkedBlockingQueue<>(8);
+    private final BlockingQueue<FullMessage<PackageMessage>> queueItemsBuffer = new LinkedBlockingQueue<>(8);
 
     private Set<String> queueNames = Collections.emptySet();
 
@@ -158,39 +129,30 @@ public class DistributionSubscriber implements DistributionAgent {
 
     private String subAgentName;
 
-    private String pkgType;
-
     private volatile boolean running = true;
 
     private volatile Thread queueProcessor;
 
     @Activate
     public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
-        String subSlingId = requireNonNull(slingSettings.getSlingId());
         subAgentName = requireNonNull(config.name());
         requireNonNull(config);
         requireNonNull(context);
+
         requireNonNull(packageBuilder);
+        requireNonNull(subscriberMetrics);
         requireNonNull(slingSettings);
-        requireNonNull(resolverFactory);
         requireNonNull(messagingProvider);
         requireNonNull(topics);
-        requireNonNull(eventAdmin);
         requireNonNull(precondition);
+        requireNonNull(bookKeeperFactory);
 
-        // Unofficial config (currently just for test)
-        Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
-        subscriberIdle = new SubscriberIdle(context, idleMillies);
-        
         queueNames = getNotEmpty(config.agentNames());
+        String subSlingId = requireNonNull(slingSettings.getSlingId());
         int maxRetries = config.maxRetries();
         boolean editable = config.editable();
-
-        ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.packageHandling());
-        PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
-        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin,
-                sender(topics.getStatusTopic()), subAgentName, subSlingId, editable, maxRetries);
-        
+        PackageHandling packageHandling = config.packageHandling();
+        bookKeeper = bookKeeperFactory.create(packageBuilder, subAgentName, subSlingId, maxRetries, editable, packageHandling, sender(topics.getStatusTopic()));
         long startOffset = bookKeeper.loadOffset() + 1;
         String assign = messagingProvider.assignTo(startOffset);
 
@@ -206,14 +168,11 @@ public class DistributionSubscriber implements DistributionAgent {
         announcer = new Announcer(subSlingId, subAgentName, queueNames, sender(topics.getDiscoveryTopic()), bookKeeper,
                 maxRetries, config.editable(), announceDelay);
 
-        pkgType = requireNonNull(packageBuilder.getType());
         boolean errorQueueEnabled = (maxRetries >= 0);
         String msg = format(
-                "Started Subscriber agent %s at offset %s, subscribed to agent names %s with package builder %s editable %s maxRetries %s errorQueueEnabled %s",
-                subAgentName, startOffset, queueNames, pkgType, config.editable(), maxRetries, errorQueueEnabled);
+                "Started Subscriber agent %s at offset %s, subscribed to agent names %s editable %s maxRetries %s errorQueueEnabled %s",
+                subAgentName, startOffset, queueNames, config.editable(), maxRetries, errorQueueEnabled);
         LOG.info(msg);
-        Dictionary<String, Object> props = createServiceProps(config);
-        componentReg = context.registerService(DistributionAgent.class, this, props);
     }
 
     private <T extends GeneratedMessage> Consumer<T> sender(String topic) {
@@ -225,24 +184,10 @@ public class DistributionSubscriber implements DistributionAgent {
         return asList(agentNames).stream().filter(StringUtils::isNotBlank).collect(toSet());
     }
 
-    private Dictionary<String, Object> createServiceProps(SubscriberConfiguration config) {
-        Dictionary<String, Object> props = new Hashtable<>();
-        props.put("name", config.name());
-        props.put("title", config.name());
-        props.put("details", config.name());
-        props.put("agentNames", config.agentNames());
-        props.put("editable", config.editable());
-        props.put("maxRetries", config.maxRetries());
-        props.put("packageBuilder.target", config.packageBuilder_target());
-        props.put("precondition.target", config.precondition_target());
-        props.put("webconsole.configurationFactory.nameHint", config.webconsole_configurationFactory_nameHint());
-        return props;
-    }
-
     @Deactivate
     public void deactivate() {
         componentReg.unregister();
-        IOUtils.closeQuietly(subscriberIdle, announcer, bookKeeper, 
+        IOUtils.closeQuietly(announcer, bookKeeper, 
                 packagePoller, commandPoller);
         running = false;
         Thread interrupter = this.queueProcessor;
@@ -250,71 +195,22 @@ public class DistributionSubscriber implements DistributionAgent {
             interrupter.interrupt();
         }
         String msg = String.format(
-                "Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s",
-                subAgentName, queueNames, pkgType);
+                "Stopped Subscriber agent %s, subscribed to Publisher agent names %s",
+                subAgentName, queueNames);
         LOG.info(msg);
     }
-
-    @Nonnull
-    @Override
-    public Iterable<String> getQueueNames() {
-        return queueNames;
-    }
-
-    @Override
-    public DistributionQueue getQueue(@Nonnull String queueName) {
-        DistributionQueueItem head = queueItemsBuffer.stream()
-                .filter(item -> isIn(queueName, item))
-                .findFirst()
-                .orElse(null);
-        return new SubQueue(queueName, head, bookKeeper.getPackageRetries());
-    }
-
-    private boolean isIn(String queueName, DistributionQueueItem queueItem) {
-        PackageMessage packageMsg = queueItem.get(QueueItemFactory.PACKAGE_MSG, PackageMessage.class);
-        return queueName.equals(packageMsg.getPubAgentName());
-    }
-
-    @Nonnull
-    @Override
-    public DistributionLog getLog() {
-        return this::emptyDistributionLog;
-    }
-
-    private List<String> emptyDistributionLog() {
-        return Collections.emptyList();
-    }
-
+    
     @Nonnull
-    @Override
     public DistributionAgentState getState() {
-        return AgentState.getState(this);
-    }
-
-    @Nonnull
-    @Override
-    public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest request) {
-        return executeUnsupported(request);
+        return bookKeeper.getState();
     }
-
-    @Nonnull
-    private DistributionResponse executeUnsupported(DistributionRequest request) {
-        String msg = format("Request type %s is not supported by this agent, expected one of %s",
-                request.getRequestType(), SUPPORTED_REQ_TYPES);
-        LOG.info(msg);
-        return new SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
-    }
-
+    
     private void handlePackageMessage(MessageInfo info, PackageMessage message) {
         if (shouldEnqueue(info, message)) {
-            DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
+            FullMessage<PackageMessage> queueItem = new FullMessage<>(info, message);
             enqueue(queueItem);
         } else {
-            try {
-                bookKeeper.skipPackage(info.getOffset());
-            } catch (PersistenceException | LoginException e) {
-                LOG.info("Error marking message at offset {} as skipped", info.getOffset(), e);
-            }
+            bookKeeper.skipPackage(info.getOffset());
         }
     }
 
@@ -323,23 +219,19 @@ public class DistributionSubscriber implements DistributionAgent {
             LOG.info("Skipping package for Publisher agent {} at offset {} (not subscribed)", message.getPubAgentName(), info.getOffset());
             return false;
         }
-        if (!pkgType.equals(message.getPkgType())) {
-            LOG.warn("Skipping package with type {} at offset {}", message.getPkgType(), info.getOffset());
-            return false;
-        }
         return true;
     }
-
+    
     /**
      * We block here if the buffer is full in order to limit the number of binary
      * packages fetched in memory. Note that each queued item contains the binary
      * package to be imported.
      */
-    private void enqueue(DistributionQueueItem queueItem) {
+    private void enqueue(FullMessage<PackageMessage> queueItem) {
         try {
             while (running) {
                 if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) {
-                    distributionMetricsService.getItemsBufferSize().increment();
+                    subscriberMetrics.getItemsBufferSize().increment();
                     return;
                 }
             }
@@ -349,7 +241,7 @@ public class DistributionSubscriber implements DistributionAgent {
             throw new RuntimeException();
         }
     }
-
+    
     private void processQueue() {
         LOG.info("Started Queue processor");
         while (!Thread.interrupted()) {
@@ -361,37 +253,27 @@ public class DistributionSubscriber implements DistributionAgent {
         }
         LOG.info("Stopped Queue processor");
     }
-
+    
     private void fetchAndProcessQueueItem() throws InterruptedException {
         try {
-            
-            bookKeeper.sendStoredStatus();
-            DistributionQueueItem item = blockingPeekQueueItem();
-
-            try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
-                processQueueItem(item);
-            } finally {
-                subscriberIdle.idle();
+            FullMessage<PackageMessage> item = blockingPeekQueueItem();
+            try (Timer.Context context = subscriberMetrics.getProcessQueueItemDuration().time()) {
+                bookKeeper.processPackage(item.getInfo(), item.getMessage(), this::shouldRemove);
             }
-
-        } catch (TimeoutException e) {
-            /**
-             * Precondition timed out. We only log this on info level as it is no error
-             */
+            queueItemsBuffer.remove();
+        } catch (PreConditionTimeoutException e) {
             LOG.info(e.getMessage());
-            Thread.sleep(RETRY_DELAY);
-        } catch (InterruptedException e) {
-            throw e;
+            retryDelay();
         } catch (Exception e) {
             // Catch all to prevent processing from stopping
-            LOG.error("Error processing queue item", e);
-            Thread.sleep(RETRY_DELAY);
+            LOG.warn("Error processing queue item", e);
+            retryDelay();
         }
     }
-
-    private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
+    
+    private FullMessage<PackageMessage> blockingPeekQueueItem() throws InterruptedException {
         while (true) {
-            DistributionQueueItem queueItem = queueItemsBuffer.peek();
+            FullMessage<PackageMessage> queueItem = queueItemsBuffer.peek();
             if (queueItem != null) {
                 return queueItem;
             } else {
@@ -400,23 +282,38 @@ public class DistributionSubscriber implements DistributionAgent {
         }
     }
 
-    private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, InterruptedException, TimeoutException {
-        long offset = queueItem.get(RECORD_OFFSET, Long.class);
-        PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
-        boolean skip = shouldSkip(offset);
-        subscriberIdle.busy();
-        if (skip) {
-            bookKeeper.removePackage(pkgMsg, offset);
-        } else {
-            long createdTime = queueItem.get(RECORD_TIMESTAMP, Long.class);
-            bookKeeper.importPackage(pkgMsg, offset, createdTime);
+    private void retryDelay() {
+        try {
+            Thread.sleep(RETRY_DELAY);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+    
+    private boolean shouldRemove(long offset, PackageMessage message) {
+        if (commandPoller.isCleared(offset)) {
+            return true;
         }
-        queueItemsBuffer.remove();
-        distributionMetricsService.getItemsBufferSize().decrement();
+        return waitPrecondition(offset);
     }
 
-    private boolean shouldSkip(long offset) throws InterruptedException, TimeoutException {
-        return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
+    /** Polls the precondition until it decides or the timeout elapses; {@code true} means skip. */
+    private boolean waitPrecondition(long offset) {
+        long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT * 1000L;
+        while (System.currentTimeMillis() < endTime) {
+            Decision decision = precondition.canProcess(subAgentName, offset);
+            if (decision != Decision.WAIT) {
+                return decision == Decision.SKIP;
+            }
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag for the caller and stop waiting.
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+        throw new PreConditionTimeoutException("Timeout waiting for package offset " + offset + " on status topic.");
     }
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
similarity index 79%
copy from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
copy to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
index 55dc55e..684ab9d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
@@ -18,6 +18,11 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
-public enum PackageHandling {
-    Off, Extract, Install
+public class PreConditionTimeoutException extends RuntimeException {
+    public PreConditionTimeoutException(String msg) {
+        super(msg);
+    }
+
+    private static final long serialVersionUID = 6286011641627241560L;
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
index 937c30a..625a62b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
+import org.apache.sling.distribution.journal.service.subscriber.PackageHandling;
 import org.osgi.service.metatype.annotations.AttributeDefinition;
 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
similarity index 82%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
index 53a7245..001fafc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import static java.lang.String.format;
 import static java.lang.System.currentTimeMillis;
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -41,14 +42,13 @@ import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics.GaugeService;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
@@ -85,9 +85,11 @@ public class BookKeeper implements Closeable {
 
     private final Logger log = LoggerFactory.getLogger(this.getClass());
     private final ResourceResolverFactory resolverFactory;
-    private final DistributionMetricsService distributionMetricsService;
+    private final SubscriberMetrics subscriberMetrics;
     private final PackageHandler packageHandler;
+    public final SubscriberIdle subscriberIdle;
     private final EventAdmin eventAdmin;
+    
     private final Consumer<PackageStatusMessage> sender;
     private final boolean editable;
     private final int maxRetries;
@@ -101,9 +103,10 @@ public class BookKeeper implements Closeable {
     private GaugeService<Integer> retriesGauge;
     private int skippedCounter = 0;
 
-    public BookKeeper(ResourceResolverFactory resolverFactory, 
-            DistributionMetricsService distributionMetricsService,
+    BookKeeper(ResourceResolverFactory resolverFactory, 
+            SubscriberMetrics subscriberMetrics,
             PackageHandler packageHandler,
+            SubscriberIdle subscriberIdle,
             EventAdmin eventAdmin,
             Consumer<PackageStatusMessage> sender,
             String subAgentName,
@@ -111,11 +114,12 @@ public class BookKeeper implements Closeable {
             boolean editable, 
             int maxRetries) { 
         this.packageHandler = packageHandler;
+        this.subscriberIdle = subscriberIdle;
         this.eventAdmin = eventAdmin;
-        String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
-        this.retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
+        String nameRetries = SubscriberMetrics.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
+        this.subscriberMetrics = subscriberMetrics;
+        this.retriesGauge = subscriberMetrics.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
         this.resolverFactory = resolverFactory;
-        this.distributionMetricsService = distributionMetricsService;
         this.sender = sender;
         this.subAgentName = subAgentName;
         this.subSlingId = subSlingId;
@@ -128,6 +132,31 @@ public class BookKeeper implements Closeable {
         this.processedOffsets = new LocalStore(resolverFactory, "packages", subAgentName);
     }
     
+    /** Sends any stored status, then removes or imports the package as decided by {@code shouldRemove}. */
+    public void processPackage(MessageInfo info, PackageMessage pkgMsg, BiPredicate<Long, PackageMessage> shouldRemove) throws Exception {
+        try {
+            sendStoredStatus();
+            final long offset = info.getOffset();
+            final boolean remove = shouldRemove.test(offset, pkgMsg);
+            subscriberIdle.busy();
+            if (!remove) {
+                importPackage(pkgMsg, offset, info.getCreateTime());
+            } else {
+                removePackage(pkgMsg, offset);
+            }
+        } finally {
+            subscriberIdle.idle();
+            sendStoredStatus();
+        }
+    }
+    
+    public DistributionAgentState getState() {
+        if (subscriberIdle.isIdle()) {
+            return DistributionAgentState.IDLE;
+        }
+        return packageRetries.getSum() > 0 ? DistributionAgentState.BLOCKED : DistributionAgentState.RUNNING;
+    }
+
     /**
      * We aim at processing the packages exactly once. Processing the packages
      * exactly once is possible with the following conditions
@@ -144,11 +173,11 @@ public class BookKeeper implements Closeable {
      * failing. For those packages importers, we aim at processing packages at least
      * once, thanks to the order in which the content updates are applied.
      */
-    public void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
+    private void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
         log.info("Importing distribution package {} of type {} at offset {}", 
                 pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
         addPackageMDC(pkgMsg);
-        try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
+        try (Timer.Context context = subscriberMetrics.getImportedPackageDuration().time();
                 ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
             packageHandler.apply(importerResolver, pkgMsg);
             if (editable) {
@@ -156,10 +185,10 @@ public class BookKeeper implements Closeable {
             }
             storeOffset(importerResolver, offset);
             importerResolver.commit();
-            distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
-            distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
+            subscriberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength());
+            subscriberMetrics.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
             packageRetries.clear(pkgMsg.getPubAgentName());
-            Event event = DistributionEvent.eventImporterImported(pkgMsg, subAgentName);
+            Event event = ImportedEventFactory.create(pkgMsg, subAgentName);
             eventAdmin.postEvent(event);
         } catch (LoginException | IOException | RuntimeException e) {
             failure(pkgMsg, offset, e);
@@ -195,7 +224,7 @@ public class BookKeeper implements Closeable {
      * @throws DistributionException if the package should be retried
      */
     private void failure(PackageMessage pkgMsg, long offset, Exception e) throws DistributionException {
-        distributionMetricsService.getFailedPackageImports().mark();
+        subscriberMetrics.getFailedPackageImports().mark();
 
         String pubAgentName = pkgMsg.getPubAgentName();
         int retries = packageRetries.get(pubAgentName);
@@ -210,10 +239,10 @@ public class BookKeeper implements Closeable {
         }
     }
 
-    public void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
+    private void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
         log.info("Removing distribution package {} of type {} at offset {}", 
                 pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
-        Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
+        Timer.Context context = subscriberMetrics.getRemovedPackageDuration().time();
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             if (editable) {
                 storeStatus(resolver, new PackageStatus(REMOVED, offset, pkgMsg.getPubAgentName()));
@@ -225,17 +254,19 @@ public class BookKeeper implements Closeable {
         context.stop();
     }
     
-    public void skipPackage(long offset) throws LoginException, PersistenceException {
+    public void skipPackage(long offset) {
         log.info("Skipping package at offset {}", offset);
         if (shouldCommitSkipped()) {
             try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
                 storeOffset(resolver, offset);
                 resolver.commit();
+            } catch (Exception e) { // best effort: a failed offset store is logged, not propagated
+                log.warn("Error storing offset {} of skipped package", offset, e);
             }
         }
     }
 
-    public synchronized boolean shouldCommitSkipped() {
+    private synchronized boolean shouldCommitSkipped() {
         skippedCounter ++;
         if (skippedCounter > COMMIT_AFTER_NUM_SKIPPED) {
             skippedCounter = 1;
@@ -249,8 +280,8 @@ public class BookKeeper implements Closeable {
      * Send status stored in a previous run if exists
      * @throws InterruptedException
      */
-    public void sendStoredStatus() throws InterruptedException {
-        try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
+    private void sendStoredStatus() throws InterruptedException {
+        try (Timer.Context context = subscriberMetrics.getSendStoredStatusDuration().time()) {
             PackageStatus status = new PackageStatus(statusStore.load());
             boolean sent = status.sent;
             int retry = 0;
@@ -286,7 +317,7 @@ public class BookKeeper implements Closeable {
         log.info("Sent status message {}",  pkgStatMsg);
     }
 
-    public void markStatusSent() {
+    private void markStatusSent() {
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             statusStore.store(resolver, "sent", true);
             resolver.commit();
@@ -303,10 +334,6 @@ public class BookKeeper implements Closeable {
         return packageRetries.get(pubAgentName);
     }
 
-    public PackageRetries getPackageRetries() {
-        return packageRetries;
-    }
-
     @Override
     public void close() throws IOException {
         IOUtils.closeQuietly(retriesGauge);
@@ -315,7 +342,7 @@ public class BookKeeper implements Closeable {
     private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
         log.info("Removing failed distribution package {} of type {} at offset {}", 
                 pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
-        Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
+        Timer.Context context = subscriberMetrics.getRemovedFailedPackageDuration().time();
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
             storeOffset(resolver, offset);
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperFactory.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperFactory.java
new file mode 100644
index 0000000..b0cdbcb
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.service.subscriber;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.jackrabbit.vault.packaging.Packaging;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * Component that holds the shared services and builds a BookKeeper per subscriber via {@link #create}.
+ */
+@Component(service = BookKeeperFactory.class, immediate = true)
+public class BookKeeperFactory {
+    @Reference
+    private ResourceResolverFactory resolverFactory;
+    
+    @Reference
+    private EventAdmin eventAdmin;
+    
+    @Reference
+    private SubscriberMetrics subscriberMetrics;
+    
+    @Reference
+    private Packaging packaging;
+
+    private BundleContext context;
+
+    private Integer idleMillies;
+    
+    public BookKeeperFactory() {
+    }
+    // Direct wiring without field injection; context and idleMillies stay unset until activate() is called.
+    public BookKeeperFactory(ResourceResolverFactory resolverFactory,
+            EventAdmin eventAdmin, SubscriberMetrics subscriberMetrics, Packaging packaging) {
+        this.resolverFactory = resolverFactory;
+        this.eventAdmin = eventAdmin;
+        this.subscriberMetrics = subscriberMetrics;
+        this.packaging = packaging;
+    }
+    // NOTE(review): no @Activate here; presumably found via the default "activate" method name - confirm.
+    public void activate(BundleContext context, Map<String, Object> properties) {
+        this.context = context;
+        this.idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
+        requireNonNull(resolverFactory);
+        requireNonNull(eventAdmin);
+        requireNonNull(subscriberMetrics);
+    }
+    // Wires a ContentPackageExtractor, PackageHandler and SubscriberIdle into a new BookKeeper.
+    public BookKeeper create(
+            DistributionPackageBuilder packageBuilder,
+            String subAgentName, 
+            String subSlingId, 
+            int maxRetries, 
+            boolean editable, 
+            PackageHandling packageHandling, 
+            Consumer<PackageStatusMessage> sender
+            ) {
+        ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, packageHandling);
+        PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
+        SubscriberIdle subscriberIdle = new SubscriberIdle(context, this.idleMillies);
+        return new BookKeeper(resolverFactory, subscriberMetrics, packageHandler, subscriberIdle, eventAdmin,
+                sender, subAgentName, subSlingId, editable, maxRetries);
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
index 998f929..c626550 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import static java.util.Objects.requireNonNull;
 
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
  * Each distribution package is inspected for possible content packages in /etc/packages.
  * Such content packages are installed via the Packaging service.
  */
-public class ContentPackageExtractor {
+class ContentPackageExtractor {
     private static final String PACKAGE_BASE_PATH = "/etc/packages/";
 
     private final Logger log = LoggerFactory.getLogger(this.getClass());
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java
new file mode 100644
index 0000000..37a4734
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.service.subscriber;
+
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_COMPONENT_KIND;
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_COMPONENT_NAME;
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_PATHS;
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_TYPE;
+import static org.apache.sling.distribution.event.DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.sling.distribution.journal.messages.Messages;
+import org.osgi.service.event.Event;
+
+@ParametersAreNonnullByDefault
+class ImportedEventFactory {
+    // Event property key under which the package id is published.
+    public static final String PACKAGE_ID = "distribution.package.id";
+    private static final String KIND_IMPORTER = "importer";
+    // Static factory only; the private constructor prevents instantiation.
+    private ImportedEventFactory() {
+    }
+    /** Builds the IMPORTER_PACKAGE_IMPORTED event carrying kind, agent name, request type, paths and package id. */
+    public static Event create(Messages.PackageMessage pkgMsg, String agentName) {
+        String[] pathsList = pkgMsg.getPathsList().toArray(new String[0]);
+        Map<String, Object> props = new HashMap<>();
+        props.put(DISTRIBUTION_COMPONENT_KIND, KIND_IMPORTER);
+        props.put(DISTRIBUTION_COMPONENT_NAME, agentName);
+        props.put(DISTRIBUTION_TYPE, pkgMsg.getReqType().name());
+        props.put(DISTRIBUTION_PATHS, pathsList);
+        props.put(PACKAGE_ID, pkgMsg.getPkgId());
+        return new Event(IMPORTER_PACKAGE_IMPORTED, props);
+    }
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
index c619d79..0018641 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.ModifiableValueMap;
@@ -44,7 +44,7 @@ import static java.util.Objects.requireNonNull;
 import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
 
 @ParametersAreNonnullByDefault
-public class LocalStore {
+class LocalStore {
 
     private static final String ROOT_PATH = "/var/sling/distribution/journal/stores";
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/NoopMetric.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/NoopMetric.java
new file mode 100644
index 0000000..375a792
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/NoopMetric.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.service.subscriber;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sling.commons.metrics.Counter;
+import org.apache.sling.commons.metrics.Histogram;
+import org.apache.sling.commons.metrics.Meter;
+import org.apache.sling.commons.metrics.Timer;
+
+enum NoopMetric implements Counter, Histogram, Timer, Meter{
+    INSTANCE;
+    // One shared do-nothing object usable as Counter, Histogram, Timer and Meter.
+    @Override
+    public long getCount() {
+        return 0;
+    }
+
+    @Override
+    public void increment() {
+        // no-op
+    }
+
+    @Override
+    public void decrement() {
+        // no-op
+    }
+
+    @Override
+    public void increment(long n) {
+        // no-op
+    }
+
+    @Override
+    public void decrement(long n) {
+        // no-op
+    }
+
+    @Override
+    public void mark() {
+        // no-op
+    }
+
+    @Override
+    public void mark(long n) {
+        // no-op
+    }
+
+    @Override
+    public void update(long duration, TimeUnit unit) {
+        // no-op
+    }
+
+    @Override
+    public Context time() {
+        return NoopContext.INSTANCE;
+    }
+
+    @Override
+    public void update(long value) {
+        // no-op
+    }
+
+    @Override
+    public <AdapterType> AdapterType adaptTo(Class<AdapterType> type) {
+        return null;
+    }
+    // Timer context handed out by time(); stop() reports 0 and close() does nothing.
+    private enum NoopContext implements Context {
+        INSTANCE;
+
+        @Override
+        public long stop() {
+            return 0;
+        }
+
+        @Override
+        public void close() {
+            // no-op
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
similarity index 64%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
index a7148c4..ef832f3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
@@ -16,24 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import static java.lang.String.format;
 
+import java.io.ByteArrayInputStream;
 import java.io.InputStream;
+import java.util.Objects;
+
+import javax.annotation.Nonnull;
+import javax.jcr.Binary;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.ValueFactory;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PackageHandler {
+class PackageHandler {
     private static final Logger LOG = LoggerFactory.getLogger(PackageHandler.class);
     
     private DistributionPackageBuilder packageBuilder;
@@ -66,13 +74,38 @@ public class PackageHandler {
         LOG.info("Importing paths {}",pkgMsg.getPathsList());
         InputStream pkgStream = null;
         try {
-            pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
-            packageBuilder.installPackage(resolver, pkgStream);
-            extractor.handle(resolver, pkgMsg.getPathsList());
+            pkgStream = pkgStream(resolver, pkgMsg);
+            if (canHandlePackage(pkgMsg)) {
+                packageBuilder.installPackage(resolver, pkgStream);
+                extractor.handle(resolver, pkgMsg.getPathsList());
+            }
         } finally {
             IOUtils.closeQuietly(pkgStream);
         }
+    }
 
+    private boolean canHandlePackage(PackageMessage pkgMsg) {
+        return Objects.equals(packageBuilder.getType(), pkgMsg.getPkgType());
+    }
+    
+    @Nonnull
+    public static InputStream pkgStream(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException {
+        if (pkgMsg.hasPkgBinary()) {
+            return new ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
+        } else {
+            String pkgBinRef = pkgMsg.getPkgBinaryRef();
+            try {
+                Session session = resolver.adaptTo(Session.class);
+                if (session == null) {
+                    throw new DistributionException("Unable to get Oak session");
+                }
+                ValueFactory factory = session.getValueFactory();
+                Binary binary = factory.createValue(new SimpleReferenceBinary(pkgBinRef)).getBinary();
+                return binary.getStream();
+            } catch (RepositoryException e) {
+                throw new DistributionException(e.getMessage(), e);
+            }
+        }
     }
 
     private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
similarity index 92%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
index 55dc55e..a1130fc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 public enum PackageHandling {
     Off, Extract, Install
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
similarity index 94%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
index 23f9463..1466d85 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -27,7 +27,7 @@ import javax.annotation.ParametersAreNonnullByDefault;
  * Holds package retries by agent name
  */
 @ParametersAreNonnullByDefault
-public class PackageRetries {
+class PackageRetries {
 
     // (pubAgentName x retries)
     private final Map<String, Integer> pubAgentNameToRetries = new ConcurrentHashMap<>();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
similarity index 93%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
index a05b80c..e72378d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import java.io.Closeable;
 import java.util.Hashtable;
@@ -42,6 +42,7 @@ public class SubscriberIdle implements SystemReadyCheck, Closeable {
 
     private final int idleMillis;
     private final AtomicBoolean isReady = new AtomicBoolean();
+    private final AtomicBoolean idle = new AtomicBoolean();
     private final ScheduledExecutorService executor;
     private ScheduledFuture<?> schedule;
 
@@ -69,17 +70,23 @@ public class SubscriberIdle implements SystemReadyCheck, Closeable {
      * Called when processing of a message starts
      */
     public synchronized void busy() {
+        idle.set(false);
         cancelSchedule();
     }
     
     public boolean isReady() {
         return isReady.get();
     }
+    
+    public boolean isIdle() {
+        return idle.get();
+    }
 
     /**
      * Called when processing of a message has finished
      */
     public synchronized void idle() {
+        idle.set(true);
         if (!isReady.get()) {
             cancelSchedule();
             schedule = executor.schedule(this::ready, idleMillis, TimeUnit.MILLISECONDS);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
similarity index 61%
copy from src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
copy to src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
index f18c04c..f5bcae0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
@@ -16,14 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import static java.lang.String.format;
 
 import java.io.Closeable;
 import java.util.Dictionary;
 import java.util.Hashtable;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;
 
 import org.apache.sling.commons.metrics.Counter;
@@ -41,13 +40,11 @@ import org.osgi.service.component.annotations.Reference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Component(service = DistributionMetricsService.class)
-public class DistributionMetricsService {
+@Component(service = SubscriberMetrics.class)
+public class SubscriberMetrics {
 
     public static final String BASE_COMPONENT = "distribution.journal";
 
-    public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
-
     public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
     
     private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -55,18 +52,8 @@ public class DistributionMetricsService {
     @Reference
     private MetricsService metricsService;
 
-    private Counter cleanupPackageRemovedCount;
-
-    private Timer cleanupPackageDuration;
-
     private Histogram importedPackageSize;
 
-    private Histogram exportedPackageSize;
-
-    private Meter acceptedRequests;
-
-    private Meter droppedRequests;
-
     private Counter itemsBufferSize;
 
     private Timer removedPackageDuration;
@@ -83,26 +70,23 @@ public class DistributionMetricsService {
 
     private Timer packageDistributedDuration;
 
-    private Timer buildPackageDuration;
-
-    private Timer enqueuePackageDuration;
-
-    private Counter queueCacheFetchCount;
-
     private BundleContext context;
+    
+    public SubscriberMetrics() {
+        importedPackageSize = NoopMetric.INSTANCE;
+        itemsBufferSize = NoopMetric.INSTANCE;
+        importedPackageDuration = NoopMetric.INSTANCE;
+        removedPackageDuration = NoopMetric.INSTANCE;
+        removedFailedPackageDuration = NoopMetric.INSTANCE;
+        failedPackageImports = NoopMetric.INSTANCE;
+        sendStoredStatusDuration = NoopMetric.INSTANCE;
+        processQueueItemDuration = NoopMetric.INSTANCE;
+        packageDistributedDuration = NoopMetric.INSTANCE;
+    }
 
     @Activate
     public void activate(BundleContext context) {
         this.context = context;
-        cleanupPackageRemovedCount = getCounter(getMetricName(PUB_COMPONENT, "cleanup_package_removed_count"));
-        cleanupPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "cleanup_package_duration"));
-        exportedPackageSize = getHistogram(getMetricName(PUB_COMPONENT, "exported_package_size"));
-        acceptedRequests = getMeter(getMetricName(PUB_COMPONENT, "accepted_requests"));
-        droppedRequests = getMeter(getMetricName(PUB_COMPONENT, "dropped_requests"));
-        buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "build_package_duration"));
-        enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, "enqueue_package_duration"));
-        queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, "queue_cache_fetch_count"));
-
         importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, "imported_package_size"));
         itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, "items_buffer_size"));
         importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "imported_package_duration"));
@@ -115,56 +99,6 @@ public class DistributionMetricsService {
     }
 
     /**
-     * Runs provided code updating provided metric
-     * with its execution time.
-     * The method guarantees that the metric is updated
-     * even if the code throws an exception
-     * @param metric metric to update
-     * @param code code to clock
-     * @throws Exception actually it doesn't
-     */
-    public static void timed(Timer metric, Runnable code) throws Exception {
-        try (Timer.Context ignored = metric.time()) {
-            code.run();
-        }
-    }
-
-    /**
-     * Runs provided code updating provided metric
-     * with its execution time.
-     * The method guarantees that the metric is updated
-     * even if the code throws an exception
-     * @param metric metric to update
-     * @param code code to clock
-     * @return a value returned but <code>code.call()</code> invocation
-     * @throws Exception if underlying code throws
-     */
-    public static <T> T timed(Timer metric, Callable<T> code) throws Exception {
-        try (Timer.Context ignored = metric.time()) {
-            return code.call();
-        }
-    }
-
-    /**
-     * Counter of package removed during the Package Cleanup Task.
-     * The count is the sum of all packages removed since the service started.
-     *
-     * @return a Sling Metrics timer
-     */
-    public Counter getCleanupPackageRemovedCount() {
-        return cleanupPackageRemovedCount;
-    }
-
-    /**
-     * Timer of the Package Cleanup Task execution duration.
-     *
-     * @return a Sling Metrics timer
-     */
-    public Timer getCleanupPackageDuration() {
-        return cleanupPackageDuration;
-    }
-
-    /**
      * Histogram of the imported content package size in Byte.
      *
      * @return a Sling Metrics histogram
@@ -174,33 +108,6 @@ public class DistributionMetricsService {
     }
 
     /**
-     * Histogram of the exported content package size in Bytes.
-     *
-     * @return a Sling Metrics histogram
-     */
-    public Histogram getExportedPackageSize() {
-        return exportedPackageSize;
-    }
-
-    /**
-     * Meter of requests returning an {@code DistributionRequestState.ACCEPTED} state.
-     *
-     * @return a Sling Metrics meter
-     */
-    public Meter getAcceptedRequests() {
-        return acceptedRequests;
-    }
-
-    /**
-     * Meter of requests returning an {@code DistributionRequestState.DROPPED} state.
-     *
-     * @return a Sling Metrics meter
-     */
-    public Meter getDroppedRequests() {
-        return droppedRequests;
-    }
-
-    /**
      * Counter of the package buffer size on the subscriber.
      *
      * @return a Sling Metrics counter
@@ -273,33 +180,6 @@ public class DistributionMetricsService {
         return packageDistributedDuration;
     }
 
-    /**
-     * Timer capturing the duration in ms of building a content package
-     *
-     * @return a Sling Metric timer
-     */
-    public Timer getBuildPackageDuration() {
-        return buildPackageDuration;
-    }
-
-    /**
-     * Timer capturing the duration in ms of adding a package to the queue
-     *
-     * @return a Sling Metric timer
-     */
-    public Timer getEnqueuePackageDuration() {
-        return enqueuePackageDuration;
-    }
-
-    /**
-     * Counter of fetch operations to feed the queue cache.
-     *
-     * @return a Sling Metric counter
-     */
-    public Counter getQueueCacheFetchCount() {
-        return queueCacheFetchCount;
-    }
-    
     public <T> GaugeService<T> createGauge(String name, String description, Supplier<T> supplier) {
         return new GaugeService<>(name, description, supplier);
     }
@@ -327,7 +207,7 @@ public class DistributionMetricsService {
     public class GaugeService<T> implements Gauge<T>, Closeable {
         
         @SuppressWarnings("rawtypes")
-        private final ServiceRegistration<Gauge> reg;
+        private ServiceRegistration<Gauge> reg;
         private final Supplier<T> supplier;
 
         private GaugeService(String name, String description, Supplier<T> supplier) {
@@ -336,7 +216,9 @@ public class DistributionMetricsService {
             props.put(Constants.SERVICE_DESCRIPTION, description);
             props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
             props.put(Gauge.NAME, name);
-            reg = context.registerService(Gauge.class, this, props);
+            if (context != null) {
+                reg = context.registerService(Gauge.class, this, props);
+            }
         }
 
         @Override
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java b/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
index 858e8dc..51d5466 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java b/src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
index bb49514..80c3747 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
similarity index 68%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
index f18c04c..4dd1167 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.lang.String.format;
 
@@ -48,8 +48,6 @@ public class DistributionMetricsService {
 
     public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
 
-    public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
-    
     private final Logger log = LoggerFactory.getLogger(this.getClass());
 
     @Reference
@@ -59,30 +57,12 @@ public class DistributionMetricsService {
 
     private Timer cleanupPackageDuration;
 
-    private Histogram importedPackageSize;
-
     private Histogram exportedPackageSize;
 
     private Meter acceptedRequests;
 
     private Meter droppedRequests;
 
-    private Counter itemsBufferSize;
-
-    private Timer removedPackageDuration;
-
-    private Timer removedFailedPackageDuration;
-
-    private Timer importedPackageDuration;
-
-    private Meter failedPackageImports;
-
-    private Timer sendStoredStatusDuration;
-
-    private Timer processQueueItemDuration;
-
-    private Timer packageDistributedDuration;
-
     private Timer buildPackageDuration;
 
     private Timer enqueuePackageDuration;
@@ -102,16 +82,6 @@ public class DistributionMetricsService {
         buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "build_package_duration"));
         enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, "enqueue_package_duration"));
         queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, "queue_cache_fetch_count"));
-
-        importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, "imported_package_size"));
-        itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, "items_buffer_size"));
-        importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "imported_package_duration"));
-        removedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_package_duration"));
-        removedFailedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_failed_package_duration"));
-        failedPackageImports = getMeter(getMetricName(SUB_COMPONENT, "failed_package_imports"));
-        sendStoredStatusDuration = getTimer(getMetricName(SUB_COMPONENT, "send_stored_status_duration"));
-        processQueueItemDuration = getTimer(getMetricName(SUB_COMPONENT, "process_queue_item_duration"));
-        packageDistributedDuration = getTimer(getMetricName(SUB_COMPONENT, "request_distributed_duration"));
     }
 
     /**
@@ -165,15 +135,6 @@ public class DistributionMetricsService {
     }
 
     /**
-     * Histogram of the imported content package size in Byte.
-     *
-     * @return a Sling Metrics histogram
-     */
-    public Histogram getImportedPackageSize() {
-        return importedPackageSize;
-    }
-
-    /**
      * Histogram of the exported content package size in Bytes.
      *
      * @return a Sling Metrics histogram
@@ -201,79 +162,6 @@ public class DistributionMetricsService {
     }
 
     /**
-     * Counter of the package buffer size on the subscriber.
-     *
-     * @return a Sling Metrics counter
-     */
-    public Counter getItemsBufferSize() {
-        return itemsBufferSize;
-    }
-
-    /**
-     * Timer capturing the duration in ms of successful packages import operations.
-     *
-     * @return a Sling Metrics timer
-     */
-    public Timer getImportedPackageDuration() {
-        return importedPackageDuration;
-    }
-
-    /**
-     * Timer capturing the duration in ms of packages successfully removed from an editable subscriber.
-     *
-     * @return a Sling Metrics timer
-     */
-    public Timer getRemovedPackageDuration() {
-        return removedPackageDuration;
-    }
-
-    /**
-     * Timer capturing the duration in ms of packages successfully removed automatically from a subscriber supporting error queue.
-     *
-     * @return a Sling Metrics timer
-     */
-    public Timer getRemovedFailedPackageDuration() {
-        return removedFailedPackageDuration;
-    }
-
-    /**
-     * Meter of failures to import packages.
-     *
-     * @return a Sling Metrics meter
-     */
-    public Meter getFailedPackageImports() {
-        return failedPackageImports;
-    }
-
-    /**
-     * Timer capturing the duration in ms of sending a stored package status.
-     *
-     * @return a Sling Metric timer
-     */
-    public Timer getSendStoredStatusDuration() {
-        return sendStoredStatusDuration;
-    }
-
-    /**
-     * Timer capturing the duration in ms of processing a queue item.
-     *
-     * @return a Sling Metric timer
-     */
-    public Timer getProcessQueueItemDuration() {
-        return processQueueItemDuration;
-    }
-
-    /**
-     * Timer capturing the duration in ms of distributing a distribution package.
-     * The timer starts when the package is enqueued and stops when the package is successfully imported.
-     *
-     * @return a Sling Metric timer
-     */
-    public Timer getPackageDistributedDuration() {
-        return packageDistributedDuration;
-    }
-
-    /**
      * Timer capturing the duration in ms of building a content package
      *
      * @return a Sling Metric timer
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java b/src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
index 529fc52..ed2d461 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.shared;
 
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java b/src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
index 28f5aa8..48727a7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java b/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
index e4f36c9..a1e3924 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.lang.management.ManagementFactory.getPlatformMBeanServer;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
index 3582b36..6cb5170 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
@@ -16,14 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.util.Objects.requireNonNull;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
index 287f3fa..9478372 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import java.util.Objects;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java b/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
index 2bb30b5..18cf976 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java b/src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
index b9d1d2d..72eb177 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.util.Collections.singletonMap;
 import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java b/src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
similarity index 99%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
index 30f8836..8ac9ba5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import java.io.IOException;
 import java.io.PrintWriter;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java b/src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
index 47a9f25..58a0717 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.shared;
 
 import static org.apache.sling.distribution.queue.DistributionQueueItemState.ERROR;
 import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java b/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
index e49700e..eb0510c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java b/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
index 1f4d44a..c9ce5e2 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
index f279265..7808b12 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
@@ -22,12 +22,13 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 import org.apache.sling.distribution.journal.impl.precondition.DefaultPrecondition;
+import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
 import org.junit.Test;
 
 public class DefaultPreconditionTest {
     @Test
     public void testAlwaysTrue() {
-        boolean canProcess = new DefaultPrecondition().canProcess("any", 100, 10);
-        assertThat(canProcess, equalTo(true));
+        Decision decision = new DefaultPrecondition().canProcess("any", 100);
+        assertThat(decision, equalTo(Decision.ACCEPT));
     }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index 571d98a..1484a9e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -19,10 +19,10 @@
 package org.apache.sling.distribution.journal.impl.precondition;
 
 import org.apache.sling.distribution.journal.impl.precondition.PackageStatusWatcher;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessagingProvider;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index eab0479..a5e969b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -19,26 +19,21 @@
 package org.apache.sling.distribution.journal.impl.precondition;
 
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.precondition.StagingPrecondition;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.Before;
@@ -89,47 +84,28 @@ public class StagingPreconditionTest {
         statusHandler = statusCaptor.getValue().getHandler();
     }
     
-    @Test(expected = IllegalArgumentException.class)
-    public void testIllegalTimeout() throws InterruptedException, TimeoutException {
-        precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, -1);
-    }
-    
-    @Test(expected = TimeoutException.class)
+    @Test
     public void testNotYetProcessed() throws InterruptedException, TimeoutException {
         simulateMessage(OTHER_AGENT, 1002, PackageStatusMessage.Status.IMPORTED);
-        boolean res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT, 1);
-        assertThat(res, equalTo(true));
+        Decision res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT);
+        assertThat(res, equalTo(Decision.WAIT));
+
+        Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT);
+        assertThat(res2, equalTo(Decision.WAIT));
 
-        // We got no package for this agent. So this should time out
-        precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 1);
     }
     
     @Test
-    public void testDeactivateDuringCanProcess() {
-        AtomicReference<Throwable> exHolder = new AtomicReference<>();
-        Thread th = new Thread(() -> {
-            try {
-                precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 2);
-            } catch (Throwable t) {
-                exHolder.set(t);
-            }
-        });
-        th.start();
-        precondition.deactivate();
-        Throwable ex = Awaitility.await().until(() -> exHolder.get(), notNullValue());
-        assertThat(ex, instanceOf(InterruptedException.class));
-    }
-    
-    @Test(expected = TimeoutException.class)
     public void testCleanup() throws InterruptedException, TimeoutException {
         simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED);
-        assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
+        Decision res = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002);
+        assertThat(res, equalTo(Decision.ACCEPT));
         
         // Cleanup
         precondition.run();
         
-        // Should time out because after cleanup message is not present anymore
-        precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1);
+        Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002);
+        assertThat(res2, equalTo(Decision.WAIT));
     }
     
     @Test
@@ -138,9 +114,9 @@ public class StagingPreconditionTest {
         simulateMessage(GP_SUB1_AGENT_NAME, 1001, PackageStatusMessage.Status.REMOVED);
         simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED);
 
-        assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000, 1));
-        assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001, 1));
-        assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
+        assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000), equalTo(Decision.SKIP));
+        assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001), equalTo(Decision.SKIP));
+        assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002), equalTo(Decision.ACCEPT));
     }
 
     private void simulateMessage(String subAgentName, long pkgOffset, PackageStatusMessage.Status status) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
index 5e7b76c..2b82486 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
@@ -30,8 +30,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.UUID;
 
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,6 +38,8 @@ import org.mockito.Mockito;
 
 import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
 import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageInfo;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
index 2a41a2f..f52026c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
@@ -41,6 +41,7 @@ import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueItemState;
@@ -53,8 +54,6 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
-
 @SuppressWarnings("rawtypes")
 public class DistPublisherJMXTest {
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 7ab2bdf..434c561 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -38,9 +38,9 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import com.google.common.collect.ImmutableMap;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
index acc91b0..ab5d17e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
@@ -21,9 +21,9 @@ package org.apache.sling.distribution.journal.impl.publisher;
 import java.util.Collections;
 import java.util.HashSet;
 
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessagingProvider;
 
 import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
index a8fb467..9b41463 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
@@ -30,8 +30,6 @@ import java.util.List;
 import java.util.UUID;
 import java.util.function.Function;
 
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -57,6 +55,8 @@ import org.mockito.Spy;
 import org.osgi.framework.BundleContext;
 
 import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
index 6aeb1ba..e3b5582 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
@@ -31,13 +31,13 @@ import static org.hamcrest.Matchers.hasItemInArray;
 import static org.junit.Assert.assertThat;
 
 import org.apache.sling.distribution.DistributionRequestType;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.hamcrest.Matcher;
 import org.junit.Test;
 
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.MessageInfo;
 
 public class QueueItemFactoryTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
index 11a52f9..5aededc 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
@@ -20,6 +20,7 @@ package org.apache.sling.distribution.journal.impl.queue.impl;
 
 import java.util.HashMap;
 
+import org.apache.sling.distribution.journal.shared.EntryUtil;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.junit.Test;
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
index 18cc957..e7f9639 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
@@ -34,7 +34,7 @@ import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
 import org.junit.Test;
 
 public class OffsetQueueImplJMXTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 1113319..5fca8a3 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -57,10 +57,10 @@ import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 61c1389..166ad1a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -38,7 +38,6 @@ import javax.management.ObjectInstance;
 import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.MessageSender;
 import com.google.protobuf.GeneratedMessage;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
@@ -58,6 +57,7 @@ import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageInfo;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
index 20dccea..211b9fc 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
@@ -37,6 +37,7 @@ import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueType;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
index 778bbaa..3cef383 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
@@ -28,7 +28,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,6 +40,7 @@ import org.mockito.MockitoAnnotations;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
deleted file mode 100644
index d2784e9..0000000
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import com.google.common.collect.Lists;
-
-public class SubQueueTest {
-
-    @Test
-    public void testGetName() throws Exception {
-        String queueName = "someQueue";
-        SubQueue queue = new SubQueue(queueName, null, new PackageRetries());
-        Assert.assertEquals(queueName, queue.getName());
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testAdd() throws Exception {
-        SubQueue queue = new SubQueue("someQueue", null, new PackageRetries());
-        queue.add(buildQueueItem("package-1"));
-    }
-
-    @Test
-    public void testGetHead() throws Exception {
-        SubQueue emptyQueue = new SubQueue("emptyQueue", null, new PackageRetries());
-        Assert.assertNull(emptyQueue.getHead());
-        SubQueue oneQueue = new SubQueue("oneQueue", buildQueueItem("1"), new PackageRetries());
-        Assert.assertNotNull(oneQueue.getHead());
-    }
-
-    @Test
-    public void testGetItems() throws Exception {
-        SubQueue oneQueue = new SubQueue("oneQueue", null, new PackageRetries());
-        Assert.assertNotNull(oneQueue.getEntries(0, 10));
-        SubQueue tenQueue = new SubQueue("tenQueue", buildQueueItem("1"), new PackageRetries());
-        Assert.assertEquals(1, Lists.newArrayList(tenQueue.getEntries(0, 10)).size());
-        Assert.assertEquals(1, Lists.newArrayList(tenQueue.getEntries(0, -1)).size());
-        Assert.assertEquals(0, Lists.newArrayList(tenQueue.getEntries(1, 10)).size());
-    }
-
-    private DistributionQueueItem buildQueueItem(String packageId) {
-        Map<String, Object> properties = new HashMap<>();
-        properties.put(QueueItemFactory.RECORD_TOPIC, "topic");
-        properties.put(QueueItemFactory.RECORD_OFFSET, 0);
-        properties.put(QueueItemFactory.RECORD_PARTITION, 0);
-        properties.put(QueueItemFactory.RECORD_TIMESTAMP, System.currentTimeMillis());
-        return new DistributionQueueItem(packageId, properties);
-    }
-}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
index 05f8a88..e669a82 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
@@ -27,7 +27,9 @@ import static org.mockito.Mockito.when;
 import java.util.Collections;
 import java.util.function.Consumer;
 
+import org.apache.sling.distribution.journal.impl.subscriber.Announcer;
 import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index d6c774e..0de3d2e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -33,10 +33,10 @@ import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.ClearCommand;
 import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 1d47520..4d0710c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -21,12 +21,9 @@ package org.apache.sling.distribution.journal.impl.subscriber;
 import static org.apache.sling.distribution.agent.DistributionAgentState.IDLE;
 import static org.apache.sling.distribution.agent.DistributionAgentState.RUNNING;
 import static org.awaitility.Awaitility.await;
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
@@ -40,42 +37,39 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Dictionary;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.sling.distribution.journal.impl.precondition.Precondition;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.api.resource.ResourceUtil;
-import org.apache.sling.commons.metrics.Counter;
-import org.apache.sling.commons.metrics.Histogram;
-import org.apache.sling.commons.metrics.Meter;
-import org.apache.sling.commons.metrics.Timer;
-import org.apache.sling.distribution.DistributionRequest;
-import org.apache.sling.distribution.DistributionRequestState;
-import org.apache.sling.distribution.DistributionRequestType;
-import org.apache.sling.distribution.DistributionResponse;
-import org.apache.sling.distribution.SimpleDistributionRequest;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.precondition.Precondition;
+import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeperFactory;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.packaging.DistributionPackageInfo;
-import org.apache.sling.distribution.queue.DistributionQueueEntry;
-import org.apache.sling.distribution.queue.DistributionQueueItemState;
-import org.apache.sling.distribution.queue.DistributionQueueState;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.settings.SlingSettingsService;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.awaitility.Awaitility;
@@ -83,34 +77,26 @@ import org.awaitility.Duration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
 import org.mockito.Spy;
 import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.util.converter.Converters;
 
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-
-import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
 import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.ByteString;
 
 @SuppressWarnings("unchecked")
+@RunWith(MockitoJUnitRunner.class)
 public class SubscriberTest {
 
     private static final String SUB1_SLING_ID = "sub1sling";
@@ -119,7 +105,7 @@ public class SubscriberTest {
     private static final String PUB1_SLING_ID = "pub1sling";
     private static final String PUB1_AGENT_NAME = "pub1agent";
 
-    private static final PackageMessage BASIC_ADD_PACKAGE = PackageMessage.newBuilder()
+    public static final PackageMessage BASIC_ADD_PACKAGE = PackageMessage.newBuilder()
             .setPkgId("myid")
             .setPubSlingId(PUB1_SLING_ID)
             .setPubAgentName(PUB1_AGENT_NAME)
@@ -138,6 +124,8 @@ public class SubscriberTest {
             .addAllPaths(Arrays.asList("/test"))
             .build();
 
+    @Mock
+    Packaging packaging;
     
     @Mock
     private BundleContext context;
@@ -172,8 +160,10 @@ public class SubscriberTest {
     @Mock
     private MessageSender<PackageStatusMessage> statusSender;
 
-    @Mock
-    private DistributionMetricsService distributionMetricsService;
+    @Spy
+    private SubscriberMetrics subscriberMetrics = new SubscriberMetrics();
+    
+    BookKeeperFactory bookKeeperFactory;
 
     @InjectMocks
     DistributionSubscriber subscriber;
@@ -198,12 +188,14 @@ public class SubscriberTest {
         
         Awaitility.setDefaultPollDelay(Duration.ZERO);
         Awaitility.setDefaultPollInterval(Duration.ONE_HUNDRED_MILLISECONDS);
-        MockitoAnnotations.initMocks(this);
+        
+        bookKeeperFactory = new BookKeeperFactory(resolverFactory, eventAdmin, subscriberMetrics, packaging);
+        Map<String, Object> props = new HashMap<String, Object>();
+        bookKeeperFactory.activate(context, props);
+        subscriber.bookKeeperFactory = bookKeeperFactory;
         when(packageBuilder.getType()).thenReturn("journal");
         when(slingSettings.getSlingId()).thenReturn(SUB1_SLING_ID);
 
-        mockMetrics();
-
         when(clientProvider.<PackageStatusMessage>createSender()).thenReturn(statusSender, (MessageSender) discoverySender);
         when(clientProvider.createPoller(
                 Mockito.anyString(),
@@ -211,8 +203,7 @@ public class SubscriberTest {
                 Mockito.anyString(),
                 packageCaptor.capture()))
             .thenReturn(poller);
-        when(context.registerService(Mockito.any(Class.class), (DistributionAgent) eq(subscriber), Mockito.any(Dictionary.class))).thenReturn(reg);
-
+        
         // you should call initSubscriber in each test method
     }
 
@@ -227,10 +218,6 @@ public class SubscriberTest {
         assumeNoPrecondition();
         initSubscriber();
 
-        assertThat(subscriber.getQueueNames(), contains(PUB1_AGENT_NAME));
-        assertThat(subscriber.getQueue(PUB1_AGENT_NAME).getStatus().getState(), equalTo(DistributionQueueState.IDLE));
-        assertThat(subscriber.getState(), equalTo(DistributionAgentState.IDLE));
-        
         MessageInfo info = new TestMessageInfo("", 1, 0, 0);
 
         PackageMessage message = BASIC_ADD_PACKAGE;
@@ -239,20 +226,13 @@ public class SubscriberTest {
         when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class), 
                 Mockito.any(ByteArrayInputStream.class))
                 ).thenAnswer(new WaitFor(sem));
-        packageHandler.handle(info, message);
-        
+        packageHandler.handle(info, message); 
         waitSubscriber(RUNNING);
-        DistributionQueue queue = subscriber.getQueue(PUB1_AGENT_NAME);
-        DistributionQueueEntry item = queue.getHead();
-        assertThat(item.getStatus().getItemState(), equalTo(DistributionQueueItemState.QUEUED));
         
         sem.release();
         waitSubscriber(IDLE);
         verify(statusSender, times(0)).send(eq(topics.getStatusTopic()),
                 anyObject());
-        List<String> log = subscriber.getLog().getLines();
-        // We do not use the DistributionLog anymore
-        assertThat(log.size(), equalTo(0));
     }
 
 	@Test
@@ -268,25 +248,10 @@ public class SubscriberTest {
         PackageMessage message = BASIC_DEL_PACKAGE;
 
         packageHandler.handle(info, message);
-        waitSubscriber(RUNNING);
-        waitSubscriber(IDLE);
-        try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null)) {
-            assertThat(resolver.getResource("/test"), nullValue());
-        }
+        await().until(() -> getResource("/test"), nullValue());
     }
 
     @Test
-    public void testExecuteNotSupported() throws DistributionException {
-        assumeNoPrecondition();
-        initSubscriber();
-
-        DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "test");
-        DistributionResponse response = subscriber.execute(resourceResolver, request);
-        assertThat(response.getState(), equalTo(DistributionRequestState.DROPPED));
-    }
-
-
-    @Test
     public void testSendFailedStatus() throws DistributionException {
         assumeNoPrecondition();
         initSubscriber(ImmutableMap.of("maxRetries", "1"));
@@ -324,37 +289,17 @@ public class SubscriberTest {
         initSubscriber();
         MessageInfo info = new TestMessageInfo("", 1, 11, 0);
         PackageMessage message = BASIC_ADD_PACKAGE;
+        when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11))).thenReturn(Decision.SKIP);
 
         packageHandler.handle(info, message);
-        waitSubscriber(RUNNING);
-        when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(false);
-
         try {
             waitSubscriber(IDLE);
             fail("Cannot be IDLE without a validation status");
         } catch (Throwable t) {
 
         }
-
-        when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(true);
-        waitSubscriber(IDLE);
-
     }
     
-    @Test
-    public void testReadyWhenWatingForPrecondition() {
-        Semaphore sem = new Semaphore(0);
-        assumeWaitingForPrecondition(sem);
-        initSubscriber();
-        MessageInfo info = new TestMessageInfo("", 1, 0, 0);
-        PackageMessage message = BASIC_ADD_PACKAGE;
-
-        packageHandler.handle(info, message);
-        waitSubscriber(RUNNING);
-        await("Should report ready").until(subscriber.subscriberIdle::isReady);
-        sem.release();
-    }
-
     private void initSubscriber() {
         initSubscriber(Collections.emptyMap());
     }
@@ -376,51 +321,20 @@ public class SubscriberTest {
         await().until(subscriber::getState, equalTo(expectedState));
     }
 
-    private void mockMetrics() {
-        Histogram histogram = Mockito.mock(Histogram.class);
-        Counter counter = Mockito.mock(Counter.class);
-        Meter meter = Mockito.mock(Meter.class);
-        Timer timer = Mockito.mock(Timer.class);
-        Timer.Context timerContext = Mockito.mock(Timer.Context.class);
-        when(timer.time())
-            .thenReturn(timerContext);
-        when(distributionMetricsService.getImportedPackageSize())
-                .thenReturn(histogram);
-        when(distributionMetricsService.getItemsBufferSize())
-                .thenReturn(counter);
-        when(distributionMetricsService.getFailedPackageImports())
-                .thenReturn(meter);
-        when(distributionMetricsService.getRemovedFailedPackageDuration())
-                .thenReturn(timer);
-        when(distributionMetricsService.getRemovedPackageDuration())
-                .thenReturn(timer);
-        when(distributionMetricsService.getImportedPackageDuration())
-                .thenReturn(timer);
-        when(distributionMetricsService.getSendStoredStatusDuration())
-                .thenReturn(timer);
-        when(distributionMetricsService.getProcessQueueItemDuration())
-                .thenReturn(timer);
-        when(distributionMetricsService.getPackageDistributedDuration())
-                .thenReturn(timer);
-    }
-
     private void assumeNoPrecondition() {
         try {
-            when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt())).thenReturn(true);
+            when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong())).thenReturn(Decision.ACCEPT);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
-    private void assumeWaitingForPrecondition(Semaphore sem) {
-        try {
-            when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt()))
-                .thenAnswer(invocation -> sem.tryAcquire(10000, TimeUnit.SECONDS));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+    private Resource getResource(String path) throws LoginException {
+        try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null)) {
+            return resolver.getResource(path);
         }
     }
-    
+
     private final class WaitFor implements Answer<DistributionPackageInfo> {
         private final Semaphore sem;
     
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
similarity index 61%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
index fd4c761..2ae2803 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
@@ -16,24 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.impl.subscriber.SubscriberTest;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.framework.BundleContext;
 import org.osgi.service.event.EventAdmin;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -43,8 +51,7 @@ public class BookKeeperTest {
 
     private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
 
-    @Mock
-    private DistributionMetricsService distributionMetricsService;
+    private SubscriberMetrics subscriberMetrics = new SubscriberMetrics();
 
     @Mock
     private PackageHandler packageHandler;
@@ -54,13 +61,18 @@ public class BookKeeperTest {
 
     @Mock
     private Consumer<PackageStatusMessage> sender;
+    
+    @Mock
+    BundleContext context;
 
     private BookKeeper bookKeeper;
 
     @Before
     public void before() {
-        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender,
+        SubscriberIdle subscriberIdle = new SubscriberIdle(context, 300);
+        bookKeeper = new BookKeeper(resolverFactory, subscriberMetrics,  packageHandler, subscriberIdle, eventAdmin, sender,
                 "subAgentName", "subSlingId", true, 10);
+        sem = new Semaphore(0);
     }
 
     @Test
@@ -78,5 +90,29 @@ public class BookKeeperTest {
             assertThat(bookKeeper.loadOffset(), equalTo(20l));
         }
     }
+    
+    @Test
+    public void testReadyWhenWatingForPrecondition() {
+        
+        MessageInfo info = new TestMessageInfo("", 1, 0, 0);
+        PackageMessage message = SubscriberTest.BASIC_ADD_PACKAGE;
+        Executors.newSingleThreadExecutor().execute(() -> {
+            try {
+                bookKeeper.processPackage(info, message, this::waitForSemaphore);
+            } catch (Exception e) {
+            }
+        });
+        await("Should report ready").until(bookKeeper.subscriberIdle::isReady);
+        sem.release();
+    }
+
+    public boolean waitForSemaphore(long offset, PackageMessage msg) {
+        try {
+            return sem.tryAcquire(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            return false;
+        }
+    }
 
+    private Semaphore sem;
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
similarity index 97%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
index 0868cc8..096b632 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import static java.util.Collections.singletonList;
 import static org.mockito.Mockito.verify;
@@ -37,6 +37,7 @@ import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.journal.service.subscriber.ContentPackageExtractor;
 import org.apache.sling.testing.mock.osgi.MockOsgi;
 import org.apache.sling.testing.mock.sling.MockSling;
 import org.apache.sling.testing.mock.sling.ResourceResolverType;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
index 11b346a..1a03199 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -29,6 +29,7 @@ import static org.junit.Assert.assertEquals;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.journal.service.subscriber.LocalStore;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.Test;
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
similarity index 90%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
index 6c33412..4f2f0a4 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
@@ -16,12 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+import org.apache.sling.distribution.journal.service.subscriber.PackageRetries;
+
 public class PackageRetriesTest {
 
     @Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
similarity index 94%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
index ef5f0d7..6c40034 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 import org.apache.felix.systemready.CheckStatus.State;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberIdle;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.osgi.framework.BundleContext;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
similarity index 85%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
index 1049ef5..50d5507 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertNotNull;
@@ -32,7 +32,8 @@ import org.apache.sling.commons.metrics.Meter;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.commons.metrics.Timer.Context;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -94,16 +95,7 @@ public class DistributionMetricsServiceTest {
         assertNotNull(metrics.getDroppedRequests());
         assertNotNull(metrics.getEnqueuePackageDuration());
         assertNotNull(metrics.getExportedPackageSize());
-        assertNotNull(metrics.getFailedPackageImports());
-        assertNotNull(metrics.getImportedPackageDuration());
-        assertNotNull(metrics.getImportedPackageSize());
-        assertNotNull(metrics.getItemsBufferSize());
-        assertNotNull(metrics.getPackageDistributedDuration());
-        assertNotNull(metrics.getProcessQueueItemDuration());
         assertNotNull(metrics.getQueueCacheFetchCount());
-        assertNotNull(metrics.getRemovedFailedPackageDuration());
-        assertNotNull(metrics.getRemovedPackageDuration());
-        assertNotNull(metrics.getSendStoredStatusDuration());
     }
     
     @Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
index bd44057..8fe7ad9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.time.Duration.of;
 import static java.time.temporal.ChronoUnit.MILLIS;
@@ -27,6 +27,7 @@ import java.time.temporal.ChronoUnit;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.sling.distribution.journal.shared.ExponentialBackOff;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.runners.MockitoJUnitRunner;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
similarity index 90%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
index ce7f279..0214b83 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
@@ -35,9 +35,12 @@ import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.impl.shared.JournalAvailableChecker.JournalCheckerConfiguration;
-import org.apache.sling.distribution.journal.impl.shared.Topics.TopicsConfiguration;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.JournalAvailableChecker;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.shared.JournalAvailableChecker.JournalCheckerConfiguration;
+import org.apache.sling.distribution.journal.shared.Topics.TopicsConfiguration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
similarity index 97%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
index fed9fef..7fd0207 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.samePropertyValuesAs;
@@ -29,7 +29,6 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
 
-import org.apache.sling.distribution.journal.impl.shared.LimitPoller;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,6 +41,7 @@ import org.mockito.MockitoAnnotations;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.LimitPoller;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
index 14dcc19..ede1b69 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
@@ -36,6 +36,7 @@ import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.PackageBrowser;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
similarity index 95%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
index aa2947a..f483c86 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.util.Collections.emptyList;
 import static org.hamcrest.Matchers.containsString;
@@ -43,6 +43,9 @@ import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.PackageBrowser;
+import org.apache.sling.distribution.journal.shared.PackageViewerPlugin;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
similarity index 93%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
index 5d31bf6..7202b6b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
@@ -16,13 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.sling.distribution.DistributionRequestState;
+import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
 import org.junit.Test;
 
 public class SimpleDistributionResponseTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java b/src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
index a2105e3..9145dfe 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import org.apache.sling.distribution.journal.MessageInfo;