You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/07/23 09:51:03 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: Sling 9583 - Make queue code usable for other bundles (#53)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 2efca59  Sling 9583 - Make queue code usable for other bundles (#53)
2efca59 is described below

commit 2efca59b3348175be9f9ff8c39c9300cab9d03e9
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Thu Jul 23 11:50:55 2020 +0200

    Sling 9583 - Make queue code usable for other bundles (#53)
    
    * SLING-9577 - Switch to seeding thread
    
    * SLING-9577 - Remove unnecessar parameters
    
    * SLING-9577 - Remove unnecessary logging
    
    * SLING-9577 - Add count so we can monitor numnber of seeding messages
    
    * SLING-9577 - Start counting with 1
    
    * SLING-9577 - Limit maximum number of messages sent on seeder thread
    
    * SLING-9577 - Use exponential backoff and unlimited messages on seeder thread
    
    * SLING-9583 - Delete SubQueue classes as they are not used anymore
    
    * SLING-9583 - Combine parameters into value object
    
    * SLING-9583 - Extract messaging code from PubQueueProviderImpl
    
    * SLING-9583 - Extract messaging code from PubQueueCache
    
    * SLING-9583 - Cleanup
    
    * SLING-9583 - Integrate PubQueueCacheService into PubQueueProviderImpl
    
    * SLING-9583 - checkstyle
    
    * SLING-9583 - ignore sonar warnings
    
    * SLING-9583 - ignore sonar warnings
    
    * SLING-9583 - Increase coverage
    
    * SLING-9583 - Adding test
    
    * SLING-9583 - Move discovery into separate package
    
    * SLING-9583 - Create PubQueueProvider using factory
    
    * SLING-9583 - Sonar
    
    * SLING-9583 - Remove QueueId
    
    * SLING-9583 - Move all queue logic to PubQueueProvider
    
    * SLING-9583 - Publish single PubQueueProvider service for compatibility
    
    * SLING-9583 - Fix null annotation
    
    * SLING-9583 - Fix null annotation
    
    * SLING-9583 - Make queue API independent from discovery
    
    * SLING-9583 - Improve readability of test
    
    * SLING-9583 - Move queue package out of impl as it is exported
    
    * SLING-9583 - Fix sonar issues
---
 pom.xml                                            |   2 +-
 .../{publisher => discovery}/DiscoveryService.java |  40 ++--
 .../impl/{publisher => discovery}/State.java       |   2 +-
 .../TopologyChangeHandler.java                     |   2 +-
 .../{publisher => discovery}/TopologyView.java     |   2 +-
 .../{publisher => discovery}/TopologyViewDiff.java |   2 +-
 .../TopologyViewManager.java                       |   4 +-
 .../journal/impl/publisher/DistPublisherJMX.java   |   2 +
 .../impl/publisher/DistributionPublisher.java      |  84 +++------
 .../impl/publisher/MessagingCacheCallback.java     | 122 ++++++++++++
 .../impl/publisher/PackageDistributedNotifier.java |  44 +++--
 .../impl/publisher/PubQueueProviderPublisher.java  |  88 +++++++++
 .../impl => publisher}/QueueCacheSeeder.java       |   2 +-
 .../{queue/impl => publisher}/RangePoller.java     |   8 +-
 .../impl/queue/impl/PubQueueCacheService.java      | 138 --------------
 .../impl/queue/impl/PubQueueProviderImpl.java      | 155 ----------------
 .../impl/queue/impl/QueueCacheCleanupTask.java     |  56 ------
 .../journal/impl/queue/impl/SubQueue.java          | 165 -----------------
 .../CacheCallback.java}                            |  18 +-
 .../{impl/queue/impl => queue}/ClearCallback.java  |   2 +-
 .../journal/{impl => }/queue/OffsetQueue.java      |   2 +-
 .../journal/{impl => }/queue/PubQueueProvider.java |  24 ++-
 .../PubQueueProviderFactory.java}                  |  18 +-
 .../journal/{impl => }/queue/QueueItemFactory.java |   2 +-
 .../distribution/journal/queue/QueueState.java     |  50 +++++
 .../journal/{impl => }/queue/impl/EntryUtil.java   |   4 +-
 .../{impl => }/queue/impl/OffsetQueueImpl.java     |   4 +-
 .../queue/impl/OffsetQueueImplMBean.java           |   2 +-
 .../journal/{impl => }/queue/impl/PubErrQueue.java |   4 +-
 .../journal/{impl => }/queue/impl/PubQueue.java    |   5 +-
 .../{impl => }/queue/impl/PubQueueCache.java       |  56 ++----
 .../queue/impl/PubQueueProviderFactoryImpl.java    |  46 +++++
 .../journal/queue/impl/PubQueueProviderImpl.java   | 204 +++++++++++++++++++++
 .../{impl => }/queue/impl/QueueEntryFactory.java   |   4 +-
 .../package-info.java}                             |  10 +-
 .../{impl/publisher => shared}/AgentId.java        |   2 +-
 .../DiscoveryServiceTest.java                      |   2 +-
 .../impl/{publisher => discovery}/StateTest.java   |   2 +-
 .../TopologyViewDiffTest.java                      |   2 +-
 .../TopologyViewManagerTest.java                   |   2 +-
 .../{publisher => discovery}/TopologyViewTest.java |   3 +-
 .../impl/publisher/DistPublisherJMXTest.java       |   3 +
 .../impl/publisher/DistributionPublisherTest.java  |  52 +++---
 .../impl/publisher/MessagingCacheCallbackTest.java | 184 +++++++++++++++++++
 .../publisher/PackageDistributedNotifierTest.java  |  54 ++++--
 .../publisher/PubQueueProviderPublisherTest.java   |  75 ++++++++
 .../impl => publisher}/QueueCacheSeederTest.java   |   2 +-
 .../{queue/impl => publisher}/RangePollerTest.java |   4 +-
 .../journal/impl/queue/impl/SubQueueTest.java      |  73 --------
 .../journal/impl/subscriber/SubscriberTest.java    |  61 +++---
 .../{impl => }/queue/QueueItemFactoryTest.java     |  10 +-
 .../{impl => }/queue/impl/EntryUtilTest.java       |   2 +-
 .../queue/impl/OffsetQueueImplJMXTest.java         |   4 +-
 .../{impl => }/queue/impl/OffsetQueueImplTest.java |   4 +-
 .../{impl => }/queue/impl/PubQueueCacheTest.java   | 119 ++++--------
 .../queue/impl/PubQueueProviderTest.java           |  90 +++++----
 .../{impl => }/queue/impl/PubQueueTest.java        |  12 +-
 .../{impl/publisher => shared}/AgentIdTest.java    |   2 +-
 58 files changed, 1138 insertions(+), 999 deletions(-)

diff --git a/pom.xml b/pom.xml
index 1be3cbc..a433d58 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,7 +90,7 @@
         <dependency>
             <groupId>org.apache.felix</groupId>
             <artifactId>org.apache.felix.converter</artifactId>
-            <version>1.0.0</version>
+            <version>1.0.14</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
similarity index 79%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
index a88770e..35a5024 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
 
 import static java.lang.String.format;
 import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
@@ -32,6 +32,7 @@ import javax.annotation.ParametersAreNonnullByDefault;
 import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
 import org.apache.sling.distribution.journal.messages.SubscriberConfig;
 import org.apache.sling.distribution.journal.messages.SubscriberState;
+import org.apache.sling.distribution.journal.shared.AgentId;
 import org.apache.sling.distribution.journal.shared.PublisherConfigurationAvailable;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.commons.io.IOUtils;
@@ -41,13 +42,12 @@ import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
-
-import org.apache.sling.distribution.journal.MessageHandler;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicyOption;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.Reset;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,10 +79,10 @@ public class DiscoveryService implements Runnable {
     @Reference
     private Topics topics;
 
-    @Reference
-    private TopologyChangeHandler topologyChangeHandler;
+    @Reference(policyOption = ReferencePolicyOption.GREEDY, cardinality = ReferenceCardinality.OPTIONAL)
+    private volatile TopologyChangeHandler topologyChangeHandler; //NOSONAR
 
-    private volatile ServiceRegistration<?> reg;
+    private volatile ServiceRegistration<?> reg; //NOSONAR
 
     private final TopologyViewManager viewManager = new TopologyViewManager(REFRESH_TTL_MS);
 
@@ -105,7 +105,7 @@ public class DiscoveryService implements Runnable {
         poller = messagingProvider.createPoller(
                 topics.getDiscoveryTopic(), 
                 Reset.latest,
-                create(DiscoveryMessage.class, new DiscoveryMessageHandler())
+                create(DiscoveryMessage.class, this::handleDiscovery)
                 ); 
         startTopologyViewUpdaterTask(context);
         LOG.info("Discovery service started");
@@ -140,7 +140,10 @@ public class DiscoveryService implements Runnable {
             } else {
                 LOG.debug(msg);
             }
-            topologyChangeHandler.changed(diffView);
+            TopologyChangeHandler handler = topologyChangeHandler;
+            if (handler != null) {
+                handler.changed(diffView);
+            }
         }
     }
 
@@ -152,18 +155,13 @@ public class DiscoveryService implements Runnable {
         reg = context.registerService(Runnable.class.getName(), this, props);
     }
 
-    private final class DiscoveryMessageHandler implements MessageHandler<DiscoveryMessage> {
-
-        @Override
-        public void handle(MessageInfo info, DiscoveryMessage disMsg) {
-
-            long now = System.currentTimeMillis();
-            AgentId subAgentId = new AgentId(disMsg.getSubSlingId(), disMsg.getSubAgentName());
-            for (SubscriberState subStateMsg : disMsg.getSubscriberStates()) {
-                SubscriberConfig subConfig = disMsg.getSubscriberConfiguration();
-                State subState = new State(subStateMsg.getPubAgentName(), subAgentId.getAgentId(), now, subStateMsg.getOffset(), subStateMsg.getRetries(), subConfig.getMaxRetries(), subConfig.isEditable());
-                viewManager.refreshState(subState);
-            }
+    public void handleDiscovery(MessageInfo info, DiscoveryMessage disMsg) {
+        long now = System.currentTimeMillis();
+        AgentId subAgentId = new AgentId(disMsg.getSubSlingId(), disMsg.getSubAgentName());
+        for (SubscriberState subStateMsg : disMsg.getSubscriberStates()) {
+            SubscriberConfig subConfig = disMsg.getSubscriberConfiguration();
+            State subState = new State(subStateMsg.getPubAgentName(), subAgentId.getAgentId(), now, subStateMsg.getOffset(), subStateMsg.getRetries(), subConfig.getMaxRetries(), subConfig.isEditable());
+            viewManager.refreshState(subState);
         }
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/State.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/State.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/State.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/discovery/State.java
index be649c4..bf007c6 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/State.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/State.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
 
 import java.util.Objects;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyChangeHandler.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyChangeHandler.java
similarity index 94%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyChangeHandler.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyChangeHandler.java
index aeb70c7..0a82791 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyChangeHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyChangeHandler.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyView.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyView.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyView.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyView.java
index bdf4d34..ba7718d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyView.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyView.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
 
 import static java.util.Collections.unmodifiableSet;
 import static java.util.stream.Collectors.groupingBy;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewDiff.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewDiff.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewDiff.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewDiff.java
index 235982c..1647795 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewDiff.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewDiff.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
 
 import java.util.HashSet;
 import java.util.Map;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewManager.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewManager.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewManager.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewManager.java
index 04fd25a..cc7ac77 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewManager.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewManager.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
 
 import java.util.Map;
 import java.util.Set;
@@ -33,7 +33,7 @@ public class TopologyViewManager {
      */
     private final Map<String, State> states = new ConcurrentHashMap<>();
 
-    private volatile TopologyView currentView = new TopologyView();
+    private volatile TopologyView currentView = new TopologyView(); //NOSONAR
 
     private final long refreshTtl;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java
index 9440ffe..48e69a5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java
@@ -33,6 +33,8 @@ import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
 
+import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
+import org.apache.sling.distribution.journal.impl.discovery.State;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 6b9e77a..d14ad66 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -19,7 +19,6 @@
 package org.apache.sling.distribution.journal.impl.publisher;
 
 
-import static java.util.stream.StreamSupport.stream;
 import static java.util.Objects.requireNonNull;
 import static org.apache.sling.distribution.DistributionRequestState.ACCEPTED;
 import static org.apache.sling.distribution.DistributionRequestType.ADD;
@@ -27,34 +26,33 @@ import static org.apache.sling.distribution.DistributionRequestType.DELETE;
 import static org.apache.sling.distribution.DistributionRequestType.TEST;
 import static org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed;
 
+import java.io.Closeable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
-import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 import javax.management.NotCompliantMBeanException;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.shared.AgentState;
 import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.shared.JMXRegistration;
 import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
 import org.apache.sling.distribution.journal.shared.Topics;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.DistributionRequestState;
@@ -75,7 +73,10 @@ import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.service.metatype.annotations.Designate;
+
 import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.JournalAvailable;
 
 /**
@@ -106,9 +107,6 @@ public class DistributionPublisher implements DistributionAgent {
     private PackageQueuedNotifier queuedNotifier;
 
     @Reference
-    private PubQueueProvider pubQueueProvider;
-
-    @Reference
     private DiscoveryService discoveryService;
 
     @Reference
@@ -126,6 +124,9 @@ public class DistributionPublisher implements DistributionAgent {
     @Reference
     private DistributionMetricsService distributionMetricsService;
 
+    @Reference
+    private PubQueueProvider pubQueueProvider;
+
     private String pubAgentName;
 
     private String pkgType;
@@ -140,6 +141,8 @@ public class DistributionPublisher implements DistributionAgent {
 
     private DistributionMetricsService.GaugeService<Integer> subscriberCountGauge;
 
+    private Closeable statusPoller;
+
     public DistributionPublisher() {
         log = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO);
         REQ_TYPES.put(ADD,    this::sendAndWait);
@@ -177,11 +180,19 @@ public class DistributionPublisher implements DistributionAgent {
                 "Current number of publish subscribers",
                 () -> discoveryService.getTopologyView().getSubscribedAgentIds().size()
         );
+        
+        statusPoller = messagingProvider.createPoller(
+                topics.getStatusTopic(),
+                Reset.earliest,
+                HandlerAdapter.create(PackageStatusMessage.class, pubQueueProvider::handleStatus)
+                );
+        
         log.info(msg);
     }
 
     @Deactivate
     public void deactivate() {
+        IOUtils.closeQuietly(statusPoller, pubQueueProvider);
         reg.close();
         componentReg.unregister();
         String msg = String.format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s",
@@ -206,41 +217,17 @@ public class DistributionPublisher implements DistributionAgent {
     @Nonnull
     @Override
     public Iterable<String> getQueueNames() {
-
-        // Queues names are generated only for the subscriber agents which are
-        // alive and are subscribed to the publisher agent name (pubAgentName).
-        // The queue names match the subscriber agent identifier (subAgentId).
-        //
-        // If errors queues are enabled, an error queue name is generated which
-        // follows the pattern "%s-error". The pattern is deliberately different
-        // from the SCD on Jobs one ("error-%s") as we don't want to support
-        // the UI ability to retry items from the error queue.
-        Set<String> queueNames = new HashSet<>();
-        TopologyView view =  discoveryService.getTopologyView();
-        for (String subAgentId : view.getSubscribedAgentIds(pubAgentName)) {
-            queueNames.add(subAgentId);
-            State subState = view.getState(subAgentId, pubAgentName);
-            if (subState != null) {
-                boolean errorQueueEnabled = (subState.getMaxRetries() >= 0);
-                if (errorQueueEnabled) {
-                    queueNames.add(String.format("%s-error", subAgentId));
-                }
-            }
-        }
-        return Collections.unmodifiableCollection(queueNames);
+        return Collections.unmodifiableCollection(pubQueueProvider.getQueueNames(pubAgentName));
     }
 
     @Override
     public DistributionQueue getQueue(String queueName) {
-
-        // validate that queueName is a valid name returned by #getQueueNames
-        if (stream(getQueueNames().spliterator(), true).noneMatch(queueName::equals)) {
-            distributionMetricsService.getQueueAccessErrorCount().increment();
-            return null;
-        }
-
         try {
-            return queueName.endsWith("-error") ? getErrorQueue(queueName) : getPubQueue(queueName);
+            DistributionQueue queue = pubQueueProvider.getQueue(pubAgentName, queueName);
+            if (queue == null) {
+                distributionMetricsService.getQueueAccessErrorCount().increment();
+            }
+            return queue;
         } catch (Exception e) {
             distributionMetricsService.getQueueAccessErrorCount().increment();
             throw e;
@@ -248,25 +235,6 @@ public class DistributionPublisher implements DistributionAgent {
     }
 
     @Nonnull
-    private DistributionQueue getErrorQueue(String queueName) {
-        AgentId subAgentId = new AgentId(StringUtils.substringBeforeLast(queueName, "-error"));
-        return pubQueueProvider.getErrorQueue(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName);
-    }
-
-    @CheckForNull
-    private DistributionQueue getPubQueue(String queueName) {
-        TopologyView view = discoveryService.getTopologyView();
-        AgentId subAgentId = new AgentId(queueName);
-        State state = view.getState(subAgentId.getAgentId(), pubAgentName);
-        if (state != null) {
-            return pubQueueProvider.getQueue(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName, state.getOffset() + 1, state.getRetries(), state.isEditable());
-        }
-        return null;
-    }
-
-
-
-    @Nonnull
     @Override
     public DistributionLog getLog() {
         return log;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
new file mode 100644
index 0000000..5bd9e87
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
+import org.apache.sling.distribution.journal.impl.discovery.State;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyView;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.queue.CacheCallback;
+import org.apache.sling.distribution.journal.queue.ClearCallback;
+import org.apache.sling.distribution.journal.queue.QueueState;
+import org.apache.sling.distribution.journal.shared.AgentId;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessagingCacheCallback implements CacheCallback {
+    private Logger log = LoggerFactory.getLogger(this.getClass());
+
+    private final MessagingProvider messagingProvider;
+
+    private final String packageTopic;
+
+    private final DistributionMetricsService distributionMetricsService;
+
+    private final DiscoveryService discoveryService;
+
+    private final Consumer<ClearCommand> commandSender;
+
+    public MessagingCacheCallback(
+            MessagingProvider messagingProvider, 
+            String packageTopic, 
+            DistributionMetricsService distributionMetricsService,
+            DiscoveryService discoveryService,
+            Consumer<ClearCommand> commandSender) {
+        this.messagingProvider = messagingProvider;
+        this.packageTopic = packageTopic;
+        this.distributionMetricsService = distributionMetricsService;
+        this.discoveryService = discoveryService;
+        this.commandSender = commandSender;
+    }
+
+    @Override
+    public Closeable createConsumer(MessageHandler<PackageMessage> handler) {
+        log.info("Starting consumer");
+        QueueCacheSeeder seeder = new QueueCacheSeeder(messagingProvider.createSender(packageTopic)); //NOSONAR
+        Closeable poller = messagingProvider.createPoller( //NOSONAR
+                packageTopic,
+                Reset.latest,
+                create(PackageMessage.class, (info, message) -> { seeder.close(); handler.handle(info, message); }) 
+                ); 
+        seeder.startSeeding();
+        return () -> IOUtils.closeQuietly(seeder, poller);
+    }
+    
+    @Override
+    public List<FullMessage<PackageMessage>> fetchRange(long minOffset, long maxOffset) throws InterruptedException {
+        distributionMetricsService.getQueueCacheFetchCount().increment();
+        return new RangePoller(messagingProvider, packageTopic, minOffset, maxOffset)
+                .fetchRange();
+    }
+
+    @Override
+    public QueueState getQueueState(String pubAgentName, String subAgentId) {
+        TopologyView view = discoveryService.getTopologyView();
+        State state = view.getState(subAgentId, pubAgentName);
+        if (state == null) {
+            return null;
+        }
+        ClearCallback editableCallback = offset -> sendClearCommand(pubAgentName, new AgentId(subAgentId), offset);
+        ClearCallback clearCallback = state.isEditable() ? editableCallback : null;
+        long curOffset = state.getOffset();
+        int headRetries = state.getRetries();
+        int maxRetries = state.getMaxRetries();
+        return new QueueState(curOffset, headRetries, maxRetries, clearCallback);
+    }
+    
+    private void sendClearCommand(String pubAgentName, AgentId subAgentId, long offset) {
+        ClearCommand commandMessage = ClearCommand.builder()
+                .pubAgentName(pubAgentName)
+                .subSlingId(subAgentId.getSlingId())
+                .subAgentName(subAgentId.getAgentName())
+                .offset(offset)
+                .build();
+        log.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", subAgentId.getSlingId(), subAgentId.getAgentName(), offset);
+        commandSender.accept(commandMessage);
+    }
+
+    @Override
+    public Set<String> getSubscribedAgentIds(String pubAgentName) {
+        return discoveryService.getTopologyView().getSubscribedAgentIds(pubAgentName);
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index ea3adc5..d5dce4e 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -26,11 +26,13 @@ import java.util.stream.LongStream;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyViewDiff;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
 import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.queue.QueueItemFactory;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessagingProvider;
 
@@ -57,7 +59,7 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
     private EventAdmin eventAdmin;
 
     @Reference
-    private PubQueueCacheService pubQueueCacheService;
+    private PubQueueProvider pubQueueCacheService;
 
     @Reference
     private MessagingProvider messagingProvider;
@@ -91,28 +93,32 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
         long minOffset = offsets.get().findFirst().getAsLong();
         OffsetQueue<DistributionQueueItem> offsetQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
         offsets
-        	.get()
-        	.mapToObj(offsetQueue::getItem)
-        	.filter(Objects::nonNull)
-        	.forEach(msg -> processOffset(pubAgentName, msg));
+            .get()
+            .mapToObj(offsetQueue::getItem)
+            .filter(Objects::nonNull)
+            .forEach(msg -> processOffset(pubAgentName, msg));
     }
 
     protected void processOffset(String pubAgentName, DistributionQueueItem queueItem) {
         sendEvt(pubAgentName, queueItem);
-        sendMsg(pubAgentName, queueItem);
+        if (sendMsg) {
+            sendMsg(pubAgentName, queueItem);
+        }
     }
 
     private void sendMsg(String pubAgentName, DistributionQueueItem queueItem) {
-        if (sendMsg) {
-            PackageDistributedMessage msg = new PackageDistributedMessage();
-            msg.pubAgentName = pubAgentName;
-            msg.packageId = queueItem.getPackageId();
-            msg.offset = (Long) queueItem.get(QueueItemFactory.RECORD_OFFSET);
-            msg.paths = (String[]) queueItem.get(PROPERTY_REQUEST_PATHS);
-            msg.deepPaths = (String[]) queueItem.get(PROPERTY_REQUEST_DEEP_PATHS);
-
-            sender.accept(msg);
-        }
+        PackageDistributedMessage msg = createDistributedMessage(pubAgentName, queueItem);
+        sender.accept(msg);
+    }
+
+    private PackageDistributedMessage createDistributedMessage(String pubAgentName, DistributionQueueItem queueItem) {
+        return PackageDistributedMessage.builder()
+            .pubAgentName(pubAgentName)
+            .packageId(queueItem.getPackageId())
+            .offset((Long) queueItem.get(QueueItemFactory.RECORD_OFFSET))
+            .paths((String[]) queueItem.get(PROPERTY_REQUEST_PATHS))
+            .deepPaths((String[]) queueItem.get(PROPERTY_REQUEST_DEEP_PATHS))
+            .build();
     }
 
     private void sendEvt(String pubAgentName, DistributionQueueItem queueItem) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
new file mode 100644
index 0000000..dd390d2
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import java.util.Hashtable;
+import java.util.function.Consumer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
+import org.apache.sling.distribution.journal.queue.CacheCallback;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.queue.PubQueueProviderFactory;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+
+/**
+ * Normally PubQueueProvider should be created per publish agent.
+ * For compatibility with current code and to save on number of consumers 
+ * we must make sure to publish only one for the messaging based impl.
+ */
+@Component
+public class PubQueueProviderPublisher {
+    @Reference
+    private MessagingProvider messagingProvider;
+
+    @Reference
+    private DiscoveryService discoveryService;
+
+    @Reference
+    private Topics topics;
+    
+    @Reference
+    JournalAvailable journalAvailable;
+
+    @Reference
+    private DistributionMetricsService distributionMetricsService;
+
+    @Reference
+    private PubQueueProviderFactory pubQueueProviderFactory;
+
+    private PubQueueProvider pubQueueProvider;
+
+    private ServiceRegistration<PubQueueProvider> reg;
+
+    @Activate
+    public void activate(BundleContext context) {
+        Consumer<ClearCommand> commandSender = messagingProvider.createSender(topics.getCommandTopic());
+        CacheCallback callback = new MessagingCacheCallback(
+                messagingProvider, 
+                topics.getPackageTopic(), 
+                distributionMetricsService,
+                discoveryService,
+                commandSender);
+        this.pubQueueProvider = pubQueueProviderFactory.create(callback);
+        reg = context.registerService(PubQueueProvider.class, this.pubQueueProvider, new Hashtable<>());
+    }
+    
+    @Deactivate
+    public void deactivate() {
+        IOUtils.closeQuietly(this.pubQueueProvider);
+        reg.unregister();
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeeder.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeeder.java
index 26a6eaa..bbf27b6 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeeder.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.publisher;
 
 import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
similarity index 94%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
index 72e7c4a..040db85 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.publisher;
 
 import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 
@@ -55,12 +55,12 @@ public class RangePoller {
     public RangePoller(MessagingProvider messagingProvider,
                           String packageTopic,
                           long minOffset,
-                          long maxOffset) {
-        this.maxOffset = maxOffset;
+                          long maxOffsetExclusive) {
+        this.maxOffset = maxOffsetExclusive;
         this.minOffset = minOffset;
         this.messages = new ArrayList<>();
         String assign = messagingProvider.assignTo(minOffset);
-        LOG.info("Fetching offsets [{},{}[", minOffset, maxOffset);
+        LOG.info("Fetching offsets [{},{}[", minOffset, maxOffsetExclusive);
         headPoller = messagingProvider.createPoller(
                 packageTopic, Reset.earliest, assign,
                 create(PackageMessage.class, this::handlePackage)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
deleted file mode 100644
index b8d6692..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import javax.annotation.Nonnull;
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.messages.PackageMessage;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.PublisherConfigurationAvailable;
-import org.apache.sling.distribution.journal.shared.Topics;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.JournalAvailable;
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.event.EventAdmin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Component(immediate = true, service = PubQueueCacheService.class)
-@ParametersAreNonnullByDefault
-public class PubQueueCacheService {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PubQueueCacheService.class);
-
-    /**
-     * The minimum size to collect the cache. Each cache entry requires
-     * around 500B of heap space. 10'000 entries ~= 5MB on heap.
-     */
-    private static final int CLEANUP_THRESHOLD = 10_000;
-
-    /**
-     * Will cause the cache to be cleared when we loose the journal
-     */
-    @Reference
-    private JournalAvailable journalAvailable;
-
-    /**
-     * The cache is active only when at least one DistributionSubscriber agent is configured.
-     */
-    @Reference
-    private PublisherConfigurationAvailable publisherConfigurationAvailable;
-
-    @Reference
-    private MessagingProvider messagingProvider;
-
-    @Reference
-    private Topics topics;
-
-    @Reference
-    private EventAdmin eventAdmin;
-
-    @Reference
-    private DistributionMetricsService distributionMetricsService;
-
-    private volatile PubQueueCache cache;
-
-    public PubQueueCacheService() {}
-
-    public PubQueueCacheService(MessagingProvider messagingProvider,
-                                Topics topics,
-                                EventAdmin eventAdmin) {
-        this.messagingProvider = messagingProvider;
-        this.topics = topics;
-        this.eventAdmin = eventAdmin;
-    }
-
-    @Activate
-    public void activate() {
-        cache = newCache();
-        LOG.info("Started Publisher queue cache service");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        PubQueueCache queueCache = this.cache;
-        if (queueCache != null) {
-            queueCache.close();
-        }
-        LOG.info("Stopped Publisher queue cache service");
-    }
-
-    @Nonnull
-    public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) {
-        try {
-            return cache.getOffsetQueue(pubAgentName, minOffset);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * The cleanup renew the cache when
-     * a capacity threshold has been reached.
-     */
-    public void cleanup() {
-        PubQueueCache queueCache = this.cache;
-        if (queueCache != null) {
-            int size = queueCache.size();
-            if (size > CLEANUP_THRESHOLD) {
-                LOG.info("Cleanup package cache (size={}/{})", size, CLEANUP_THRESHOLD);
-                queueCache.close();
-                cache = newCache();
-            } else {
-                LOG.info("No cleanup required for package cache (size={}/{})", size, CLEANUP_THRESHOLD);
-            }
-        }
-    }
-
-    private PubQueueCache newCache() {
-        String topic = topics.getPackageTopic();
-        MessageSender<PackageMessage> sender = messagingProvider.createSender(topic);
-        QueueCacheSeeder seeder = new QueueCacheSeeder(sender);
-        return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topic, seeder);
-    }
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
deleted file mode 100644
index c019c09..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-
-import java.io.Closeable;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
-
-import javax.annotation.Nonnull;
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.messages.ClearCommand;
-import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.Topics;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * As reading all messages is an expensive operation this component is activated lazily only when requested by the Publisher.
- */
-@Component
-@ParametersAreNonnullByDefault
-public class PubQueueProviderImpl implements PubQueueProvider {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PubQueueProviderImpl.class);
-
-    /*
-     * (pubAgentName#subAgentId x OffsetQueue)
-     */
-    private final Map<String, OffsetQueue<Long>> errorQueues = new ConcurrentHashMap<>();
-
-    @Reference
-    private MessagingProvider messagingProvider;
-
-    @Reference
-    private Topics topics;
-
-    @Reference
-    private PubQueueCacheService pubQueueCacheService;
-
-    private Closeable statusPoller;
-
-    private Consumer<ClearCommand> sender;
-
-    public PubQueueProviderImpl() {
-    }
-    
-    public PubQueueProviderImpl(
-            PubQueueCacheService pubQueueCacheService,
-            MessagingProvider messagingProvider,
-            Topics topics) {
-        this.pubQueueCacheService = pubQueueCacheService;
-        this.messagingProvider = messagingProvider;
-        this.topics = topics;
-    }
-
-    @Activate
-    public void activate() {
-        statusPoller = messagingProvider.createPoller(
-                topics.getStatusTopic(),
-                Reset.earliest,
-                create(PackageStatusMessage.class, this::handleStatus)
-                );
-        sender = messagingProvider.createSender(topics.getCommandTopic());
-        LOG.info("Started Publisher queue provider service");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        IOUtils.closeQuietly(statusPoller);
-        LOG.info("Stopped Publisher queue provider service");
-    }
-
-    @Nonnull
-    @Override
-    public DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable) {
-        OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
-        ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, offset);
-        ClearCallback callback = editable ? editableCallback : null;
-        return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), headRetries, callback);
-    }
-
-    @Nonnull
-    @Override
-    public DistributionQueue getErrorQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName) {
-        String errorQueueKey = errorQueueKey(pubAgentName, subSlingId, subAgentName);
-        OffsetQueue<Long> errorQueue = errorQueues.getOrDefault(errorQueueKey, new OffsetQueueImpl<>());
-        long headOffset = errorQueue.getHeadOffset();
-        final OffsetQueue<DistributionQueueItem> agentQueue;
-        if (headOffset < 0) {
-            agentQueue = new OffsetQueueImpl<>();
-        } else {
-            long minReferencedOffset = errorQueue.getItem(headOffset);
-            agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minReferencedOffset);
-        }
-
-        return new PubErrQueue(queueName, agentQueue, errorQueue);
-    }
-
-    public void handleStatus(MessageInfo info, PackageStatusMessage message) {
-        if (message.getStatus() == Status.REMOVED_FAILED) {
-            String errorQueueKey = errorQueueKey(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName());
-            OffsetQueue<Long> errorQueue = errorQueues.computeIfAbsent(errorQueueKey, key -> new OffsetQueueImpl<>());
-            errorQueue.putItem(info.getOffset(), message.getOffset());
-        }
-    }
-
-    @Nonnull
-    private String errorQueueKey(String pubAgentName, String subSlingId, String subAgentName) {
-        return String.format("%s#%s#%s", pubAgentName, subSlingId, subAgentName);
-    }
-
-    private void sendClearCommand(String subSlingId, String subAgentName, long offset) {
-        ClearCommand commandMessage = ClearCommand.builder()
-                .subSlingId(subSlingId)
-                .subAgentName(subAgentName)
-                .offset(offset)
-                .build();
-        LOG.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", subSlingId, subAgentName, offset);
-        sender.accept(commandMessage);
-    }
-
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheCleanupTask.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheCleanupTask.java
deleted file mode 100644
index b2bd673..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheCleanupTask.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
-
-/**
- * Periodical task to cleanup the resources
- * used by the cache.
- */
-@Component(
-        service = Runnable.class,
-        property = {
-                PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
-                PROPERTY_SCHEDULER_PERIOD + ":Long=" + 12 * 60 * 60 // 12 hours
-        })
-@ParametersAreNonnullByDefault
-public class QueueCacheCleanupTask implements Runnable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(QueueCacheCleanupTask.class);
-
-    @Reference
-    private PubQueueCacheService queueCacheService;
-
-    @Override
-    public void run() {
-        LOG.info("Starting package cache cleanup task");
-        queueCacheService.cleanup();
-        LOG.info("Stopping package cache cleanup task");
-    }
-}
-
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
deleted file mode 100644
index 44dd6f8..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-
-import javax.annotation.CheckForNull;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
-import org.apache.sling.distribution.journal.shared.PackageRetries;
-import org.apache.sling.distribution.queue.DistributionQueueEntry;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.DistributionQueueItemState;
-import org.apache.sling.distribution.queue.DistributionQueueState;
-import org.apache.sling.distribution.queue.DistributionQueueStatus;
-import org.apache.sling.distribution.queue.DistributionQueueType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
-import static org.apache.sling.distribution.queue.DistributionQueueState.BLOCKED;
-import static org.apache.sling.distribution.queue.DistributionQueueState.IDLE;
-import static org.apache.sling.distribution.queue.DistributionQueueState.RUNNING;
-import static org.apache.sling.distribution.queue.DistributionQueueType.ORDERED;
-
-@ParametersAreNonnullByDefault
-public class SubQueue implements DistributionQueue {
-
-    private static final String UNSUPPORTED_CLEAR_OPERATION = "Unsupported clear operation";
-
-    @SuppressWarnings("unused")
-    private static final Logger LOG = LoggerFactory.getLogger(SubQueue.class);
-
-    private final DistributionQueueItem headItem;
-
-    private final PackageRetries packageRetries;
-
-    private final String queueName;
-
-	private final  QueueEntryFactory entryFactory;
-
-    public SubQueue(String queueName,
-                    @Nullable
-                    DistributionQueueItem headItem,
-                    PackageRetries packageRetries) {
-        this.headItem = headItem;
-        this.queueName = Objects.requireNonNull(queueName);
-        this.packageRetries = Objects.requireNonNull(packageRetries);
-        this.entryFactory = new QueueEntryFactory(queueName, this::attempts);
-    }
-
-    @Nonnull
-    @Override
-    public String getName() {
-        return queueName;
-    }
-
-    @Override
-    public DistributionQueueEntry add(DistributionQueueItem queueItem) {
-        throw new UnsupportedOperationException("Unsupported add operation");
-    }
-
-    @Override
-    @CheckForNull
-    public DistributionQueueEntry getHead() {
-        return entryFactory.create(headItem);
-    }
-
-    @Nonnull
-    @Override
-    public Iterable<DistributionQueueEntry> getEntries(int skip, int limit) {
-        final List<DistributionQueueEntry> entries;
-        if (skip == 0 && (limit == -1 || limit > 0) && headItem != null) {
-            entries = Collections.singletonList(entryFactory.create(headItem));
-        } else {
-            entries = Collections.emptyList();
-        }
-        return Collections.unmodifiableList(entries);
-    }
-
-    @Override
-    public DistributionQueueEntry getEntry(String entryId) {
-        return (entryId.equals(EntryUtil.entryId(headItem)))
-                ? entryFactory.create(headItem)
-                : null;
-    }
-
-    @Override
-    public DistributionQueueEntry remove(String entryId) {
-        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
-    }
-
-    @Nonnull
-    @Override
-    public Iterable<DistributionQueueEntry> remove(Set<String> entryIds) {
-        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
-    }
-
-    @Nonnull
-    @Override
-    public Iterable<DistributionQueueEntry> clear(int limit) {
-        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
-    }
-
-    @Nonnull
-    @Override
-    public DistributionQueueStatus getStatus() {
-        final DistributionQueueState queueState;
-        final int itemsCount;
-        DistributionQueueEntry headEntry = getHead();
-        if (headEntry != null) {
-            itemsCount = 1;
-            DistributionQueueItemState itemState = headEntry.getStatus().getItemState();
-            if (itemState == QUEUED) {
-                queueState = RUNNING;
-            } else {
-                queueState = BLOCKED;
-            }
-        } else {
-            itemsCount = 0;
-            queueState = IDLE;
-        }
-
-        return new DistributionQueueStatus(itemsCount, queueState);
-    }
-
-    @Override
-    @Nonnull
-    public DistributionQueueType getType() {
-        return ORDERED;
-    }
-
-    @Override
-    public boolean hasCapability(String capability) {
-        return false;
-    }
-
-    private int attempts(DistributionQueueItem queueItem) {
-        String entryId = EntryUtil.entryId(queueItem);
-        return packageRetries.get(entryId);
-    }
-
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java b/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
similarity index 55%
copy from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java
copy to src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
index 0396b6e..c42306f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
@@ -16,13 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue;
 
-import javax.annotation.ParametersAreNonnullByDefault;
+import java.io.Closeable;
+import java.util.List;
+import java.util.Set;
 
-@ParametersAreNonnullByDefault
-public interface ClearCallback {
-
-    void clear(long offset);
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 
+public interface CacheCallback {
+    Closeable createConsumer(MessageHandler<PackageMessage> handler);
+    List<FullMessage<PackageMessage>> fetchRange(long minOffset, long maxOffset) throws InterruptedException;
+    QueueState getQueueState(String pubAgentName, String subAgentId);
+    Set<String> getSubscribedAgentIds(String pubAgentName);
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java b/src/main/java/org/apache/sling/distribution/journal/queue/ClearCallback.java
similarity index 93%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/ClearCallback.java
index 0396b6e..ccdf76a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/ClearCallback.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/OffsetQueue.java b/src/main/java/org/apache/sling/distribution/journal/queue/OffsetQueue.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/OffsetQueue.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/OffsetQueue.java
index 7ed776f..c7ecdae 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/OffsetQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/OffsetQueue.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue;
+package org.apache.sling.distribution.journal.queue;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java b/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java
similarity index 56%
copy from src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
copy to src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java
index 178f6f2..6038afb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java
@@ -16,20 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue;
+package org.apache.sling.distribution.journal.queue;
+
+import java.io.Closeable;
+import java.util.Set;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 
 @ParametersAreNonnullByDefault
-public interface PubQueueProvider {
+public interface PubQueueProvider extends Closeable {
 
-    @Nonnull
-    DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable);
+    @Nullable
+    DistributionQueue getQueue(String pubAgentName, String queueName);
 
     @Nonnull
-    DistributionQueue getErrorQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName);
+    OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset);
+
+    void handleStatus(MessageInfo info, PackageStatusMessage message);
+
+    /**
+     * Get queue names for alive subscribed subscriber agents.
+     */
+    Set<String> getQueueNames(String pubAgentName);
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java b/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProviderFactory.java
similarity index 57%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProviderFactory.java
index 178f6f2..7f8e913 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProviderFactory.java
@@ -16,20 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue;
+package org.apache.sling.distribution.journal.queue;
 
-import javax.annotation.Nonnull;
-import javax.annotation.ParametersAreNonnullByDefault;
+public interface PubQueueProviderFactory {
 
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
+    PubQueueProvider create(CacheCallback callback);
 
-@ParametersAreNonnullByDefault
-public interface PubQueueProvider {
-
-    @Nonnull
-    DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable);
-
-    @Nonnull
-    DistributionQueue getErrorQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName);
-
-}
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java b/src/main/java/org/apache/sling/distribution/journal/queue/QueueItemFactory.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/QueueItemFactory.java
index 855a757..a026efb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/QueueItemFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue;
+package org.apache.sling.distribution.journal.queue;
 
 import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_PACKAGE_TYPE;
 import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/QueueState.java b/src/main/java/org/apache/sling/distribution/journal/queue/QueueState.java
new file mode 100644
index 0000000..eae9d0d
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/QueueState.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.queue;
+
+public class QueueState {
+    
+    private final long lastProcessedOffset;
+    private final int headRetries;
+    private final int maxRetries;
+    private final ClearCallback clearCallback;
+
+    public QueueState(long lastProcessedOffset, int headRetries, int maxRetries, ClearCallback clearCallback) {
+        this.lastProcessedOffset = lastProcessedOffset;
+        this.headRetries = headRetries;
+        this.maxRetries = maxRetries;
+        this.clearCallback = clearCallback;
+    }
+    
+    public long getLastProcessedOffset() {
+        return lastProcessedOffset;
+    }
+    
+    public int getHeadRetries() {
+        return headRetries;
+    }
+    
+    public int getMaxRetries() {
+        return maxRetries;
+    }
+
+    public ClearCallback getClearCallback() {
+        return clearCallback;
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/EntryUtil.java
similarity index 91%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/impl/EntryUtil.java
index 529fc52..9b9a84b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/EntryUtil.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
 
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.queue.QueueItemFactory;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 
 public final class EntryUtil {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImpl.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImpl.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImpl.java
index 941cf66..babdfe0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImpl.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
 
 import java.util.NoSuchElementException;
 import java.util.Objects;
@@ -27,7 +27,7 @@ import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
 
 @ParametersAreNonnullByDefault
 public class OffsetQueueImpl<T> implements OffsetQueue<T>, OffsetQueueImplMBean {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplMBean.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplMBean.java
similarity index 93%
copy from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplMBean.java
copy to src/main/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplMBean.java
index e4ef634..9c320a7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplMBean.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplMBean.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
 
 public interface OffsetQueueImplMBean {
     long getHeadOffset();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java
index aaa7b1c..69f1f59 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -25,7 +25,7 @@ import java.util.Set;
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
index 23aaf15..3ca97fc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
 
 import static java.util.Collections.emptyList;
 import static java.util.stream.StreamSupport.stream;
@@ -39,7 +39,8 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.ClearCallback;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueItemState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java
similarity index 82%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java
index fdd6ff7..6022f43 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java
@@ -16,13 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
 
 
 import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.stream.Collectors.groupingBy;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 
 import java.io.Closeable;
 import java.util.HashSet;
@@ -40,20 +39,17 @@ import javax.annotation.ParametersAreNonnullByDefault;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.queue.CacheCallback;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.QueueItemFactory;
 import org.apache.sling.distribution.journal.shared.JMXRegistration;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
 
 /**
  * Cache the distribution packages fetched from the package topic.
@@ -71,7 +67,7 @@ public class PubQueueCache {
 
     private static final Logger LOG = LoggerFactory.getLogger(PubQueueCache.class);
 
-    private static final long MAX_FETCH_WAIT_MS = 60 * 1000; // 1 minute
+    private static final long MAX_FETCH_WAIT_MS = 60 * 1000l; // 1 minute
 
     /**
      * (pubAgentName x OffsetQueue)
@@ -102,35 +98,16 @@ public class PubQueueCache {
 
     private final Set<JMXRegistration> jmxRegs = new HashSet<>();
 
-    private final MessagingProvider messagingProvider;
-
     private final EventAdmin eventAdmin;
 
-    private volatile Closeable tailPoller;
-
-    private final QueueCacheSeeder seeder;
+    private volatile Closeable tailPoller; //NOSONAR
 
-    private final String topic;
-
-    private final DistributionMetricsService distributionMetricsService;
+    private final CacheCallback callback;
     
-    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, QueueCacheSeeder seeder) {
-        this.messagingProvider = messagingProvider;
+    public PubQueueCache(EventAdmin eventAdmin, CacheCallback callback) {
         this.eventAdmin = eventAdmin;
-        this.distributionMetricsService = distributionMetricsService;
-        this.topic = topic;
-        this.seeder = seeder;
-        startPoller();
-        this.seeder.startSeeding();
-    }
-
-    private void startPoller() {
-        LOG.info("Starting consumer");
-        tailPoller = messagingProvider.createPoller(
-                this.topic,
-                Reset.latest,
-                create(PackageMessage.class, this::handlePackage) 
-                );
+        this.callback = callback;
+        tailPoller = callback.createConsumer(this::handlePackage);
     }
 
     @Nonnull
@@ -148,7 +125,6 @@ public class PubQueueCache {
 
     public void close() {
         IOUtils.closeQuietly(tailPoller);
-        IOUtils.closeQuietly(seeder);
         jmxRegs.forEach(IOUtils::closeQuietly);
     }
 
@@ -201,12 +177,8 @@ public class PubQueueCache {
      * cache.
      */
     private void fetch(long requestedMinOffset, long cachedMinOffset) throws InterruptedException {
-        distributionMetricsService.getQueueCacheFetchCount().increment();
-        RangePoller headPoller = new RangePoller(messagingProvider,
-                topic,
-                requestedMinOffset,
-                cachedMinOffset);
-        merge(headPoller.fetchRange());
+        List<FullMessage<PackageMessage>> messages = callback.fetchRange(requestedMinOffset, cachedMinOffset);
+        merge(messages);
         updateMinOffset(requestedMinOffset);
     }
 
@@ -229,7 +201,6 @@ public class PubQueueCache {
     }
 
     private void merge(List<FullMessage<PackageMessage>> messages) {
-        LOG.debug("Merging fetched offsets");
         messages.stream()
             .filter(this::isNotTestMessage)
             .collect(groupingBy(message -> message.getMessage().getPubAgentName()))
@@ -251,6 +222,8 @@ public class PubQueueCache {
     }
 
     private void sendQueuedEvent(FullMessage<PackageMessage> fMessage) {
+        long offset = fMessage.getInfo().getOffset();
+        LOG.info("Queueing message package-id={}, offset={}", fMessage.getMessage().getPkgId(), offset);
         PackageMessage message = fMessage.getMessage();
         final Event queuedEvent = DistributionEvent.eventPackageQueued(message, message.getPubAgentName());
         eventAdmin.postEvent(queuedEvent);
@@ -274,7 +247,6 @@ public class PubQueueCache {
     }
 
     private void handlePackage(final MessageInfo info, final PackageMessage message) {
-        seeder.close();
         merge(singletonList(new FullMessage<>(info, message)));
         updateMaxOffset(info.getOffset());
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java
new file mode 100644
index 0000000..2549d65
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.queue.impl;
+
+import org.apache.sling.distribution.journal.queue.CacheCallback;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.queue.PubQueueProviderFactory;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
+
+@Component
+public class PubQueueProviderFactoryImpl implements PubQueueProviderFactory {
+    
+    @Reference
+    private EventAdmin eventAdmin;
+    
+    private BundleContext context;
+
+    public void activate(BundleContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public PubQueueProvider create(CacheCallback callback) {
+        return new PubQueueProviderImpl(eventAdmin, callback, context);
+    }
+    
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
new file mode 100644
index 0000000..154ab64
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.queue.impl;
+
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.queue.CacheCallback;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.queue.QueueState;
+import org.apache.sling.distribution.journal.shared.AgentId;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.EventAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ParametersAreNonnullByDefault
+public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
+    /**
+     * The minimum size to collect the cache. Each cache entry requires
+     * around 500B of heap space. 10'000 entries ~= 5MB on heap.
+     */
+    private static final int CLEANUP_THRESHOLD = 10_000;
+
+    private static final Logger LOG = LoggerFactory.getLogger(PubQueueProviderImpl.class);
+    
+    private final EventAdmin eventAdmin;
+
+    private final CacheCallback callback;
+    
+    private volatile PubQueueCache cache; //NOSONAR
+
+    /*
+     * (pubAgentName#subAgentId x OffsetQueue)
+     */
+    private final Map<String, OffsetQueue<Long>> errorQueues = new ConcurrentHashMap<>();
+
+    private ServiceRegistration<?> reg;
+
+    public PubQueueProviderImpl(EventAdmin eventAdmin, CacheCallback callback, BundleContext context) {
+        this.eventAdmin = eventAdmin;
+        this.callback = callback;
+        cache = newCache();
+        startCleanupTask(context);
+        LOG.info("Started Publisher queue provider service");
+    }
+    
+    private void startCleanupTask(BundleContext context) {
+        // Register periodic task to update the topology view
+        Dictionary<String, Object> props = new Hashtable<>();
+        props.put(PROPERTY_SCHEDULER_CONCURRENT, false);
+        props.put(PROPERTY_SCHEDULER_PERIOD, 12*60*60L); // every 12 h
+        reg = context.registerService(Runnable.class.getName(), this, props);
+    }
+
+    @Override
+    public void close() {
+        PubQueueCache queueCache = this.cache;
+        if (queueCache != null) {
+            queueCache.close();
+        }
+        if (reg != null) {
+            reg.unregister();
+        }
+        LOG.info("Stopped Publisher queue provider service");
+    }
+    
+    @Override
+    public void run() {
+        LOG.info("Starting package cache cleanup task");
+        PubQueueCache queueCache = this.cache;
+        if (queueCache != null) {
+            int size = queueCache.size();
+            if (size > CLEANUP_THRESHOLD) {
+                LOG.info("Cleanup package cache (size={}/{})", size, CLEANUP_THRESHOLD);
+                queueCache.close();
+                cache = newCache();
+            } else {
+                LOG.info("No cleanup required for package cache (size={}/{})", size, CLEANUP_THRESHOLD);
+            }
+        }
+        LOG.info("Stopping package cache cleanup task");
+    }
+    
+    @Nonnull
+    @Override
+    public Set<String> getQueueNames(String pubAgentName) {
+        // Queues names are generated only for the subscriber agents which are
+        // alive and are subscribed to the publisher agent name (pubAgentName).
+        // The queue names match the subscriber agent identifier (subAgentId).
+        //
+        // If errors queues are enabled, an error queue name is generated which
+        // follows the pattern "%s-error". The pattern is deliberately different
+        // from the SCD on Jobs one ("error-%s") as we don't want to support
+        // the UI ability to retry items from the error queue.
+        Set<String> queueNames = new HashSet<>();
+        for (String subAgentId : callback.getSubscribedAgentIds(pubAgentName)) {
+            queueNames.add(subAgentId);
+            QueueState subState = callback.getQueueState(pubAgentName, subAgentId);
+            if (subState != null) {
+                boolean errorQueueEnabled = (subState.getMaxRetries() >= 0);
+                if (errorQueueEnabled) {
+                    queueNames.add(String.format("%s-error", subAgentId));
+                }
+            }
+        }
+        return queueNames;
+    }
+
+    @Nullable
+    @Override
+    public DistributionQueue getQueue(String pubAgentName, String queueName) {
+        if (queueName.endsWith("-error")) {
+            return getErrorQueue(pubAgentName, queueName);
+        } else {
+            QueueState state = callback.getQueueState(pubAgentName, queueName);
+            if (state == null) {
+                return null;
+            }
+            long minOffset = state.getLastProcessedOffset() + 1; // Start from offset after last processed
+            OffsetQueue<DistributionQueueItem> agentQueue = getOffsetQueue(pubAgentName, minOffset);
+            return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), state.getHeadRetries(), state.getClearCallback());
+        }
+    }
+
+    @Nonnull
+    private DistributionQueue getErrorQueue(String pubAgentName, String queueName) {
+        AgentId subAgentId = new AgentId(StringUtils.substringBeforeLast(queueName, "-error"));
+        String errorQueueKey = getErrorQueueKey(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName());
+        OffsetQueue<Long> errorQueue = errorQueues.getOrDefault(errorQueueKey, new OffsetQueueImpl<>());
+        long headOffset = errorQueue.getHeadOffset();
+        final OffsetQueue<DistributionQueueItem> agentQueue;
+        if (headOffset < 0) {
+            agentQueue = new OffsetQueueImpl<>();
+        } else {
+            long minReferencedOffset = errorQueue.getItem(headOffset);
+            agentQueue = getOffsetQueue(pubAgentName, minReferencedOffset);
+        }
+
+        return new PubErrQueue(queueName, agentQueue, errorQueue);
+    }
+
+    @Nonnull
+    public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) {
+        try {
+            return cache.getOffsetQueue(pubAgentName, minOffset);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void handleStatus(MessageInfo info, PackageStatusMessage message) {
+        if (message.getStatus() == Status.REMOVED_FAILED) {
+            String errorQueueKey = getErrorQueueKey(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName());
+            OffsetQueue<Long> errorQueue = errorQueues.computeIfAbsent(errorQueueKey, key -> new OffsetQueueImpl<>());
+            errorQueue.putItem(info.getOffset(), message.getOffset());
+        }
+    }
+    
+    private String getErrorQueueKey(String pubAgentName, String subSlingId, String subAgentName) {
+        return String.format("%s#%s#%s", pubAgentName, subSlingId, subAgentName);
+    }
+
+    private PubQueueCache newCache() {
+        return new PubQueueCache(eventAdmin, callback);
+    }
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java
similarity index 95%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java
index 47a9f25..a29ef93 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
 
 import static org.apache.sling.distribution.queue.DistributionQueueItemState.ERROR;
 import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
@@ -24,7 +24,7 @@ import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUE
 import java.util.Calendar;
 import java.util.function.ToIntFunction;
 
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.queue.QueueItemFactory;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueItemState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplMBean.java b/src/main/java/org/apache/sling/distribution/journal/queue/package-info.java
similarity index 82%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplMBean.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/package-info.java
index e4ef634..8086ed2 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplMBean.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/package-info.java
@@ -16,10 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
 
-public interface OffsetQueueImplMBean {
-    long getHeadOffset();
-    long getTailOffset();
-    int getSize();
-}
+
+@org.osgi.annotation.versioning.Version("1.0.0")
+@org.osgi.annotation.bundle.Export
+package org.apache.sling.distribution.journal.queue;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentId.java b/src/main/java/org/apache/sling/distribution/journal/shared/AgentId.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentId.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/AgentId.java
index 6887ac8..6f7b45a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentId.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/AgentId.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.shared;
 
 import java.util.Objects;
 import java.util.UUID;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryServiceTest.java
similarity index 98%
rename from src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryServiceTest.java
index 035f8cb..dda463d 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryServiceTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/StateTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/StateTest.java
similarity index 98%
rename from src/test/java/org/apache/sling/distribution/journal/impl/publisher/StateTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/discovery/StateTest.java
index 67d3f44..0b7df80 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/StateTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/StateTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
 
 import static java.lang.Math.abs;
 import static org.junit.Assert.assertEquals;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewDiffTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewDiffTest.java
similarity index 99%
rename from src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewDiffTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewDiffTest.java
index 1882f33..d46d2e5 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewDiffTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewDiffTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewManagerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewManagerTest.java
similarity index 98%
rename from src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewManagerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewManagerTest.java
index 2fa730a..2bb3d12 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewManagerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewManagerTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewTest.java
similarity index 99%
rename from src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewTest.java
index fc1525d..60b4704 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
 
 import static java.lang.Math.abs;
 import static java.util.Arrays.asList;
@@ -37,7 +37,6 @@ import java.util.stream.Collectors;
 
 import org.junit.Test;
 
-
 public class TopologyViewTest {
 
     private static final Random RAND = new Random();
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
index 0878999..abfb083 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
@@ -41,6 +41,9 @@ import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 
+import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
+import org.apache.sling.distribution.journal.impl.discovery.State;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyView;
 import org.apache.sling.distribution.journal.shared.JMXRegistration;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 9f9de96..de4f552 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -35,6 +35,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Dictionary;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -54,8 +55,11 @@ import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.queue.impl.OffsetQueueImpl;
+import org.apache.sling.distribution.journal.queue.impl.PubQueue;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
@@ -140,9 +144,6 @@ public class DistributionPublisherTest {
     @Mock
     private PackageQueuedNotifier queuedNotifier;
     
-    @Mock
-    private TopologyView topology;
-    
     @Captor
     private ArgumentCaptor<PackageMessage> pkgCaptor;
 
@@ -215,39 +216,37 @@ public class DistributionPublisherTest {
     
     @Test
     public void testQueueNames() throws DistributionException, IOException {
-        when(discoveryService.getTopologyView()).thenReturn(topology);
-        when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
-        State state = stateWithMaxRetries(-1);
-        when(topology.getState(QUEUE_NAME, PUB1AGENT1)).thenReturn(state);
+        when(pubQueueProvider.getQueueNames(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
         Iterable<String> names = publisher.getQueueNames();
         assertThat(names, contains(QUEUE_NAME));
     }
 
     @Test
     public void testQueueNamesWithErrorQueue() throws DistributionException, IOException {
-        when(discoveryService.getTopologyView()).thenReturn(topology);
-        when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
-        State state = new State(PUB1AGENT1, SUBAGENT1, 0, 1, 0, 1, false);
-        when(topology.getState(QUEUE_NAME, PUB1AGENT1)).thenReturn(state);
+        when(pubQueueProvider.getQueueNames(Mockito.eq(PUB1AGENT1)))
+            .thenReturn(new HashSet<>(Arrays.asList(QUEUE_NAME, QUEUE_NAME + "-error")));
         Iterable<String> names = publisher.getQueueNames();
         assertThat(names, containsInAnyOrder(QUEUE_NAME + "-error", QUEUE_NAME));
     }
 
     @Test
     public void testGetQueue() throws DistributionException, IOException {
-        when(discoveryService.getTopologyView()).thenReturn(topology);
-        when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
-        State state = stateWithMaxRetries(1);
-        when(topology.getState(QUEUE_NAME, PUB1AGENT1)).thenReturn(state);
-        publisher.getQueue(QUEUE_NAME);
-        publisher.getQueue(QUEUE_NAME + "-error");
-        // TODO Add assertions
+        when(pubQueueProvider.getQueue(PUB1AGENT1, QUEUE_NAME))
+            .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, null));
+        DistributionQueue queue = publisher.getQueue(QUEUE_NAME);
+        assertThat(queue, notNullValue());
+    }
+    
+    @Test
+    public void testGetErrorQueue() throws DistributionException, IOException {
+        when(pubQueueProvider.getQueue(PUB1AGENT1, QUEUE_NAME + "-error"))
+            .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, null));
+        DistributionQueue queue = publisher.getQueue(QUEUE_NAME + "-error");
+        assertThat(queue, notNullValue());
     }
 
     @Test
     public void testGetWrongQueue() throws DistributionException, IOException {
-        when(discoveryService.getTopologyView()).thenReturn(topology);
-        when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
         Counter counter = new TestCounter();
         when(distributionMetricsService.getQueueAccessErrorCount()).thenReturn(counter);
 
@@ -258,12 +257,7 @@ public class DistributionPublisherTest {
 
     @Test
     public void testGetQueueErrorMetrics() throws DistributionException, IOException {
-        when(discoveryService.getTopologyView()).thenReturn(topology);
-        when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
-        State state = stateWithMaxRetries(1);
-        when(topology.getState(QUEUE_NAME, PUB1AGENT1)).thenReturn(state);
-        AgentId subAgentId = new AgentId(QUEUE_NAME);
-        when(pubQueueProvider.getQueue(PUB1AGENT1, subAgentId.getSlingId(), subAgentId.getAgentName(), QUEUE_NAME, 2, 0, false))
+       when(pubQueueProvider.getQueue(Mockito.any(), Mockito.any()))
             .thenThrow(new RuntimeException("Error"));
 
         Counter counter = new TestCounter();
@@ -276,10 +270,6 @@ public class DistributionPublisherTest {
         assertEquals("Wrong getQueue error counter",1, counter.getCount());
     }
 
-    private State stateWithMaxRetries(int maxRetries) {
-        return new State(PUB1AGENT1, SUBAGENT1, 0, 1, 0, maxRetries, false);
-    }
-
     private PackageMessage mockPackage(DistributionRequest request) throws IOException {
         return PackageMessage.builder()
                 .pkgId("myid")
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java
new file mode 100644
index 0000000..befa6e3
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sling.commons.metrics.Counter;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
+import org.apache.sling.distribution.journal.impl.discovery.State;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyView;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.queue.QueueState;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MessagingCacheCallbackTest {
+    private static final String SUBAGENT_NAME_1 = "subagent1";
+    private static final long CLEAR_OFFSET = 7;
+    private static final long CURRENT_OFFSET = 1l;
+    private static final int HEAD_RETRIES = 2;
+    private static final int MAX_RETRIES = 3;
+
+    private static final String PUB1AGENT1 = "agent1";
+
+    private static final String SLINGID1 = UUID.randomUUID().toString();
+    private static final String SUBAGENT_ID1 = SLINGID1 +"-" + SUBAGENT_NAME_1;
+
+
+    @Mock
+    private MessagingProvider messagingProvider;
+    
+    @Spy
+    private Topics topics;
+    
+    @Mock
+    private JournalAvailable journalAvailable;
+    
+    @Mock
+    private DistributionMetricsService distributionMetricsService;
+    
+    @Mock
+    private MessageHandler<PackageMessage> handler;
+
+    @Mock
+    private MessageSender<Object> sender;
+    
+    @Mock
+    private DiscoveryService discovery;
+    
+    @Mock
+    private Counter counter;
+
+    @InjectMocks
+    private MessagingCacheCallback callback;
+
+    @Captor
+    private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
+    
+    @Captor
+    private ArgumentCaptor<ClearCommand> clearCommandCaptor;
+
+    @Test
+    public void testCreateConsumer() throws Exception {
+        when(messagingProvider.createSender(Mockito.any())).thenReturn(sender);
+        Closeable poller = callback.createConsumer(handler);
+        assertThat(poller, notNullValue());
+        
+        poller.close();
+    }
+
+    @Test
+    public void testFetchRange() throws Exception {
+        when(distributionMetricsService.getQueueCacheFetchCount()).thenReturn(counter);
+        when(messagingProvider.assignTo(Mockito.eq(10l))).thenReturn("0:10");
+        CompletableFuture<List<FullMessage<PackageMessage>>> result = CompletableFuture.supplyAsync(this::fetch);
+        verify(messagingProvider, timeout(100000)).createPoller(
+                Mockito.anyString(), 
+                Mockito.eq(Reset.earliest), 
+                Mockito.eq("0:10"),
+                handlerCaptor.capture());
+        simulateMessage(19);
+        simulateMessage(20);
+        List<FullMessage<PackageMessage>> messages = result.get(100, TimeUnit.SECONDS);
+        assertThat(messages.size(), equalTo(1));
+    }
+    
+    @Test
+    public void testGetSubscribedAgentIds() {
+        TopologyView topology = createTopologyView();
+        when(discovery.getTopologyView()).thenReturn(topology);
+        Set<String> agentIds = callback.getSubscribedAgentIds(PUB1AGENT1);
+        assertThat(agentIds.size(), equalTo(1));
+        assertThat(agentIds.iterator().next(), equalTo(SUBAGENT_ID1));
+    }
+    
+    @Test
+    public void testGetQueueState() {
+        TopologyView topology = createTopologyView();
+        when(discovery.getTopologyView()).thenReturn(topology);
+        
+        QueueState queueState = callback.getQueueState(PUB1AGENT1, SUBAGENT_ID1);
+        
+        assertThat(queueState.getLastProcessedOffset(), equalTo(CURRENT_OFFSET));
+        assertThat(queueState.getHeadRetries(), equalTo(HEAD_RETRIES));
+        assertThat(queueState.getMaxRetries(), equalTo(MAX_RETRIES));
+        
+        queueState.getClearCallback().clear(CLEAR_OFFSET);
+        
+        verify(sender).accept(clearCommandCaptor.capture());
+        ClearCommand clearCommand = clearCommandCaptor.getValue();
+        assertThat(clearCommand.getOffset(), equalTo(CLEAR_OFFSET));
+        assertThat(clearCommand.getPubAgentName(), equalTo(PUB1AGENT1));
+        assertThat(clearCommand.getSubAgentName(), equalTo(SUBAGENT_NAME_1));
+        assertThat(clearCommand.getSubSlingId(), equalTo(SLINGID1));
+    }
+
+    private TopologyView createTopologyView() {
+        State state = new State(PUB1AGENT1, SUBAGENT_ID1, 0, 
+                CURRENT_OFFSET, HEAD_RETRIES, MAX_RETRIES, true);
+        return new TopologyView(Collections.singleton(state));
+    }
+
+    private void simulateMessage(int offset) {
+        FullMessage<PackageMessage> message = RangePollerTest.createMessage(ReqType.ADD, offset);
+        handlerCaptor.getValue().getHandler().handle(message.getInfo(), message.getMessage());
+    }
+
+    private List<FullMessage<PackageMessage>> fetch() {
+        try {
+            return callback.fetchRange(10l, 20l);
+        } catch (InterruptedException e) {
+            throw new IllegalStateException();
+        }
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
index b41029c..60670b5 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
@@ -19,38 +19,49 @@
 package org.apache.sling.distribution.journal.impl.publisher;
 
 import static java.util.Arrays.asList;
-import static org.mockito.Matchers.any;
+import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
+import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_PATHS;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.initMocks;
-import static org.mockito.internal.util.reflection.Whitebox.setInternalState;
 
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 
+import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
+import org.apache.sling.distribution.journal.impl.discovery.State;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyView;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyViewDiff;
+import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.queue.QueueItemFactory;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.Spy;
 import org.osgi.service.event.EventAdmin;
 
 public class PackageDistributedNotifierTest {
 
     @Mock
-    private PubQueueCacheService pubQueueCacheService;
+    private PubQueueProvider pubQueueCacheService;
 
     @Mock
     private OffsetQueue<DistributionQueueItem> offsetQueue;
 
-    @Mock
+    @Spy
     private Topics topics;
 
     @Mock
@@ -59,29 +70,42 @@ public class PackageDistributedNotifierTest {
     @Mock
     private EventAdmin eventAdmin;
 
+    @InjectMocks
+    private PackageDistributedNotifier notifier;
+
+    @Mock
+    private MessageSender<Object> sender;
+
+    @Captor
+    private ArgumentCaptor<PackageDistributedMessage> messageCaptor;
+
     @Before
     public void before() {
         initMocks(this);
         when(offsetQueue.getItem(anyLong()))
-                .thenReturn(new DistributionQueueItem("packageId", Collections.emptyMap()));
+                .thenReturn(createItem());
         when(pubQueueCacheService.getOffsetQueue(anyString(), anyLong()))
                 .thenReturn(offsetQueue);
+        when(messagingProvider.createSender(Mockito.eq(topics.getEventTopic())))
+            .thenReturn(sender);
     }
 
-
     @Test
     public void testChanged() throws Exception {
-        PackageDistributedNotifier notifier = Mockito.spy(new PackageDistributedNotifier());
-        setInternalState(notifier, "pubQueueCacheService", pubQueueCacheService);
-        setInternalState(notifier, "eventAdmin", eventAdmin);
-        setInternalState(notifier, "messagingProvider", messagingProvider);
-        setInternalState(notifier, "topics", topics);
         notifier.activate();
         TopologyViewDiff diffView = new TopologyViewDiff(
                 buildView(new State("pub1", "sub1", 1000, 10, 0, -1, false)),
                 buildView(new State("pub1", "sub1", 2000, 13, 0, -1, false)));
         notifier.changed(diffView);
-        verify(notifier, times(3)).processOffset(anyString(), any(DistributionQueueItem.class));
+        verify(sender, times(3)).accept(messageCaptor.capture());
+    }
+
+    private DistributionQueueItem createItem() {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put(QueueItemFactory.RECORD_OFFSET, 10l);
+        properties.put(PROPERTY_REQUEST_PATHS, new String[] {"/test"});
+        properties.put(PROPERTY_REQUEST_DEEP_PATHS, new String[] {"/test"});
+        return new DistributionQueueItem("packageId", properties);
     }
 
     private TopologyView buildView(State ... state) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisherTest.java
new file mode 100644
index 0000000..00df5e6
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisherTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.queue.PubQueueProviderFactory;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PubQueueProviderPublisherTest {
+    
+    @Mock
+    MessagingProvider messagingProvider;
+
+    @Mock
+    private BundleContext context;
+    
+    @Mock
+    PubQueueProviderFactory pubQueueProviderFactory;
+    
+    @Spy
+    Topics topics = new Topics();
+
+    @InjectMocks
+    private PubQueueProviderPublisher pubQueueProviderPublisher;
+
+    @Mock
+    private PubQueueProvider pubQueueProvider;
+
+    @Mock
+    private ServiceRegistration<PubQueueProvider> reg;
+
+    @Test
+    public void testCycle() throws IOException {
+        when(pubQueueProviderFactory.create(Mockito.any())).thenReturn(pubQueueProvider);
+        when(context.registerService(Mockito.eq(PubQueueProvider.class), Mockito.eq(pubQueueProvider), Mockito.any()))
+                .thenReturn(reg);
+        pubQueueProviderPublisher.activate(context);
+        
+        pubQueueProviderPublisher.deactivate();
+        verify(pubQueueProvider).close();
+        verify(reg).unregister();
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeederTest.java
similarity index 97%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeederTest.java
index 29a5133..8a3cf76 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeederTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.publisher;
 
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.timeout;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/RangePollerTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/publisher/RangePollerTest.java
index 6d82121..217bd59 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/RangePollerTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.publisher;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.contains;
@@ -107,7 +107,7 @@ public class RangePollerTest {
         }
     }
 
-    private FullMessage<PackageMessage> createMessage(ReqType reqType, int offset) {
+    public static FullMessage<PackageMessage> createMessage(ReqType reqType, long offset) {
         MessageInfo info = new TestMessageInfo(TOPIC, 0, offset, System.currentTimeMillis());
         PackageMessage message = PackageMessage.builder()
                 .pubAgentName("agent1")
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
deleted file mode 100644
index 9fe877a..0000000
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.shared.PackageRetries;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class SubQueueTest {
-
-    @Test
-    public void testGetName() throws Exception {
-        String queueName = "someQueue";
-        SubQueue queue = new SubQueue(queueName, null, new PackageRetries());
-        Assert.assertEquals(queueName, queue.getName());
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testAdd() throws Exception {
-        SubQueue queue = new SubQueue("someQueue", null, new PackageRetries());
-        queue.add(buildQueueItem("package-1"));
-    }
-
-    @Test
-    public void testGetHead() throws Exception {
-        SubQueue emptyQueue = new SubQueue("emptyQueue", null, new PackageRetries());
-        Assert.assertNull(emptyQueue.getHead());
-        SubQueue oneQueue = new SubQueue("oneQueue", buildQueueItem("1"), new PackageRetries());
-        Assert.assertNotNull(oneQueue.getHead());
-    }
-
-    @Test
-    public void testGetItems() throws Exception {
-        SubQueue oneQueue = new SubQueue("oneQueue", null, new PackageRetries());
-        Assert.assertNotNull(oneQueue.getEntries(0, 10));
-        SubQueue tenQueue = new SubQueue("tenQueue", buildQueueItem("1"), new PackageRetries());
-        Assert.assertEquals(1, Lists.newArrayList(tenQueue.getEntries(0, 10)).size());
-        Assert.assertEquals(1, Lists.newArrayList(tenQueue.getEntries(0, -1)).size());
-        Assert.assertEquals(0, Lists.newArrayList(tenQueue.getEntries(1, 10)).size());
-    }
-
-    private DistributionQueueItem buildQueueItem(String packageId) {
-        Map<String, Object> properties = new HashMap<>();
-        properties.put(QueueItemFactory.RECORD_TOPIC, "topic");
-        properties.put(QueueItemFactory.RECORD_OFFSET, 0);
-        properties.put(QueueItemFactory.RECORD_PARTITION, 0);
-        properties.put(QueueItemFactory.RECORD_TIMESTAMP, System.currentTimeMillis());
-        return new DistributionQueueItem(packageId, properties);
-    }
-}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index fd4e799..219a561 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -96,6 +96,7 @@ import org.mockito.MockitoAnnotations;
 import org.mockito.Spy;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.event.EventAdmin;
@@ -228,10 +229,10 @@ public class SubscriberTest {
         initSubscriber(ImmutableMap.of("agentNames", "dummy"));
         assertThat(subscriber.getState(), equalTo(DistributionAgentState.IDLE));
         
-        MessageInfo info = new TestMessageInfo("", 1, 100, 0);
+        MessageInfo info = createInfo(100);
         PackageMessage message = BASIC_ADD_PACKAGE;
-        
         packageHandler.handle(info, message);
+        
         verify(packageBuilder, timeout(1000).times(0)).installPackage(Mockito.any(ResourceResolver.class), 
                 Mockito.any(ByteArrayInputStream.class));
         assertThat(getStoredOffset(), nullValue());
@@ -246,14 +247,13 @@ public class SubscriberTest {
         assumeNoPrecondition();
         initSubscriber();
         assertThat(subscriber.getState(), equalTo(DistributionAgentState.IDLE));
-        
-        MessageInfo info = new TestMessageInfo("", 1, 0, 0);
-        PackageMessage message = BASIC_ADD_PACKAGE;
+
         final Semaphore sem = new Semaphore(0);
-        when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class), 
-                Mockito.any(ByteArrayInputStream.class))
-                ).thenAnswer(new WaitFor(sem));
+        whenInstallPackage()
+            .thenAnswer(new WaitFor(sem));
         
+        MessageInfo info = createInfo(0l);
+        PackageMessage message = BASIC_ADD_PACKAGE;
         packageHandler.handle(info, message);
         
         waitSubscriber(RUNNING);
@@ -267,15 +267,13 @@ public class SubscriberTest {
     public void testReceiveDelete() throws DistributionException, LoginException, PersistenceException {
         assumeNoPrecondition();
         initSubscriber();
+        final Semaphore sem = new Semaphore(0);
+        whenInstallPackage()
+            .thenAnswer(new WaitFor(sem));
 
         createResource("/test");
-        MessageInfo info = new TestMessageInfo("", 1, 0, 0);
+        MessageInfo info = createInfo(0l);
         PackageMessage message = BASIC_DEL_PACKAGE;
-        final Semaphore sem = new Semaphore(0);
-        when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class),
-                Mockito.any(ByteArrayInputStream.class))
-        ).thenAnswer(new WaitFor(sem));
-        
         packageHandler.handle(info, message);
         
         waitSubscriber(RUNNING);
@@ -289,13 +287,11 @@ public class SubscriberTest {
     public void testSendFailedStatus() throws DistributionException {
         assumeNoPrecondition();
         initSubscriber(ImmutableMap.of("maxRetries", "1"));
+        whenInstallPackage()
+        .thenThrow(new RuntimeException("Expected"));
 
-        MessageInfo info = new TestMessageInfo("", 1, 0, 0);
+        MessageInfo info = createInfo(0l);
         PackageMessage message = BASIC_ADD_PACKAGE;
-        when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class),
-                Mockito.any(ByteArrayInputStream.class))
-        ).thenThrow(new RuntimeException("Expected"));
-
         packageHandler.handle(info, message);
         
         verify(statusSender, timeout(10000).times(1)).accept(anyObject());
@@ -307,9 +303,8 @@ public class SubscriberTest {
         // Only editable subscriber will send status
         initSubscriber(ImmutableMap.of("editable", "true"));
 
-        MessageInfo info = new TestMessageInfo("", 1, 0, 0);
+        MessageInfo info = createInfo(0l);
         PackageMessage message = BASIC_ADD_PACKAGE;
-
         packageHandler.handle(info, message);
         
         waitSubscriber(IDLE);
@@ -320,9 +315,9 @@ public class SubscriberTest {
     public void testSkipBecauseOfPrecondition() throws DistributionException, InterruptedException, TimeoutException {
         when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong())).thenReturn(Decision.SKIP);
         initSubscriber(ImmutableMap.of("editable", "true"));
-        MessageInfo info = new TestMessageInfo("", 1, 11, 0);
-        PackageMessage message = BASIC_ADD_PACKAGE;
 
+        MessageInfo info = createInfo(11l);
+        PackageMessage message = BASIC_ADD_PACKAGE;
         packageHandler.handle(info, message);
         
         await().until(this::getStatus, equalTo(PackageStatusMessage.Status.REMOVED));
@@ -333,12 +328,13 @@ public class SubscriberTest {
     public void testPreconditionTimeoutExceptionBecauseOfShutdown() throws DistributionException, InterruptedException, TimeoutException, IOException {
         when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong())).thenReturn(Decision.WAIT);
         initSubscriber(ImmutableMap.of("editable", "true"));
-        MessageInfo info = new TestMessageInfo("", 1, 11, 0);
-        PackageMessage message = BASIC_ADD_PACKAGE;
-
         long startedAt = System.currentTimeMillis();
+
+        MessageInfo info = createInfo(11l);
+        PackageMessage message = BASIC_ADD_PACKAGE;
         packageHandler.handle(info, message);
         subscriber.deactivate();
+        
         assertThat("After deactivate precondition should time out quickly.", System.currentTimeMillis() - startedAt, lessThan(1000l));
     }
 
@@ -347,15 +343,24 @@ public class SubscriberTest {
         Semaphore sem = new Semaphore(0);
         assumeWaitingForPrecondition(sem);
         initSubscriber();
-        MessageInfo info = new TestMessageInfo("", 1, 0, 0);
-        PackageMessage message = BASIC_ADD_PACKAGE;
 
+        MessageInfo info = createInfo(0l);
+        PackageMessage message = BASIC_ADD_PACKAGE;
         packageHandler.handle(info, message);
+
         waitSubscriber(RUNNING);
         await("Should report ready").until(() -> subscriberReadyStore.getReadyHolder(SUB1_AGENT_NAME).get());
         sem.release();
     }
     
+    private OngoingStubbing<DistributionPackageInfo> whenInstallPackage() throws DistributionException {
+        return when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class), Mockito.any(ByteArrayInputStream.class)));
+    }
+
+    private TestMessageInfo createInfo(long offset) {
+        return new TestMessageInfo("", 1, offset, 0);
+    }
+
     private Long getStoredOffset() {
         LocalStore store = new LocalStore(resolverFactory, BookKeeper.STORE_TYPE_PACKAGE, SUB1_AGENT_NAME);
         return store.load(BookKeeper.KEY_OFFSET, Long.class);
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/QueueItemFactoryTest.java
similarity index 87%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
rename to src/test/java/org/apache/sling/distribution/journal/queue/QueueItemFactoryTest.java
index d494392..b12b494 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/QueueItemFactoryTest.java
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue;
+package org.apache.sling.distribution.journal.queue;
 
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_PARTITION;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TOPIC;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_OFFSET;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_PARTITION;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_TIMESTAMP;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_TOPIC;
 import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_PACKAGE_TYPE;
 import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
 import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_PATHS;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/EntryUtilTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
rename to src/test/java/org/apache/sling/distribution/journal/queue/impl/EntryUtilTest.java
index 6bc7ede..b884a79 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/EntryUtilTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
 
 import static org.junit.Assert.assertEquals;
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplJMXTest.java
similarity index 95%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
rename to src/test/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplJMXTest.java
index 353da42..6c07b0d 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplJMXTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
@@ -33,7 +33,7 @@ import javax.management.ObjectInstance;
 import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.shared.JMXRegistration;
 import org.junit.Test;
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplTest.java
similarity index 97%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplTest.java
rename to src/test/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplTest.java
index 4656e51..ebfedf5 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertThat;
 import java.util.Arrays;
 import java.util.Iterator;
 
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
similarity index 64%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
rename to src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
index aca7d01..266d54d 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
@@ -16,43 +16,36 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
 
 import static java.lang.System.currentTimeMillis;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.stream.LongStream;
 
 import org.apache.sling.commons.metrics.Counter;
-import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.LocalStore;
+import org.apache.sling.distribution.journal.queue.CacheCallback;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -82,35 +75,20 @@ public class PubQueueCacheTest {
     private static final Random RAND = new Random();
 
     @Captor
-    private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
+    private ArgumentCaptor<MessageHandler<PackageMessage>> handlerCaptor;
 
     @Mock
     private EventAdmin eventAdmin;
 
     @Mock
-    private MessagingProvider clientProvider;
-
-    @Mock
-    private QueueCacheSeeder cacheSeeder;
-
-    @Mock
-    private DistributionMetricsService distributionMetricsService;
+    private CacheCallback callback;
 
     @Mock
     private Counter counter;
 
     @Mock
-    private LocalStore seedStore;
-
-    @Mock
     private Closeable poller;
 
-    @Mock
-    private MessageSender<Object> sender;
-
-    @Mock
-    private QueueCacheSeeder seeder;
-
     private PubQueueCache cache;
 
     private ExecutorService executor;
@@ -120,29 +98,12 @@ public class PubQueueCacheTest {
 
     @Before
     public void before() {
-        when(clientProvider.createPoller(
-                eq(TOPIC),
-                eq(Reset.latest),
-                handlerCaptor.capture()))
+        when(callback.createConsumer(handlerCaptor.capture()))
                 .thenReturn(poller);
-        when(clientProvider.createPoller(
-                eq(TOPIC),
-                eq(Reset.earliest),
-                Mockito.anyString(),
-                handlerCaptor.capture()))
-                .thenReturn(poller);
-        when(clientProvider.createSender(Mockito.anyString()))
-            .thenReturn(sender);
-
-        when(distributionMetricsService.getQueueCacheFetchCount())
-            .thenReturn(counter);
 
-        when(seedStore.load(anyString(), any())).thenReturn(0L);
-
-        cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seeder);
-        verify(seeder).startSeeding();
+        cache = new PubQueueCache(eventAdmin, callback);
         executor = Executors.newFixedThreadPool(10);
-        tailHandler = handlerCaptor.getValue().getHandler();
+        tailHandler = handlerCaptor.getValue();
     }
 
     @After
@@ -165,21 +126,13 @@ public class PubQueueCacheTest {
 
     @Test
     public void testFetchWithSingleConsumer() throws Exception {
-        simulateMessage(tailHandler, 200);
+        simulateMessage(tailHandler, 200l);
+        when(callback.fetchRange(Mockito.eq(100l), Mockito.eq(200l)))
+                .thenReturn(Arrays.asList(createTestMessage(100, PUB_AGENT_NAME_1, ReqType.ADD)));
         Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 100);
-        // seeding the cache with a message at offset 200
-        // wait that the consumer has started fetching the offsets from 100 to 200
-        MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
-        // simulate messages for the fetched offsets
-        simulateMessages(headHandler, 100, cache.getMinOffset());
         // the consumer returns the offset queue
         consumer.get(15, SECONDS);
-        assertEquals(100, cache.getMinOffset());
-    }
-
-    private MessageHandler<PackageMessage> awaitHeadHandler() {
-        return Awaitility.await().ignoreExceptions()
-                .until(() -> handlerCaptor.getAllValues().get(1).getHandler(), notNullValue());
+        assertEquals(100l, cache.getMinOffset());
     }
 
 	@Test
@@ -188,18 +141,15 @@ public class PubQueueCacheTest {
         // build two consumers for same agent queue, from offset 100
         Future<OffsetQueue<DistributionQueueItem>> consumer1 = consumer(PUB_AGENT_NAME_1, 100);
         Future<OffsetQueue<DistributionQueueItem>> consumer2 = consumer(PUB_AGENT_NAME_1, 100);
-        // seeding the cache with a message at offset 200
-        // wait that one consumer has started fetching the offsets from 100 to 200
-        MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
-        // simulate messages for the fetched offsets
-        simulateMessages(headHandler, 100, cache.getMinOffset());
-        // both consumers returns the offset queue
+        when(callback.fetchRange(Mockito.eq(100l), Mockito.eq(200l)))
+        .thenReturn(Arrays.asList(createTestMessage(100, PUB_AGENT_NAME_1, ReqType.ADD)));
         OffsetQueue<DistributionQueueItem> q1 = consumer1.get(5, SECONDS);
         OffsetQueue<DistributionQueueItem> q2 = consumer2.get(5, SECONDS);
         assertEquals(q1.getSize(), q2.getSize());
         assertEquals(100, cache.getMinOffset());
-        // the offsets have been fetched only once
-        assertEquals(2, handlerCaptor.getAllValues().size());
+        
+        // Fetch should only happen once
+        verify(callback, times(1)).fetchRange(Mockito.anyLong(), Mockito.anyLong());
     }
 
     @Test
@@ -213,11 +163,6 @@ public class PubQueueCacheTest {
         assertEquals(4, cache.size());
     }
 
-    private void simulateMessages(MessageHandler<PackageMessage> handler, long fromOffset, long toOffset) {
-        LongStream.rangeClosed(fromOffset, toOffset)
-            .forEach(offset -> simulateMessage(handler, offset));
-    }
-    
     private void simulateMessage(MessageHandler<PackageMessage> handler, long offset) {
         simulateMessage(handler,
                 pickAny(PUB_AGENT_NAME_1, PUB_AGENT_NAME_2, PUB_AGENT_NAME_3),
@@ -225,19 +170,31 @@ public class PubQueueCacheTest {
     }
 
     private void simulateMessage(MessageHandler<PackageMessage> handler, String pubAgentName, ReqType reqType, long offset) {
-        PackageMessage msg = PackageMessage.builder()
+        PackageMessage msg = createTestMessage(pubAgentName, reqType);
+        simulateMessage(handler, msg, offset);
+    }
+
+    private void simulateMessage(MessageHandler<PackageMessage> handler, PackageMessage msg, long offset) {
+        log.info("Simulate msg @ offset {}", offset);
+        handler.handle(createInfo(offset), msg);
+    }
+    
+    private FullMessage<PackageMessage> createTestMessage(long offset, String pubAgentName, ReqType reqType) {
+        return new FullMessage<>(createInfo(offset), createTestMessage(pubAgentName, reqType));
+    }
+
+    private PackageMessage createTestMessage(String pubAgentName, ReqType reqType) {
+        return PackageMessage.builder()
                 .pkgType("pkgType")
                 .pkgId(UUID.randomUUID().toString())
                 .pubSlingId("pubSlingId")
                 .reqType(reqType)
                 .pubAgentName(pubAgentName)
                 .build();
-        simulateMessage(handler, msg, offset);
     }
 
-    private void simulateMessage(MessageHandler<PackageMessage> handler, PackageMessage msg, long offset) {
-        log.info("Simulate msg @ offset {}", offset);
-        handler.handle(new TestMessageInfo(TOPIC, 0, offset, currentTimeMillis()), msg);
+    private MessageInfo createInfo(long offset) {
+        return new TestMessageInfo(TOPIC, 0, offset, currentTimeMillis());
     }
 
     Future<OffsetQueue<DistributionQueueItem>> consumer(String pubAgentName, long minOffset) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
similarity index 74%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
rename to src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
index da13e6b..f462f56 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
@@ -16,10 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -29,6 +30,7 @@ import java.lang.management.ManagementFactory;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.UUID;
 
 import javax.management.AttributeNotFoundException;
 import javax.management.InstanceNotFoundException;
@@ -49,6 +51,8 @@ import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.queue.CacheCallback;
+import org.apache.sling.distribution.journal.queue.QueueState;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -61,13 +65,14 @@ import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import org.osgi.framework.BundleContext;
 import org.osgi.service.event.EventAdmin;
 
 public class PubQueueProviderTest {
     private static final String PUB1_AGENT_NAME = "pub1";
     private static final String PUB2_AGENT_NAME = "pub2";
 
-    private static final String SUB_SLING_ID = "sub1sling";
+    private static final String SUB_SLING_ID = UUID.randomUUID().toString();
     private static final String SUB_AGENT_NAME = "sub1";
     private static final String SUB_AGENT_ID = SUB_SLING_ID +"-" + SUB_AGENT_NAME;
 
@@ -76,7 +81,7 @@ public class PubQueueProviderTest {
     private MessagingProvider clientProvider;
     
     @Captor
-    private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
+    private ArgumentCaptor<MessageHandler<PackageMessage>> handlerCaptor;
 
     @Captor
     private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>> statHandlerCaptor;
@@ -93,10 +98,13 @@ public class PubQueueProviderTest {
     @Mock
     private MessageSender<Object> sender;
 
-    private PubQueueCacheService pubQueueCacheService;
+    @Mock
+    private CacheCallback callback;
+
+    @Mock
+    private BundleContext context;
 
     private MessageHandler<PackageMessage> handler;
-    private MessageHandler<PackageStatusMessage> statHandler;
 
     private PubQueueProviderImpl queueProvider;
     private MBeanServer mbeanServer;
@@ -104,49 +112,43 @@ public class PubQueueProviderTest {
     @Before
     public void before() throws PersistenceException {
         MockitoAnnotations.initMocks(this);
-        when(clientProvider.createPoller(
-                Mockito.eq(Topics.PACKAGE_TOPIC),
-                Mockito.any(Reset.class),
-                handlerCaptor.capture()))
-        .thenReturn(poller);
+        when(callback.createConsumer(handlerCaptor.capture()))
+                .thenReturn(poller);
         when(clientProvider.createPoller(
                 Mockito.eq(Topics.STATUS_TOPIC), 
                 Mockito.any(Reset.class),
                 statHandlerCaptor.capture()))
         .thenReturn(statPoller);
-        when(clientProvider.createSender(Mockito.anyString()))
-        .thenReturn(sender);
-        Topics topics = new Topics();
-        pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin);
-        pubQueueCacheService.activate();
-        queueProvider = new PubQueueProviderImpl(pubQueueCacheService, clientProvider, topics);
-        queueProvider.activate();
-        handler = handlerCaptor.getValue().getHandler();
-        statHandler = statHandlerCaptor.getValue().getHandler();
+        queueProvider = new PubQueueProviderImpl(eventAdmin, callback, context);
+        handler = handlerCaptor.getValue();
     }
 
     @After
     public void after() throws IOException {
-        pubQueueCacheService.deactivate();
-        queueProvider.deactivate();
-        verify(poller).close();
-        verify(statPoller).close();
+        queueProvider.close();
+        verify(poller,  atLeast(1)).close();
     }
     
     @Test
     public void test() throws Exception {
-        handler.handle(info(0L), packageMessage("packageid1", PUB1_AGENT_NAME));
-        handler.handle(info(1L), packageMessage("packageid2", PUB2_AGENT_NAME));
-        handler.handle(info(2L), packageMessage("packageid3", PUB1_AGENT_NAME));
+        handler.handle(info(1L), packageMessage("packageid1", PUB1_AGENT_NAME));
+        handler.handle(info(2L), packageMessage("packageid2", PUB2_AGENT_NAME));
+        handler.handle(info(3L), packageMessage("packageid3", PUB1_AGENT_NAME));
+        
+        when(callback.getQueueState(Mockito.eq(PUB1_AGENT_NAME), Mockito.any()))
+            .thenReturn(new QueueState(0, -1, 0, null));
         
         // Full pub1 queue contains all packages from pub1
-        DistributionQueue queue = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID, 0, -1, false);
+        DistributionQueue queue = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_AGENT_ID);
         Iterator<DistributionQueueEntry> it1 = queue.getEntries(0, -1).iterator();
         assertThat(it1.next().getItem().getPackageId(), equalTo("packageid1"));
         assertThat(it1.next().getItem().getPackageId(), equalTo("packageid3"));
         
         // With offset 1 first package is removed
-        DistributionQueue queue2 = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID, 1, -1, false);
+        when(callback.getQueueState(Mockito.eq(PUB1_AGENT_NAME), Mockito.any()))
+            .thenReturn(new QueueState(1, -1, 0, null));
+        
+        DistributionQueue queue2 = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_AGENT_ID);
         Iterator<DistributionQueueEntry> it2 = queue2.getEntries(0, 20).iterator();
         assertThat(it2.next().getItem().getPackageId(), equalTo("packageid3"));
         assertThat(it2.hasNext(), equalTo(false));
@@ -155,13 +157,13 @@ public class PubQueueProviderTest {
         Set<ObjectInstance> mbeans = mbeanServer.queryMBeans(new ObjectName("org.apache.sling.distribution:type=OffsetQueue,id="+PUB1_AGENT_NAME), null);
         ObjectInstance mbean = mbeans.iterator().next();
         assertThat(getAttrib(mbean, "Size"), equalTo(2));
-        assertThat(getAttrib(mbean, "HeadOffset"), equalTo(0L));
-        assertThat(getAttrib(mbean, "TailOffset"), equalTo(2L));
+        assertThat(getAttrib(mbean, "HeadOffset"), equalTo(1L));
+        assertThat(getAttrib(mbean, "TailOffset"), equalTo(3L));
     }
     
     @Test
     public void testEmptyErrorQueue() throws Exception {
-        DistributionQueue queue = queueProvider.getErrorQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
+        DistributionQueue queue = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_AGENT_ID + "-error");
         assertThat(queue.getStatus().getItemsCount(), equalTo(0));
     }
     
@@ -174,14 +176,36 @@ public class PubQueueProviderTest {
         MessageInfo info = info(1L);
         handler.handle(info, pkgMsg1);
         PackageStatusMessage statusMsg1 = statusMessage(info.getOffset(), pkgMsg1);
-        statHandler.handle(info, statusMsg1);
+        queueProvider.handleStatus(info, statusMsg1);
         
-        DistributionQueue queue = queueProvider.getErrorQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
+        DistributionQueue queue = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_AGENT_ID + "-error");
         assertThat(queue.getStatus().getItemsCount(), equalTo(1));
         DistributionQueueEntry head = queue.getHead();
         DistributionQueueItem item = head.getItem();
         assertThat(item.getPackageId(), equalTo("packageid1")); 
     }
+    
+    @Test
+    public void testCleanUp() {
+        handler.handle(info(0L), packageMessage("packageid1", PUB1_AGENT_NAME));
+
+        assertThat(queueSize(), equalTo(1));
+        queueProvider.run();
+        assertThat(queueSize(), equalTo(1));
+        
+        for (long c=0; c<10001;c++) {
+            handler.handle(info(c), packageMessage("packageid" + c, PUB1_AGENT_NAME));
+        }
+        assertThat(queueSize(), equalTo(10001));
+        queueProvider.run();
+        handler = handlerCaptor.getValue();
+        handler.handle(info(0L), packageMessage("packageid1", PUB1_AGENT_NAME));
+        assertThat(queueSize(), equalTo(1));
+    }
+
+    private int queueSize() {
+        return queueProvider.getOffsetQueue(PUB1_AGENT_NAME, 0).getSize();
+    }
 
     private MessageInfo info(long offset) {
         MessageInfo info = Mockito.mock(MessageInfo.class);
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
similarity index 93%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
rename to src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
index 28b5320..8ae13b4 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
 
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_PARTITION;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TOPIC;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_OFFSET;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_PARTITION;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_TIMESTAMP;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_TOPIC;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueType;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/AgentIdTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/AgentIdTest.java
similarity index 98%
rename from src/test/java/org/apache/sling/distribution/journal/impl/publisher/AgentIdTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/AgentIdTest.java
index f7a4fb8..6822ba0 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/AgentIdTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/AgentIdTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.shared;
 
 import static org.junit.Assert.assertEquals;