You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/03/26 12:03:22 UTC

[sling-org-apache-sling-distribution-journal] 01/01: SLING-9259 - Extract service subscriber code into separate package

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-9259
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 3ff39d5ee7d1607283a2a571b6c8e2224be1e839
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Thu Mar 26 11:30:53 2020 +0100

    SLING-9259 - Extract service subscriber code into separate package
---
 .../impl/precondition/PackageStatusWatcher.java    |   2 +-
 .../impl/precondition/StagingPrecondition.java     |   2 +-
 .../journal/impl/publisher/DiscoveryService.java   |   2 +-
 .../impl/publisher/DistributionPublisher.java      |  15 ++-
 .../impl/publisher/PackageDistributedNotifier.java |   2 +-
 .../journal/impl/publisher/PackageRepo.java        |   4 +-
 .../journal/impl/queue/impl/PubErrQueue.java       |   2 +
 .../journal/impl/queue/impl/PubQueue.java          |   2 +
 .../journal/impl/queue/impl/PubQueueCache.java     |   4 +-
 .../impl/queue/impl/PubQueueCacheService.java      |   4 +-
 .../impl/queue/impl/PubQueueProviderImpl.java      |   2 +-
 .../journal/impl/subscriber/Announcer.java         |   1 +
 .../journal/impl/subscriber/CommandPoller.java     |   2 +-
 .../impl/subscriber/DistributionSubscriber.java    |  23 ++--
 .../impl/{queue/impl => subscriber}/SubQueue.java  |   5 +-
 .../impl/subscriber/SubscriberConfiguration.java   |   1 +
 .../{impl => service}/subscriber/BookKeeper.java   |  32 +++--
 .../subscriber/ContentPackageExtractor.java        |   2 +-
 .../service/subscriber/ImportedEventFactory.java   |  55 +++++++++
 .../{impl => service}/subscriber/LocalStore.java   |   2 +-
 .../subscriber/PackageHandler.java                 |  34 +++++-
 .../subscriber/PackageHandling.java                |   2 +-
 .../subscriber}/PackageRetries.java                |   2 +-
 .../subscriber/SubscriberIdle.java                 |   2 +-
 .../subscriber/SubscriberMetrics.java}             | 136 +--------------------
 .../journal/{impl => }/shared/AgentState.java      |   2 +-
 .../{impl => }/shared/DefaultDistributionLog.java  |   2 +-
 .../shared/DistributionMetricsService.java         | 112 +----------------
 .../{impl/queue/impl => shared}/EntryUtil.java     |   2 +-
 .../{impl => }/shared/ExponentialBackOff.java      |   2 +-
 .../journal/{impl => }/shared/JMXRegistration.java |   2 +-
 .../{impl => }/shared/JournalAvailableChecker.java |   4 +-
 .../shared/JournalAvailableServiceMarker.java      |   2 +-
 .../journal/{impl => }/shared/LimitPoller.java     |   2 +-
 .../journal/{impl => }/shared/PackageBrowser.java  |   2 +-
 .../{impl => }/shared/PackageViewerPlugin.java     |   2 +-
 .../queue/impl => shared}/QueueEntryFactory.java   |   2 +-
 .../shared/SimpleDistributionResponse.java         |   2 +-
 .../journal/{impl => }/shared/Topics.java          |   2 +-
 .../precondition/PackageStatusWatcherTest.java     |   4 +-
 .../impl/precondition/StagingPreconditionTest.java |   4 +-
 .../impl/publisher/DiscoveryServiceTest.java       |   4 +-
 .../impl/publisher/DistPublisherJMXTest.java       |   3 +-
 .../impl/publisher/DistributionPublisherTest.java  |   4 +-
 .../publisher/PackageDistributedNotifierTest.java  |   2 +-
 .../journal/impl/publisher/PackageRepoTest.java    |   4 +-
 .../journal/impl/queue/QueueItemFactoryTest.java   |   2 +-
 .../journal/impl/queue/impl/EntryUtilTest.java     |   1 +
 .../impl/queue/impl/OffsetQueueImplJMXTest.java    |   2 +-
 .../journal/impl/queue/impl/PubQueueCacheTest.java |   4 +-
 .../impl/queue/impl/PubQueueProviderTest.java      |   2 +-
 .../journal/impl/queue/impl/PubQueueTest.java      |   1 +
 .../journal/impl/queue/impl/RangePollerTest.java   |   2 +-
 .../journal/impl/subscriber/AnnouncerTest.java     |   2 +
 .../journal/impl/subscriber/CommandPollerTest.java |   4 +-
 .../journal/impl/subscriber/SubscriberTest.java    |  49 ++++----
 .../subscriber/BookKeeperTest.java                 |   7 +-
 .../subscriber/ContentPackageExtractorTest.java    |   4 +-
 .../subscriber/LocalStoreTest.java                 |   3 +-
 .../subscriber}/PackageRetriesTest.java            |   4 +-
 .../impl => service/subscriber}/SubQueueTest.java  |   5 +-
 .../subscriber/SubscriberIdleTest.java             |   3 +-
 .../shared/DistributionMetricsServiceTest.java     |  14 +--
 .../{impl => }/shared/ExponentialBackoffTest.java  |   3 +-
 .../shared/JournalAvailableCheckerTest.java        |  11 +-
 .../journal/{impl => }/shared/LimitPollerTest.java |   4 +-
 .../{impl => }/shared/PackageBrowserTest.java      |   3 +-
 .../{impl => }/shared/PackageViewerPluginTest.java |   5 +-
 .../shared/SimpleDistributionResponseTest.java     |   3 +-
 .../journal/{impl => }/shared/TestMessageInfo.java |   2 +-
 70 files changed, 253 insertions(+), 392 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index 0408e0d..506cf55 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -29,10 +29,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.Topics;
 
 public class PackageStatusWatcher implements Closeable {
     private final Closeable poller;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index 2272888..bbad307 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -25,8 +25,8 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
index af052a3..c201a42 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
@@ -28,9 +28,9 @@ import java.util.Hashtable;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.commons.io.IOUtils;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 55787dc..c1c75d0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -20,12 +20,12 @@ package org.apache.sling.distribution.journal.impl.publisher;
 
 
 import static java.util.stream.StreamSupport.stream;
-import static org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.timed;
 import static java.util.Objects.requireNonNull;
 import static org.apache.sling.distribution.DistributionRequestState.ACCEPTED;
 import static org.apache.sling.distribution.DistributionRequestType.ADD;
 import static org.apache.sling.distribution.DistributionRequestType.DELETE;
 import static org.apache.sling.distribution.DistributionRequestType.TEST;
+import static org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -47,11 +47,6 @@ import javax.management.NotCompliantMBeanException;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.impl.shared.AgentState;
-import org.apache.sling.distribution.journal.impl.shared.DefaultDistributionLog;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequest;
@@ -73,9 +68,13 @@ import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.service.metatype.annotations.Designate;
-
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.shared.AgentState;
+import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
+import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.JournalAvailable;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index 591a0cf..e3dace3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -29,8 +29,8 @@ import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.JsonMessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
index aee0c52..81a4fba 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
@@ -25,8 +25,6 @@ import javax.jcr.Node;
 import javax.jcr.Property;
 import javax.jcr.nodetype.NodeType;
 
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.jackrabbit.api.ReferenceBinary;
 import org.apache.jackrabbit.commons.JcrUtils;
 import org.apache.sling.api.resource.LoginException;
@@ -47,6 +45,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
 
 import static java.util.Collections.singletonMap;
 import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
index aaa7b1c..db27714 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
@@ -26,6 +26,8 @@ import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
+import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
index 9bfd4fd..197d0b1 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
@@ -40,6 +40,8 @@ import javax.annotation.Nullable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
+import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueItemState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 7eb3451..6621979 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -40,8 +40,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.osgi.service.event.Event;
@@ -52,6 +50,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessageSender;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 46a2d00..eac1f1e 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -20,9 +20,9 @@ package org.apache.sling.distribution.journal.impl.queue.impl;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.JournalAvailable;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index 3b7feab..550c055 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -29,11 +29,11 @@ import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index 7ecd03b..beeb62d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -32,6 +32,7 @@ import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
 import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
 import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index f9d047b..3a1df97 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -27,8 +27,8 @@ import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 74fc5e2..b044704 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -67,12 +67,15 @@ import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
-import org.apache.sling.distribution.journal.impl.shared.AgentState;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
+import org.apache.sling.distribution.journal.service.subscriber.ContentPackageExtractor;
+import org.apache.sling.distribution.journal.service.subscriber.PackageHandler;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberIdle;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics;
+import org.apache.sling.distribution.journal.shared.AgentState;
+import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.log.spi.DistributionLog;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -133,7 +136,7 @@ public class DistributionSubscriber implements DistributionAgent {
     private Precondition precondition;
 
     @Reference
-    private DistributionMetricsService distributionMetricsService;
+    private SubscriberMetrics subcriberMetrics;
 
     @Reference
     private Packaging packaging;
@@ -188,7 +191,7 @@ public class DistributionSubscriber implements DistributionAgent {
 
         ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.packageHandling());
         PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
-        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin,
+        bookKeeper = new BookKeeper(resolverFactory, subcriberMetrics, packageHandler, eventAdmin,
                 sender(topics.getStatusTopic()), subAgentName, subSlingId, editable, maxRetries);
         
         long startOffset = bookKeeper.loadOffset() + 1;
@@ -339,7 +342,7 @@ public class DistributionSubscriber implements DistributionAgent {
         try {
             while (running) {
                 if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) {
-                    distributionMetricsService.getItemsBufferSize().increment();
+                    subcriberMetrics.getItemsBufferSize().increment();
                     return;
                 }
             }
@@ -368,7 +371,7 @@ public class DistributionSubscriber implements DistributionAgent {
             bookKeeper.sendStoredStatus();
             DistributionQueueItem item = blockingPeekQueueItem();
 
-            try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
+            try (Timer.Context context = subcriberMetrics.getProcessQueueItemDuration().time()) {
                 processQueueItem(item);
             } finally {
                 subscriberIdle.idle();
@@ -412,7 +415,7 @@ public class DistributionSubscriber implements DistributionAgent {
             bookKeeper.importPackage(pkgMsg, offset, createdTime);
         }
         queueItemsBuffer.remove();
-        distributionMetricsService.getItemsBufferSize().decrement();
+        subcriberMetrics.getItemsBufferSize().decrement();
     }
 
     private boolean shouldSkip(long offset) throws InterruptedException, TimeoutException {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubQueue.java
similarity index 95%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubQueue.java
index 5d34e6d..2d7a3ca 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubQueue.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.subscriber;
 
 import java.util.Collections;
 import java.util.List;
@@ -29,6 +29,9 @@ import javax.annotation.Nullable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.apache.sling.distribution.journal.service.subscriber.PackageRetries;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
+import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueItemState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
index 937c30a..625a62b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
+import org.apache.sling.distribution.journal.service.subscriber.PackageHandling;
 import org.osgi.service.metatype.annotations.AttributeDefinition;
 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
similarity index 90%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
index 53a7245..224279c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import static java.lang.String.format;
 import static java.lang.System.currentTimeMillis;
@@ -42,13 +42,11 @@ import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics.GaugeService;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
@@ -85,7 +83,7 @@ public class BookKeeper implements Closeable {
 
     private final Logger log = LoggerFactory.getLogger(this.getClass());
     private final ResourceResolverFactory resolverFactory;
-    private final DistributionMetricsService distributionMetricsService;
+    private final SubscriberMetrics subsciberMetrics;
     private final PackageHandler packageHandler;
     private final EventAdmin eventAdmin;
     private final Consumer<PackageStatusMessage> sender;
@@ -102,7 +100,7 @@ public class BookKeeper implements Closeable {
     private int skippedCounter = 0;
 
     public BookKeeper(ResourceResolverFactory resolverFactory, 
-            DistributionMetricsService distributionMetricsService,
+            SubscriberMetrics subcriberMetrics,
             PackageHandler packageHandler,
             EventAdmin eventAdmin,
             Consumer<PackageStatusMessage> sender,
@@ -113,9 +111,9 @@ public class BookKeeper implements Closeable {
         this.packageHandler = packageHandler;
         this.eventAdmin = eventAdmin;
         String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
-        this.retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
+        this.retriesGauge = subcriberMetrics.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
         this.resolverFactory = resolverFactory;
-        this.distributionMetricsService = distributionMetricsService;
+        this.subsciberMetrics = subcriberMetrics;
         this.sender = sender;
         this.subAgentName = subAgentName;
         this.subSlingId = subSlingId;
@@ -148,7 +146,7 @@ public class BookKeeper implements Closeable {
         log.info("Importing distribution package {} of type {} at offset {}", 
                 pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
         addPackageMDC(pkgMsg);
-        try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
+        try (Timer.Context context = subsciberMetrics.getImportedPackageDuration().time();
                 ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
             packageHandler.apply(importerResolver, pkgMsg);
             if (editable) {
@@ -156,10 +154,10 @@ public class BookKeeper implements Closeable {
             }
             storeOffset(importerResolver, offset);
             importerResolver.commit();
-            distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
-            distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
+            subsciberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength());
+            subsciberMetrics.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
             packageRetries.clear(pkgMsg.getPubAgentName());
-            Event event = DistributionEvent.eventImporterImported(pkgMsg, subAgentName);
+            Event event = ImportedEventFactory.create(pkgMsg, subAgentName);
             eventAdmin.postEvent(event);
         } catch (LoginException | IOException | RuntimeException e) {
             failure(pkgMsg, offset, e);
@@ -195,7 +193,7 @@ public class BookKeeper implements Closeable {
      * @throws DistributionException if the package should be retried
      */
     private void failure(PackageMessage pkgMsg, long offset, Exception e) throws DistributionException {
-        distributionMetricsService.getFailedPackageImports().mark();
+        subsciberMetrics.getFailedPackageImports().mark();
 
         String pubAgentName = pkgMsg.getPubAgentName();
         int retries = packageRetries.get(pubAgentName);
@@ -213,7 +211,7 @@ public class BookKeeper implements Closeable {
     public void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
         log.info("Removing distribution package {} of type {} at offset {}", 
                 pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
-        Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
+        Timer.Context context = subsciberMetrics.getRemovedPackageDuration().time();
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             if (editable) {
                 storeStatus(resolver, new PackageStatus(REMOVED, offset, pkgMsg.getPubAgentName()));
@@ -250,7 +248,7 @@ public class BookKeeper implements Closeable {
      * @throws InterruptedException
      */
     public void sendStoredStatus() throws InterruptedException {
-        try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
+        try (Timer.Context context = subsciberMetrics.getSendStoredStatusDuration().time()) {
             PackageStatus status = new PackageStatus(statusStore.load());
             boolean sent = status.sent;
             int retry = 0;
@@ -315,7 +313,7 @@ public class BookKeeper implements Closeable {
     private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
         log.info("Removing failed distribution package {} of type {} at offset {}", 
                 pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
-        Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
+        Timer.Context context = subsciberMetrics.getRemovedFailedPackageDuration().time();
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
             storeOffset(resolver, offset);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
index 998f929..f17f7b5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import static java.util.Objects.requireNonNull;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java
new file mode 100644
index 0000000..8ad6f86
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.service.subscriber;
+
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_COMPONENT_KIND;
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_COMPONENT_NAME;
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_PATHS;
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_TYPE;
+import static org.apache.sling.distribution.event.DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.sling.distribution.journal.messages.Messages;
+import org.osgi.service.event.Event;
+/** Static factory for the OSGi {@link Event} with topic {@code IMPORTER_PACKAGE_IMPORTED}. */
+@ParametersAreNonnullByDefault
+public class ImportedEventFactory {
+    /** Event property key under which {@link #create} stores the package id. */
+    public static final String PACKAGE_ID = "distribution.package.id";
+    private static final String KIND_IMPORTER = "importer";
+    /** Not instantiable; the class only offers the static {@link #create} method. */
+    private ImportedEventFactory() {
+    }
+    /** Builds the imported event: component kind and name, request type name, paths array and package id. */
+    public static Event create(Messages.PackageMessage pkgMsg, String agentName) {
+        String[] pathsList = pkgMsg.getPathsList().toArray(new String[0]);
+        Map<String, Object> props = new HashMap<>();
+        props.put(DISTRIBUTION_COMPONENT_KIND, KIND_IMPORTER);
+        props.put(DISTRIBUTION_COMPONENT_NAME, agentName);
+        props.put(DISTRIBUTION_TYPE, pkgMsg.getReqType().name());
+        props.put(DISTRIBUTION_PATHS, pathsList);
+        props.put(PACKAGE_ID, pkgMsg.getPkgId());
+        return new Event(IMPORTER_PACKAGE_IMPORTED, props);
+    }
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
index c619d79..0a3c541 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.ModifiableValueMap;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
similarity index 71%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
index a7148c4..aadc77a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
@@ -16,18 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import static java.lang.String.format;
 
+import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 
+import javax.annotation.Nonnull;
+import javax.jcr.Binary;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.ValueFactory;
+
 import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.slf4j.Logger;
@@ -66,13 +73,32 @@ public class PackageHandler {
         LOG.info("Importing paths {}",pkgMsg.getPathsList());
         InputStream pkgStream = null;
         try {
-            pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
+            pkgStream = pkgStream(resolver, pkgMsg);
             packageBuilder.installPackage(resolver, pkgStream);
             extractor.handle(resolver, pkgMsg.getPathsList());
         } finally {
             IOUtils.closeQuietly(pkgStream);
         }
-
+    }
+    /** Opens the package content from the inline binary or, if absent, from the JCR binary reference; the stream is not closed here. */
+    @Nonnull
+    public static InputStream pkgStream(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException {
+        if (pkgMsg.hasPkgBinary()) {
+            return new ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
+        } else {
+            String pkgBinRef = pkgMsg.getPkgBinaryRef();
+            try {
+                Session session = resolver.adaptTo(Session.class);
+                if (session == null) { // adaptation of the resolver to a JCR session failed
+                    throw new DistributionException("Unable to get Oak session");
+                }
+                ValueFactory factory = session.getValueFactory();
+                Binary binary = factory.createValue(new SimpleReferenceBinary(pkgBinRef)).getBinary(); // turn the reference string into a Binary
+                return binary.getStream();
+            } catch (RepositoryException e) { // rethrown as DistributionException with the cause attached
+                throw new DistributionException(e.getMessage(), e);
+            }
+        }
     }
 
     private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
similarity index 92%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
index 55dc55e..a1130fc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 public enum PackageHandling {
     Off, Extract, Install
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
index 23f9463..3208572 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
index a05b80c..d51da87 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import java.io.Closeable;
 import java.util.Hashtable;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
similarity index 63%
copy from src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
copy to src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
index f18c04c..8dd6cab 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
@@ -16,14 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import static java.lang.String.format;
 
 import java.io.Closeable;
 import java.util.Dictionary;
 import java.util.Hashtable;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;
 
 import org.apache.sling.commons.metrics.Counter;
@@ -41,8 +40,8 @@ import org.osgi.service.component.annotations.Reference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Component(service = DistributionMetricsService.class)
-public class DistributionMetricsService {
+@Component(service = SubscriberMetrics.class)
+public class SubscriberMetrics {
 
     public static final String BASE_COMPONENT = "distribution.journal";
 
@@ -55,18 +54,8 @@ public class DistributionMetricsService {
     @Reference
     private MetricsService metricsService;
 
-    private Counter cleanupPackageRemovedCount;
-
-    private Timer cleanupPackageDuration;
-
     private Histogram importedPackageSize;
 
-    private Histogram exportedPackageSize;
-
-    private Meter acceptedRequests;
-
-    private Meter droppedRequests;
-
     private Counter itemsBufferSize;
 
     private Timer removedPackageDuration;
@@ -83,26 +72,11 @@ public class DistributionMetricsService {
 
     private Timer packageDistributedDuration;
 
-    private Timer buildPackageDuration;
-
-    private Timer enqueuePackageDuration;
-
-    private Counter queueCacheFetchCount;
-
     private BundleContext context;
 
     @Activate
     public void activate(BundleContext context) {
         this.context = context;
-        cleanupPackageRemovedCount = getCounter(getMetricName(PUB_COMPONENT, "cleanup_package_removed_count"));
-        cleanupPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "cleanup_package_duration"));
-        exportedPackageSize = getHistogram(getMetricName(PUB_COMPONENT, "exported_package_size"));
-        acceptedRequests = getMeter(getMetricName(PUB_COMPONENT, "accepted_requests"));
-        droppedRequests = getMeter(getMetricName(PUB_COMPONENT, "dropped_requests"));
-        buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "build_package_duration"));
-        enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, "enqueue_package_duration"));
-        queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, "queue_cache_fetch_count"));
-
         importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, "imported_package_size"));
         itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, "items_buffer_size"));
         importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "imported_package_duration"));
@@ -115,56 +89,6 @@ public class DistributionMetricsService {
     }
 
     /**
-     * Runs provided code updating provided metric
-     * with its execution time.
-     * The method guarantees that the metric is updated
-     * even if the code throws an exception
-     * @param metric metric to update
-     * @param code code to clock
-     * @throws Exception actually it doesn't
-     */
-    public static void timed(Timer metric, Runnable code) throws Exception {
-        try (Timer.Context ignored = metric.time()) {
-            code.run();
-        }
-    }
-
-    /**
-     * Runs provided code updating provided metric
-     * with its execution time.
-     * The method guarantees that the metric is updated
-     * even if the code throws an exception
-     * @param metric metric to update
-     * @param code code to clock
-     * @return a value returned but <code>code.call()</code> invocation
-     * @throws Exception if underlying code throws
-     */
-    public static <T> T timed(Timer metric, Callable<T> code) throws Exception {
-        try (Timer.Context ignored = metric.time()) {
-            return code.call();
-        }
-    }
-
-    /**
-     * Counter of package removed during the Package Cleanup Task.
-     * The count is the sum of all packages removed since the service started.
-     *
-     * @return a Sling Metrics timer
-     */
-    public Counter getCleanupPackageRemovedCount() {
-        return cleanupPackageRemovedCount;
-    }
-
-    /**
-     * Timer of the Package Cleanup Task execution duration.
-     *
-     * @return a Sling Metrics timer
-     */
-    public Timer getCleanupPackageDuration() {
-        return cleanupPackageDuration;
-    }
-
-    /**
      * Histogram of the imported content package size in Byte.
      *
      * @return a Sling Metrics histogram
@@ -174,33 +98,6 @@ public class DistributionMetricsService {
     }
 
     /**
-     * Histogram of the exported content package size in Bytes.
-     *
-     * @return a Sling Metrics histogram
-     */
-    public Histogram getExportedPackageSize() {
-        return exportedPackageSize;
-    }
-
-    /**
-     * Meter of requests returning an {@code DistributionRequestState.ACCEPTED} state.
-     *
-     * @return a Sling Metrics meter
-     */
-    public Meter getAcceptedRequests() {
-        return acceptedRequests;
-    }
-
-    /**
-     * Meter of requests returning an {@code DistributionRequestState.DROPPED} state.
-     *
-     * @return a Sling Metrics meter
-     */
-    public Meter getDroppedRequests() {
-        return droppedRequests;
-    }
-
-    /**
      * Counter of the package buffer size on the subscriber.
      *
      * @return a Sling Metrics counter
@@ -273,33 +170,6 @@ public class DistributionMetricsService {
         return packageDistributedDuration;
     }
 
-    /**
-     * Timer capturing the duration in ms of building a content package
-     *
-     * @return a Sling Metric timer
-     */
-    public Timer getBuildPackageDuration() {
-        return buildPackageDuration;
-    }
-
-    /**
-     * Timer capturing the duration in ms of adding a package to the queue
-     *
-     * @return a Sling Metric timer
-     */
-    public Timer getEnqueuePackageDuration() {
-        return enqueuePackageDuration;
-    }
-
-    /**
-     * Counter of fetch operations to feed the queue cache.
-     *
-     * @return a Sling Metric counter
-     */
-    public Counter getQueueCacheFetchCount() {
-        return queueCacheFetchCount;
-    }
-    
     public <T> GaugeService<T> createGauge(String name, String description, Supplier<T> supplier) {
         return new GaugeService<>(name, description, supplier);
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java b/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
index 858e8dc..51d5466 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java b/src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
index bb49514..80c3747 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
similarity index 69%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
index f18c04c..1f1b279 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.lang.String.format;
 
@@ -59,30 +59,12 @@ public class DistributionMetricsService {
 
     private Timer cleanupPackageDuration;
 
-    private Histogram importedPackageSize;
-
     private Histogram exportedPackageSize;
 
     private Meter acceptedRequests;
 
     private Meter droppedRequests;
 
-    private Counter itemsBufferSize;
-
-    private Timer removedPackageDuration;
-
-    private Timer removedFailedPackageDuration;
-
-    private Timer importedPackageDuration;
-
-    private Meter failedPackageImports;
-
-    private Timer sendStoredStatusDuration;
-
-    private Timer processQueueItemDuration;
-
-    private Timer packageDistributedDuration;
-
     private Timer buildPackageDuration;
 
     private Timer enqueuePackageDuration;
@@ -102,16 +84,6 @@ public class DistributionMetricsService {
         buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "build_package_duration"));
         enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, "enqueue_package_duration"));
         queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, "queue_cache_fetch_count"));
-
-        importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, "imported_package_size"));
-        itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, "items_buffer_size"));
-        importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "imported_package_duration"));
-        removedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_package_duration"));
-        removedFailedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_failed_package_duration"));
-        failedPackageImports = getMeter(getMetricName(SUB_COMPONENT, "failed_package_imports"));
-        sendStoredStatusDuration = getTimer(getMetricName(SUB_COMPONENT, "send_stored_status_duration"));
-        processQueueItemDuration = getTimer(getMetricName(SUB_COMPONENT, "process_queue_item_duration"));
-        packageDistributedDuration = getTimer(getMetricName(SUB_COMPONENT, "request_distributed_duration"));
     }
 
     /**
@@ -165,15 +137,6 @@ public class DistributionMetricsService {
     }
 
     /**
-     * Histogram of the imported content package size in Byte.
-     *
-     * @return a Sling Metrics histogram
-     */
-    public Histogram getImportedPackageSize() {
-        return importedPackageSize;
-    }
-
-    /**
      * Histogram of the exported content package size in Bytes.
      *
      * @return a Sling Metrics histogram
@@ -201,79 +164,6 @@ public class DistributionMetricsService {
     }
 
     /**
-     * Counter of the package buffer size on the subscriber.
-     *
-     * @return a Sling Metrics counter
-     */
-    public Counter getItemsBufferSize() {
-        return itemsBufferSize;
-    }
-
-    /**
-     * Timer capturing the duration in ms of successful packages import operations.
-     *
-     * @return a Sling Metrics timer
-     */
-    public Timer getImportedPackageDuration() {
-        return importedPackageDuration;
-    }
-
-    /**
-     * Timer capturing the duration in ms of packages successfully removed from an editable subscriber.
-     *
-     * @return a Sling Metrics timer
-     */
-    public Timer getRemovedPackageDuration() {
-        return removedPackageDuration;
-    }
-
-    /**
-     * Timer capturing the duration in ms of packages successfully removed automatically from a subscriber supporting error queue.
-     *
-     * @return a Sling Metrics timer
-     */
-    public Timer getRemovedFailedPackageDuration() {
-        return removedFailedPackageDuration;
-    }
-
-    /**
-     * Meter of failures to import packages.
-     *
-     * @return a Sling Metrics meter
-     */
-    public Meter getFailedPackageImports() {
-        return failedPackageImports;
-    }
-
-    /**
-     * Timer capturing the duration in ms of sending a stored package status.
-     *
-     * @return a Sling Metric timer
-     */
-    public Timer getSendStoredStatusDuration() {
-        return sendStoredStatusDuration;
-    }
-
-    /**
-     * Timer capturing the duration in ms of processing a queue item.
-     *
-     * @return a Sling Metric timer
-     */
-    public Timer getProcessQueueItemDuration() {
-        return processQueueItemDuration;
-    }
-
-    /**
-     * Timer capturing the duration in ms of distributing a distribution package.
-     * The timer starts when the package is enqueued and stops when the package is successfully imported.
-     *
-     * @return a Sling Metric timer
-     */
-    public Timer getPackageDistributedDuration() {
-        return packageDistributedDuration;
-    }
-
-    /**
      * Timer capturing the duration in ms of building a content package
      *
      * @return a Sling Metric timer
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java b/src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
index 529fc52..ed2d461 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.shared;
 
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java b/src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
index 28f5aa8..48727a7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java b/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
index e4f36c9..a1e3924 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.lang.management.ManagementFactory.getPlatformMBeanServer;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
index 3582b36..6cb5170 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
@@ -16,14 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.util.Objects.requireNonNull;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
index 287f3fa..9478372 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import java.util.Objects;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java b/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
index 2bb30b5..18cf976 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java b/src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
index b9d1d2d..72eb177 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.util.Collections.singletonMap;
 import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java b/src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
similarity index 99%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
index 30f8836..8ac9ba5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import java.io.IOException;
 import java.io.PrintWriter;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java b/src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
index 47a9f25..58a0717 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.shared;
 
 import static org.apache.sling.distribution.queue.DistributionQueueItemState.ERROR;
 import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java b/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
index e49700e..eb0510c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java b/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
index 1f4d44a..c9ce5e2 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index 571d98a..1484a9e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -19,10 +19,10 @@
 package org.apache.sling.distribution.journal.impl.precondition;
 
 import org.apache.sling.distribution.journal.impl.precondition.PackageStatusWatcher;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessagingProvider;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index eab0479..b078605 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -36,9 +36,9 @@ import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.precondition.StagingPrecondition;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
index 5e7b76c..2b82486 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
@@ -30,8 +30,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.UUID;
 
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,6 +38,8 @@ import org.mockito.Mockito;
 
 import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
 import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageInfo;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
index 2a41a2f..f52026c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
@@ -41,6 +41,7 @@ import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueItemState;
@@ -53,8 +54,6 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
-
 @SuppressWarnings("rawtypes")
 public class DistPublisherJMXTest {
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 7ab2bdf..434c561 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -38,9 +38,9 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import com.google.common.collect.ImmutableMap;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
index acc91b0..ab5d17e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
@@ -21,9 +21,9 @@ package org.apache.sling.distribution.journal.impl.publisher;
 import java.util.Collections;
 import java.util.HashSet;
 
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessagingProvider;
 
 import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
index a8fb467..9b41463 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
@@ -30,8 +30,6 @@ import java.util.List;
 import java.util.UUID;
 import java.util.function.Function;
 
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -57,6 +55,8 @@ import org.mockito.Spy;
 import org.osgi.framework.BundleContext;
 
 import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
index 6aeb1ba..e3b5582 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
@@ -31,13 +31,13 @@ import static org.hamcrest.Matchers.hasItemInArray;
 import static org.junit.Assert.assertThat;
 
 import org.apache.sling.distribution.DistributionRequestType;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.hamcrest.Matcher;
 import org.junit.Test;
 
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.MessageInfo;
 
 public class QueueItemFactoryTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
index 11a52f9..5aededc 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
@@ -20,6 +20,7 @@ package org.apache.sling.distribution.journal.impl.queue.impl;
 
 import java.util.HashMap;
 
+import org.apache.sling.distribution.journal.shared.EntryUtil;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.junit.Test;
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
index 18cc957..e7f9639 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
@@ -34,7 +34,7 @@ import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
 import org.junit.Test;
 
 public class OffsetQueueImplJMXTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 1113319..5fca8a3 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -57,10 +57,10 @@ import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 61c1389..166ad1a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -38,7 +38,6 @@ import javax.management.ObjectInstance;
 import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.MessageSender;
 import com.google.protobuf.GeneratedMessage;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
@@ -58,6 +57,7 @@ import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageInfo;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
index 20dccea..211b9fc 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
@@ -37,6 +37,7 @@ import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueType;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
index 778bbaa..3cef383 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
@@ -28,7 +28,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,6 +40,7 @@ import org.mockito.MockitoAnnotations;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
index 05f8a88..e669a82 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
@@ -27,7 +27,9 @@ import static org.mockito.Mockito.when;
 import java.util.Collections;
 import java.util.function.Consumer;
 
+import org.apache.sling.distribution.journal.impl.subscriber.Announcer;
 import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index d6c774e..0de3d2e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -33,10 +33,10 @@ import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.ClearCommand;
 import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 1d47520..e3dd57b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -48,11 +48,6 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.sling.distribution.journal.impl.precondition.Precondition;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -70,6 +65,20 @@ import org.apache.sling.distribution.SimpleDistributionRequest;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.precondition.Precondition;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.packaging.DistributionPackageInfo;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
@@ -97,16 +106,6 @@ import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.util.converter.Converters;
 
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-
-import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
 import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.ByteString;
 
@@ -173,7 +172,7 @@ public class SubscriberTest {
     private MessageSender<PackageStatusMessage> statusSender;
 
     @Mock
-    private DistributionMetricsService distributionMetricsService;
+    private SubscriberMetrics subscriberMetrics;
 
     @InjectMocks
     DistributionSubscriber subscriber;
@@ -384,23 +383,23 @@ public class SubscriberTest {
         Timer.Context timerContext = Mockito.mock(Timer.Context.class);
         when(timer.time())
             .thenReturn(timerContext);
-        when(distributionMetricsService.getImportedPackageSize())
+        when(subscriberMetrics.getImportedPackageSize())
                 .thenReturn(histogram);
-        when(distributionMetricsService.getItemsBufferSize())
+        when(subscriberMetrics.getItemsBufferSize())
                 .thenReturn(counter);
-        when(distributionMetricsService.getFailedPackageImports())
+        when(subscriberMetrics.getFailedPackageImports())
                 .thenReturn(meter);
-        when(distributionMetricsService.getRemovedFailedPackageDuration())
+        when(subscriberMetrics.getRemovedFailedPackageDuration())
                 .thenReturn(timer);
-        when(distributionMetricsService.getRemovedPackageDuration())
+        when(subscriberMetrics.getRemovedPackageDuration())
                 .thenReturn(timer);
-        when(distributionMetricsService.getImportedPackageDuration())
+        when(subscriberMetrics.getImportedPackageDuration())
                 .thenReturn(timer);
-        when(distributionMetricsService.getSendStoredStatusDuration())
+        when(subscriberMetrics.getSendStoredStatusDuration())
                 .thenReturn(timer);
-        when(distributionMetricsService.getProcessQueueItemDuration())
+        when(subscriberMetrics.getProcessQueueItemDuration())
                 .thenReturn(timer);
-        when(distributionMetricsService.getPackageDistributedDuration())
+        when(subscriberMetrics.getPackageDistributedDuration())
                 .thenReturn(timer);
     }
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
similarity index 89%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
index fd4c761..6d88be9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
@@ -26,7 +26,6 @@ import java.util.function.Consumer;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.Before;
@@ -44,7 +43,7 @@ public class BookKeeperTest {
     private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
 
     @Mock
-    private DistributionMetricsService distributionMetricsService;
+    private SubscriberMetrics subscriberMetrics;
 
     @Mock
     private PackageHandler packageHandler;
@@ -59,7 +58,7 @@ public class BookKeeperTest {
 
     @Before
     public void before() {
-        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender,
+        bookKeeper = new BookKeeper(resolverFactory, subscriberMetrics, packageHandler, eventAdmin, sender,
                 "subAgentName", "subSlingId", true, 10);
     }
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
index 0868cc8..16805a9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import static java.util.Collections.singletonList;
 import static org.mockito.Mockito.verify;
@@ -37,6 +37,8 @@ import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.journal.service.subscriber.ContentPackageExtractor;
+import org.apache.sling.distribution.journal.service.subscriber.PackageHandling;
 import org.apache.sling.testing.mock.osgi.MockOsgi;
 import org.apache.sling.testing.mock.sling.MockSling;
 import org.apache.sling.testing.mock.sling.ResourceResolverType;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
index 11b346a..1a03199 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -29,6 +29,7 @@ import static org.junit.Assert.assertEquals;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.journal.service.subscriber.LocalStore;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.Test;
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
similarity index 90%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
index 6c33412..4f2f0a4 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
@@ -16,12 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+import org.apache.sling.distribution.journal.service.subscriber.PackageRetries;
+
 public class PackageRetriesTest {
 
     @Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubQueueTest.java
similarity index 93%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubQueueTest.java
index d2784e9..90ef7b5 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubQueueTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -26,6 +26,9 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.impl.subscriber.SubQueue;
+import org.apache.sling.distribution.journal.service.subscriber.PackageRetries;
+
 import com.google.common.collect.Lists;
 
 public class SubQueueTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
similarity index 94%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
index ef5f0d7..6c40034 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 import org.apache.felix.systemready.CheckStatus.State;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberIdle;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.osgi.framework.BundleContext;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
similarity index 85%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
index 1049ef5..50d5507 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertNotNull;
@@ -32,7 +32,8 @@ import org.apache.sling.commons.metrics.Meter;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.commons.metrics.Timer.Context;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -94,16 +95,7 @@ public class DistributionMetricsServiceTest {
         assertNotNull(metrics.getDroppedRequests());
         assertNotNull(metrics.getEnqueuePackageDuration());
         assertNotNull(metrics.getExportedPackageSize());
-        assertNotNull(metrics.getFailedPackageImports());
-        assertNotNull(metrics.getImportedPackageDuration());
-        assertNotNull(metrics.getImportedPackageSize());
-        assertNotNull(metrics.getItemsBufferSize());
-        assertNotNull(metrics.getPackageDistributedDuration());
-        assertNotNull(metrics.getProcessQueueItemDuration());
         assertNotNull(metrics.getQueueCacheFetchCount());
-        assertNotNull(metrics.getRemovedFailedPackageDuration());
-        assertNotNull(metrics.getRemovedPackageDuration());
-        assertNotNull(metrics.getSendStoredStatusDuration());
     }
     
     @Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
index bd44057..8fe7ad9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.time.Duration.of;
 import static java.time.temporal.ChronoUnit.MILLIS;
@@ -27,6 +27,7 @@ import java.time.temporal.ChronoUnit;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.sling.distribution.journal.shared.ExponentialBackOff;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.runners.MockitoJUnitRunner;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
similarity index 90%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
index ce7f279..0214b83 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
@@ -35,9 +35,12 @@ import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.impl.shared.JournalAvailableChecker.JournalCheckerConfiguration;
-import org.apache.sling.distribution.journal.impl.shared.Topics.TopicsConfiguration;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.JournalAvailableChecker;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.shared.JournalAvailableChecker.JournalCheckerConfiguration;
+import org.apache.sling.distribution.journal.shared.Topics.TopicsConfiguration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
similarity index 97%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
index fed9fef..7fd0207 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.samePropertyValuesAs;
@@ -29,7 +29,6 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
 
-import org.apache.sling.distribution.journal.impl.shared.LimitPoller;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,6 +41,7 @@ import org.mockito.MockitoAnnotations;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.LimitPoller;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
index 14dcc19..ede1b69 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
@@ -36,6 +36,7 @@ import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.PackageBrowser;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
similarity index 95%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
index aa2947a..f483c86 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static java.util.Collections.emptyList;
 import static org.hamcrest.Matchers.containsString;
@@ -43,6 +43,9 @@ import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.PackageBrowser;
+import org.apache.sling.distribution.journal.shared.PackageViewerPlugin;
+import org.apache.sling.distribution.journal.shared.Topics;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
similarity index 93%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
index 5d31bf6..7202b6b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
@@ -16,13 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.sling.distribution.DistributionRequestState;
+import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
 import org.junit.Test;
 
 public class SimpleDistributionResponseTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java b/src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
index a2105e3..9145dfe 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
 
 import org.apache.sling.distribution.journal.MessageInfo;