You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/12/23 13:47:41 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-8932 - Redesign DistributionSubscriber to offload responsibilities (#17)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new e88d623  SLING-8932 - Redesign DistributionSubscriber to offload responsibilities (#17)
e88d623 is described below

commit e88d623e506f3e4a7e29cd982f583571948add48
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Mon Dec 23 14:47:35 2019 +0100

    SLING-8932 - Redesign DistributionSubscriber to offload responsibilities (#17)
    
    * SLING-8932 - Redesign DistributionSubscriber to offload responsibilities
    
    * SLING-8932 - Fixing issues from review
    
    * SLING-8932 - Use old logic for sendStoredStatus
---
 .../journal/impl/subscriber/Announcer.java         |  38 +-
 .../journal/impl/subscriber/BookKeeper.java        | 335 +++++++++++++
 .../journal/impl/subscriber/CommandPoller.java     | 104 ++++
 .../impl/subscriber/DistributionSubscriber.java    | 544 ++++-----------------
 .../journal/impl/subscriber/PackageHandler.java    |  89 ++++
 .../journal/impl/subscriber/AnnouncerTest.java     |  16 +-
 6 files changed, 645 insertions(+), 481 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index a121693..ffc27bb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -24,29 +24,25 @@ import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
 import org.apache.sling.distribution.journal.messages.Messages;
 import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
 import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
-import org.apache.sling.distribution.journal.MessageSender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @ParametersAreNonnullByDefault
 class Announcer implements Runnable, Closeable {
 
     private static final Logger LOG = LoggerFactory.getLogger(Announcer.class);
 
-    private final String topicName;
+    private final BookKeeper bookKeeper;
 
-    private final LocalStore offsetStore;
-
-    private final MessageSender<DiscoveryMessage> sender;
+    private final Consumer<DiscoveryMessage> sender;
 
     private final String subSlingId;
 
@@ -54,8 +50,6 @@ class Announcer implements Runnable, Closeable {
 
     private final Set<String> pubAgentNames;
 
-    private final PackageRetries packageRetries;
-
     private final boolean editable;
 
     private final int maxRetries;
@@ -64,21 +58,17 @@ class Announcer implements Runnable, Closeable {
 
     public Announcer(String subSlingId,
                      String subAgentName,
-                     String topicName,
                      Set<String> pubAgentNames,
-                     MessageSender<DiscoveryMessage> disSender,
-                     LocalStore offsetStore,
-                     PackageRetries packageRetries,
+                     Consumer<DiscoveryMessage> disSender,
+                     BookKeeper bookKeeper,
                      int maxRetries,
                      boolean editable,
                      int announceDelay) {
         this.subSlingId = Objects.requireNonNull(subSlingId);
         this.subAgentName = Objects.requireNonNull(subAgentName);
-        this.topicName = Objects.requireNonNull(topicName);
         this.pubAgentNames = Objects.requireNonNull(pubAgentNames);
         this.sender = Objects.requireNonNull(disSender);
-        this.offsetStore = Objects.requireNonNull(offsetStore);
-        this.packageRetries = Objects.requireNonNull(packageRetries);
+        this.bookKeeper = Objects.requireNonNull(bookKeeper);
         this.maxRetries = maxRetries;
         this.editable = editable;
         executor = Executors.newSingleThreadScheduledExecutor();
@@ -90,7 +80,7 @@ class Announcer implements Runnable, Closeable {
         LOG.debug("Sending discovery message for agent {}", subAgentName);
         try {
 
-            long offset = offsetStore.load("offset", -1L);
+            long offset = bookKeeper.loadOffset();
 
             SubscriberConfiguration subscriberConfiguration = SubscriberConfiguration.newBuilder()
                     .setEditable(editable)
@@ -102,18 +92,18 @@ class Announcer implements Runnable, Closeable {
                     .setSubAgentName(subAgentName)
                     .setSubscriberConfiguration(subscriberConfiguration);
             for (String pubAgentName : pubAgentNames) {
-                int retries = packageRetries.get(pubAgentName);
-                disMsgBuilder.addSubscriberState(createOffset(pubAgentName, offset, retries));
+                disMsgBuilder.addSubscriberState(subscriberState(pubAgentName, offset));
             }
 
-            sender.send(topicName, disMsgBuilder.build());
+            sender.accept(disMsgBuilder.build());
         } catch (Throwable e) {
             String msg = String.format("Failed to send discovery message for agent %s, %s", subAgentName, e.getMessage());
             LOG.info(msg, e);
         }
     }
 
-    private SubscriberState createOffset(String pubAgentName, long offset, int retries) {
+    private SubscriberState subscriberState(String pubAgentName, long offset) {
+        int retries = bookKeeper.getRetries(pubAgentName);
         return Messages.SubscriberState.newBuilder()
                 .setPubAgentName(pubAgentName)
                 .setRetries(retries)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
new file mode 100644
index 0000000..e8136c2
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Collections.singletonMap;
+import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.IMPORTED;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
+import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Keeps track of offset and processed status and manages 
+ * coordinates the import/retry handling.
+ * 
+ * The offset store is identified by the agentName only.
+ *
+ * With non clustered publish instances deployment, each
+ * instance stores the offset in its own node store, thus
+ * avoiding mix ups. Moreover, when cloning an instance
+ * from a node store, the cloned instance will implicitly
+ * recover the offsets and start from the last processed
+ * offset.
+ *
+ * With clustered publish instances deployment, only one
+ * Subscriber agent must run on the cluster in order to
+ * avoid mix ups.
+ *
+ * The clustered and non clustered publish instances use
+ * cases can be supported by only running the Subscriber
+ * agent on the leader instance.
+ */
+public class BookKeeper implements Closeable {
+    private static final String SUBSERVICE_IMPORTER = "importer";
+    private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
+    private static final int RETRY_SEND_DELAY = 1000;
+
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+    private final ResourceResolverFactory resolverFactory;
+    private final DistributionMetricsService distributionMetricsService;
+    private final PackageHandler packageHandler;
+    private final EventAdmin eventAdmin;
+    private final Consumer<PackageStatusMessage> sender;
+    private final boolean editable;
+    private final int maxRetries;
+    private final boolean errorQueueEnabled;
+
+    private final PackageRetries packageRetries = new PackageRetries();
+    private final LocalStore statusStore;
+    private final LocalStore processedOffsets;
+    private final String subAgentName;
+    private final String subSlingId;
+    private GaugeService<Integer> retriesGauge;
+
+    public BookKeeper(ResourceResolverFactory resolverFactory, 
+            DistributionMetricsService distributionMetricsService,
+            PackageHandler packageHandler,
+            EventAdmin eventAdmin,
+            Consumer<PackageStatusMessage> sender,
+            String subAgentName,
+            String subSlingId,
+            boolean editable, 
+            int maxRetries) { 
+        this.packageHandler = packageHandler;
+        this.eventAdmin = eventAdmin;
+        String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
+        this.retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
+        this.resolverFactory = resolverFactory;
+        this.distributionMetricsService = distributionMetricsService;
+        this.sender = sender;
+        this.subAgentName = subAgentName;
+        this.subSlingId = subSlingId;
+        this.editable = editable;
+        this.maxRetries = maxRetries;
+        // Error queues are enabled when the number
+        // of retry attempts is limited ; disabled otherwise
+        this.errorQueueEnabled = (maxRetries >= 0);
+        this.statusStore = new LocalStore(resolverFactory, "statuses", subAgentName);
+        this.processedOffsets = new LocalStore(resolverFactory, "packages", subAgentName);
+    }
+    
+    /**
+     * We aim at processing the packages exactly once. Processing the packages
+     * exactly once is possible with the following conditions
+     *
+     * I. The package importer is configured to disable auto-committing changes.
+     *
+     * II. A single commit aggregates three content updates
+     *
+     * C1. install the package 
+     * C2. store the processing status 
+     * C3. store the offset processed
+     *
+     * Some package importers require auto-saving or issue partial commits before
+     * failing. For those packages importers, we aim at processing packages at least
+     * once, thanks to the order in which the content updates are applied.
+     */
+    public void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
+        log.info(format("Importing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(),
+                pkgMsg.getReqType(), offset));
+        addPackageMDC(pkgMsg);
+        try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
+                ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
+            packageHandler.apply(importerResolver, pkgMsg);
+            if (editable) {
+                storeStatus(importerResolver, new PackageStatus(IMPORTED, offset, pkgMsg.getPubAgentName()));
+            }
+            storeOffset(importerResolver, offset);
+            importerResolver.commit();
+            distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
+            distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
+            packageRetries.clear(pkgMsg.getPubAgentName());
+            Event event = DistributionEvent.eventImporterImported(pkgMsg, subAgentName);
+            eventAdmin.postEvent(event);
+        } catch (LoginException | IOException | RuntimeException e) {
+            failure(pkgMsg, offset, e);
+        } finally {
+            MDC.clear();
+        }
+    }
+    
+    private void addPackageMDC(PackageMessage pkgMsg) {
+        MDC.put("module", "distribution");
+        MDC.put("package-id", pkgMsg.getPkgId());
+        String paths = pkgMsg.getPathsList().stream().collect(Collectors.joining(","));
+        MDC.put("paths", paths);
+        MDC.put("pub-sling-id", pkgMsg.getPubSlingId());
+        String pubAgentName = pkgMsg.getPubAgentName();
+        MDC.put("pub-agent-name", pubAgentName);
+        MDC.put("distribution-message-type", pkgMsg.getReqType().name());
+        MDC.put("retries", Integer.toString(packageRetries.get(pubAgentName)));
+        MDC.put("sub-sling-id", subSlingId);
+        MDC.put("sub-agent-name", subAgentName);
+    }
+    
+    /**
+     * Should be called on a exception while importing a package.
+     * 
+     * When we use an error queue and the max retries is reached the package is removed.
+     * In all other cases a DistributionException is thrown that signals that we should retry the
+     * package.
+     * 
+     * @param pkgMsg
+     * @param offset
+     * @param e
+     * @throws DistributionException if the package should be retried
+     */
+    private void failure(PackageMessage pkgMsg, long offset, Exception e) throws DistributionException {
+        distributionMetricsService.getFailedPackageImports().mark();
+
+        String pubAgentName = pkgMsg.getPubAgentName();
+        int retries = packageRetries.get(pubAgentName);
+        if (errorQueueEnabled && retries >= maxRetries) {
+            log.warn(format("Failed to import distribution package %s at offset %s after %s retries, removing the package.", pkgMsg.getPkgId(), offset, retries));
+            removeFailedPackage(pkgMsg, offset);
+        } else {
+            packageRetries.increase(pubAgentName);
+            String msg = format("Error processing distribution package %s. Retry attempts %s/%s.", pkgMsg.getPkgId(), retries, errorQueueEnabled ? Integer.toString(maxRetries) : "infinite");
+            throw new DistributionException(msg, e);
+        }
+    }
+
+    public void removePackage(PackageMessage pkgMsg, long offset) throws Exception {
+        log.info(format("Removing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
+        Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
+        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+            if (editable) {
+                storeStatus(resolver, new PackageStatus(REMOVED, offset, pkgMsg.getPubAgentName()));
+            }
+            storeOffset(resolver, offset);
+            resolver.commit();
+        }
+        packageRetries.clear(pkgMsg.getPubAgentName());
+        context.stop();
+    }
+
+    public void sendStoredStatus() throws InterruptedException, IOException {
+        try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
+            PackageStatus status = new PackageStatus(statusStore.load());
+            boolean sent = status.sent;
+            for (int retry = 0 ; !sent ; retry++) {
+                try {
+                    sendStatusMessage(status, retry);
+                    markStatusSent();
+                    sent = true;
+                } catch (Exception e) {
+                    log.warn("Cannot send status (retry {})", retry, e);
+                    Thread.sleep(RETRY_SEND_DELAY);
+                }
+            }
+        }
+    }
+    
+    private void sendStatusMessage(PackageStatus status, int retry) throws InterruptedException {
+        PackageStatusMessage pkgStatMsg = PackageStatusMessage.newBuilder()
+                .setSubSlingId(subSlingId)
+                .setSubAgentName(subAgentName)
+                .setPubAgentName(status.pubAgentName)
+                .setOffset(status.offset)
+                .setStatus(status.status)
+                .build();
+        sender.accept(pkgStatMsg);
+        log.info("Sent status message {}",  pkgStatMsg);
+    }
+
+    public void markStatusSent() {
+        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+            statusStore.store(resolver, "sent", true);
+            resolver.commit();
+        } catch (Exception e) {
+            log.warn("Failed to mark status as sent", e);
+        }
+    }
+    
+    public long loadOffset() {
+        return  processedOffsets.load("offset", -1L);
+    }
+
+    public int getRetries(String pubAgentName) {
+        return packageRetries.get(pubAgentName);
+    }
+
+    public PackageRetries getPackageRetries() {
+        return packageRetries;
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOUtils.closeQuietly(retriesGauge);
+    }
+    
+    private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
+        log.info(format("Removing failed distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
+        Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
+        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+            storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
+            storeOffset(resolver, offset);
+            resolver.commit();
+        } catch (Exception e) {
+            throw new DistributionException("Error removing failed package", e);
+        }
+        context.stop();
+    }
+
+    private void storeStatus(ResourceResolver resolver, PackageStatus packageStatus) throws PersistenceException {
+        Map<String, Object> statusMap = packageStatus.asMap();
+        statusStore.store(resolver, statusMap);
+        log.info("Stored status {}", statusMap);
+    }
+
+    private void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException {
+        processedOffsets.store(resolver, "offset", offset);
+    }
+
+    private ResourceResolver getServiceResolver(String subService) throws LoginException {
+        return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, SUBSERVICE_BOOKKEEPER));
+    }
+
+    class PackageStatus {
+        final Status status;
+        final Long offset;
+        final String pubAgentName;
+        final Boolean sent;
+
+        PackageStatus(Status status, long offset, String pubAgentName) {
+            this.status = status;
+            this.offset = offset;
+            this.pubAgentName = pubAgentName;
+            this.sent = false;
+        }
+        
+        PackageStatus(ValueMap statusMap) {
+            Integer statusNum = statusMap.get("statusNumber", Integer.class);
+            this.status = statusNum !=null ? Status.valueOf(statusNum) : null;
+            this.offset = statusMap.get("offset", Long.class);
+            this.pubAgentName = statusMap.get("pubAgentName", String.class);
+            this.sent = statusMap.get("sent", true);
+        }
+
+        Map<String, Object> asMap() {
+            Map<String, Object> s = new HashMap<>();
+            s.put("pubAgentName", pubAgentName);
+            s.put("statusNumber", status.getNumber());
+            s.put("offset", offset);
+            s.put("sent", sent);
+            return s;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
new file mode 100644
index 0000000..10186a7
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static java.lang.String.format;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CommandPoller implements Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
+
+    private final String subSlingId;
+    private final String subAgentName;
+    private final boolean editable;
+    private final Closeable commandPoller;
+    private final AtomicLong clearOffset = new AtomicLong(-1);
+
+    public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName, boolean editable) {
+        this.subSlingId = subSlingId;
+        this.subAgentName = subAgentName;
+        this.editable = editable;
+        if (editable) {
+
+            /*
+             * We currently only support commands requiring editable mode.
+             * As an optimisation, we don't register a poller for non
+             * editable subscribers.
+             *
+             * When supporting commands independent from editable mode,
+             * this optimisation will be removed.
+             */
+
+            commandPoller = messagingProvider.createPoller(
+                    topics.getCommandTopic(),
+                    Reset.earliest,
+                    create(CommandMessage.class, this::handleCommandMessage));
+        } else {
+            commandPoller = null;
+        }
+    }
+    
+    public boolean isCleared(long offset) {
+        return offset <= clearOffset.longValue();
+    }
+
+    private void handleCommandMessage(MessageInfo info, CommandMessage message) {
+        if (!subSlingId.equals(message.getSubSlingId()) || !subAgentName.equals(message.getSubAgentName())) {
+            LOG.debug(format("Skip command for subSlingId %s", message.getSubSlingId()));
+            return;
+        }
+
+        if (message.hasClearCommand()) {
+            handleClearCommand(message.getClearCommand().getOffset());
+        } else {
+            LOG.warn("Unsupported command {}", message);
+        }
+    }
+
+    private void handleClearCommand(long offset) {
+        if (editable) {
+            updateClearOffsetIfLarger(offset);
+            LOG.info("Handled clear command for offset {}", offset);
+        } else {
+            LOG.warn("Unexpected ClearCommand for non editable subscriber");
+        }
+
+    }
+
+    private long updateClearOffsetIfLarger(long offset) {
+        return clearOffset.accumulateAndGet(offset, Math::max);
+    }
+
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(commandPoller);
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index fcefc7f..8ac9e56 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -18,27 +18,19 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
-import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.IMPORTED;
-import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED;
-import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
 import static java.lang.String.format;
-import static java.lang.System.currentTimeMillis;
 import static java.util.Arrays.asList;
-import static java.util.Collections.singletonMap;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toSet;
-import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
 
 import java.io.Closeable;
-import java.io.InputStream;
 import java.util.Collections;
 import java.util.Dictionary;
-import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
@@ -46,43 +38,40 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
+import java.util.function.Consumer;
 
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.impl.shared.AgentState;
-import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.jackrabbit.vault.packaging.Packaging;
-import org.apache.sling.api.resource.LoginException;
-import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.DistributionRequestState;
 import org.apache.sling.distribution.DistributionRequestType;
 import org.apache.sling.distribution.DistributionResponse;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
-import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
+import org.apache.sling.distribution.journal.impl.shared.AgentState;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.log.spi.DistributionLog;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
-import org.apache.sling.serviceusermapping.ServiceUserMapped;
 import org.apache.sling.settings.SlingSettingsService;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
@@ -90,44 +79,28 @@ import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.service.metatype.annotations.Designate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
-import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
-import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.JournalAvailable;
-import org.apache.sling.distribution.journal.Reset;
+import com.google.protobuf.GeneratedMessage;
 
 /**
- * A Subscriber SCD agent which consumes messages produced by a {@code DistributionPublisher} agent.
+ * A Subscriber SCD agent which consumes messages produced by a
+ * {@code DistributionPublisher} agent.
  */
-@Component(
-        service = {},
-        immediate = true,
-        property = {"announceDelay=10000"},
-        configurationPid = "org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
+@Component(service = {}, immediate = true, property = {
+        "announceDelay=10000" }, configurationPid = "org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
 @Designate(ocd = SubscriberConfiguration.class, factory = true)
 @ParametersAreNonnullByDefault
 public class DistributionSubscriber implements DistributionAgent {
     private static final int PRECONDITION_TIMEOUT = 60;
-    private static final int RETRY_SEND_DELAY = 1000;
     static int RETRY_DELAY = 5000;
     static int QUEUE_FETCH_DELAY = 1000;
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
-    
+
     private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = Collections.emptySet();
 
     @Reference(name = "packageBuilder")
@@ -147,7 +120,7 @@ public class DistributionSubscriber implements DistributionAgent {
 
     @Reference
     private EventAdmin eventAdmin;
-    
+
     @Reference
     private JournalAvailable journalAvailable;
 
@@ -156,61 +129,37 @@ public class DistributionSubscriber implements DistributionAgent {
 
     @Reference
     private DistributionMetricsService distributionMetricsService;
-    
-    @Reference
-    private ServiceUserMapped mappedUser;
 
     @Reference
     private Packaging packaging;
-    
-    private ServiceRegistration<DistributionAgent> componentReg;
-
-    private final PackageRetries packageRetries = new PackageRetries();
 
-    private GaugeService<Integer> retriesGauge;
+    private ServiceRegistration<DistributionAgent> componentReg;
 
     private Closeable packagePoller;
 
-    private Closeable commandPoller;
-
-    private LocalStore processedOffsets;
-
-    private LocalStore processedStatuses;
+    private CommandPoller commandPoller;
 
+    private BookKeeper bookKeeper;
 
-    private final AtomicLong clearOffset = new AtomicLong(-1);
-
-    // Use a bounded internal buffer to allow reading further packages while working on one at a time
+    // Use a bounded internal buffer to allow reading further packages while working
+    // on one at a time
     private final BlockingQueue<DistributionQueueItem> queueItemsBuffer = new LinkedBlockingQueue<>(8);
 
     private Set<String> queueNames = Collections.emptySet();
 
-    private MessageSender<PackageStatusMessage> sender;
-
     private Announcer announcer;
 
     private String subAgentName;
 
-    private String subSlingId;
-
     private String pkgType;
 
-    private int maxRetries;
-
-    private boolean errorQueueEnabled;
-
-    private boolean editable;
-
     private volatile boolean running = true;
 
     private volatile Thread queueProcessor;
-    
-    private ContentPackageExtractor extractor;
-    
+
     @Activate
     public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
-
-        subSlingId = requireNonNull(slingSettings.getSlingId());
+        String subSlingId = requireNonNull(slingSettings.getSlingId());
         subAgentName = requireNonNull(config.name());
         requireNonNull(config);
         requireNonNull(context);
@@ -223,103 +172,46 @@ public class DistributionSubscriber implements DistributionAgent {
         requireNonNull(precondition);
 
         queueNames = getNotEmpty(config.agentNames());
+        int maxRetries = config.maxRetries();
+        boolean editable = config.editable();
 
-        maxRetries = config.maxRetries();
-        // Error queues are enabled when the number
-        // of retry attempts is limited ; disabled otherwise
-        errorQueueEnabled = (maxRetries >= 0);
-
-        editable = config.editable();
-
-        // The offset store is identified by the agentName only.
-        //
-        // With non clustered publish instances deployment, each
-        // instance stores the offset in its own node store, thus
-        // avoiding mix ups. Moreover, when cloning an instance
-        // from a node store, the cloned instance will implicitly
-        // recover the offsets and start from the last processed
-        // offset.
-        //
-        // With clustered publish instances deployment, only one
-        // Subscriber agent must run on the cluster in order to
-        // avoid mix ups.
-        //
-        // The clustered and non clustered publish instances use
-        // cases can be supported by only running the Subscriber
-        // agent on the leader instance.
-        processedOffsets = new LocalStore(resolverFactory, "packages", subAgentName);
-        long startOffset = processedOffsets.load("offset", -1L) + 1;
-
-        processedStatuses = new LocalStore(resolverFactory, "statuses", subAgentName);
-
+        ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.packageHandling());
+        PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
+        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin,
+                sender(topics.getStatusTopic()), subAgentName, subSlingId, editable, maxRetries);
+        
+        long startOffset = bookKeeper.loadOffset() + 1;
         String assign = messagingProvider.assignTo(startOffset);
 
-        packagePoller = messagingProvider.createPoller(
-                topics.getPackageTopic(), 
-                Reset.earliest, 
-                assign,
+        packagePoller = messagingProvider.createPoller(topics.getPackageTopic(), Reset.earliest, assign,
                 create(PackageMessage.class, this::handlePackageMessage));
 
-        if (editable) {
-
-            /*
-             * We currently only support commands requiring editable mode.
-             * As an optimisation, we don't register a poller for non
-             * editable subscribers.
-             *
-             * When supporting commands independent from editable mode,
-             * this optimisation will be removed.
-             */
-
-            commandPoller = messagingProvider.createPoller(
-                    topics.getCommandTopic(),
-                    Reset.earliest,
-                    create(CommandMessage.class, this::handleCommandMessage));
-        }
-
+        commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, editable);
 
         queueProcessor = startBackgroundThread(this::processQueue,
                 format("Queue Processor for Subscriber agent %s", subAgentName));
 
-        sender = messagingProvider.createSender();
-        
-        String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + config.name();
-        retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
-
         int announceDelay = PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
-        MessageSender<DiscoveryMessage> disSender = messagingProvider.createSender();
-        announcer = new Announcer(subSlingId,
-                subAgentName,
-                topics.getDiscoveryTopic(),
-                queueNames,
-                disSender,
-                processedOffsets,
-                packageRetries,
-                maxRetries,
-                config.editable(),
-                announceDelay
-                );
+        announcer = new Announcer(subSlingId, subAgentName, queueNames, sender(topics.getDiscoveryTopic()), bookKeeper,
+                maxRetries, config.editable(), announceDelay);
 
         pkgType = requireNonNull(packageBuilder.getType());
-
-        String msg = format("Started Subscriber agent %s at offset %s, subscribed to agent names %s with package builder %s editable %s maxRetries %s errorQueueEnabled %s",
-                subAgentName,
-                startOffset,
-                queueNames,
-                pkgType,
-                config.editable(),
-                maxRetries,
-                errorQueueEnabled);
+        boolean errorQueueEnabled = (maxRetries >= 0);
+        String msg = format(
+                "Started Subscriber agent %s at offset %s, subscribed to agent names %s with package builder %s editable %s maxRetries %s errorQueueEnabled %s",
+                subAgentName, startOffset, queueNames, pkgType, config.editable(), maxRetries, errorQueueEnabled);
         LOG.info(msg);
         Dictionary<String, Object> props = createServiceProps(config);
         componentReg = context.registerService(DistributionAgent.class, this, props);
-        extractor = new ContentPackageExtractor(packaging, config.packageHandling());
+    }
+
+    private <T extends GeneratedMessage> Consumer<T> sender(String topic) {
+        MessageSender<T> sender = messagingProvider.createSender();
+        return msg -> sender.send(topic, msg);
     }
 
     private Set<String> getNotEmpty(String[] agentNames) {
-        return asList(agentNames).stream()
-                .filter(StringUtils::isNotBlank)
-                .collect(toSet());
+        return asList(agentNames).stream().filter(StringUtils::isNotBlank).collect(toSet());
     }
 
     private Dictionary<String, Object> createServiceProps(SubscriberConfiguration config) {
@@ -338,8 +230,8 @@ public class DistributionSubscriber implements DistributionAgent {
 
     @Deactivate
     public void deactivate() {
-        IOUtils.closeQuietly(retriesGauge);
         IOUtils.closeQuietly(announcer);
+        IOUtils.closeQuietly(bookKeeper);
         componentReg.unregister();
         IOUtils.closeQuietly(packagePoller);
         IOUtils.closeQuietly(commandPoller);
@@ -348,10 +240,9 @@ public class DistributionSubscriber implements DistributionAgent {
         if (interrupter != null) {
             interrupter.interrupt();
         }
-        String msg = String.format("Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s",
-                subAgentName,
-                queueNames,
-                pkgType);
+        String msg = String.format(
+                "Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s",
+                subAgentName, queueNames, pkgType);
         LOG.info(msg);
     }
 
@@ -363,9 +254,9 @@ public class DistributionSubscriber implements DistributionAgent {
 
     @Override
     public DistributionQueue getQueue(@Nonnull String queueName) {
-        DistributionQueueItem head = queueItemsBuffer.stream()
-                .filter(item -> isIn(queueName, item)).findFirst().orElse(null);
-        return new SubQueue(queueName, head, packageRetries);
+        DistributionQueueItem head = queueItemsBuffer.stream().filter(item -> isIn(queueName, item)).findFirst()
+                .orElse(null);
+        return new SubQueue(queueName, head, bookKeeper.getPackageRetries());
     }
 
     private boolean isIn(String queueName, DistributionQueueItem queueItem) {
@@ -378,7 +269,7 @@ public class DistributionSubscriber implements DistributionAgent {
     public DistributionLog getLog() {
         return this::emptyDistributionLog;
     }
-    
+
     private List<String> emptyDistributionLog() {
         return Collections.emptyList();
     }
@@ -391,8 +282,7 @@ public class DistributionSubscriber implements DistributionAgent {
 
     @Nonnull
     @Override
-    public DistributionResponse execute(ResourceResolver resourceResolver,
-                                        DistributionRequest request) {
+    public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest request) {
         return executeUnsupported(request);
     }
 
@@ -405,43 +295,43 @@ public class DistributionSubscriber implements DistributionAgent {
     }
 
     private void handlePackageMessage(MessageInfo info, PackageMessage message) {
-        if (! queueNames.contains(message.getPubAgentName())) {
+        if (!queueNames.contains(message.getPubAgentName())) {
             LOG.info(String.format("Skipping package for Publisher agent %s (not subscribed)", message.getPubAgentName()));
             return;
         }
-        if (! pkgType.equals(message.getPkgType())) {
+        if (!pkgType.equals(message.getPkgType())) {
             LOG.warn(String.format("Skipping package with type %s", message.getPkgType()));
             return;
         }
         DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
-        try {
-            enqueue(queueItem);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException();
-        }
+        enqueue(queueItem);
     }
 
     /**
-     * We block here if the buffer is full in order to limit the number of
-	 * binary packages fetched in memory. Note that each queued item contains
-	 * the binary package to be imported.
+     * We block here if the buffer is full in order to limit the number of binary
+     * packages fetched in memory. Note that each queued item contains the binary
+     * package to be imported.
      */
-	private void enqueue(DistributionQueueItem queueItem) throws InterruptedException {
-        while (running) {
-            if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) {
-                distributionMetricsService.getItemsBufferSize().increment();
-                return;
+    private void enqueue(DistributionQueueItem queueItem) {
+        try {
+            while (running) {
+                if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) {
+                    distributionMetricsService.getItemsBufferSize().increment();
+                    return;
+                }
             }
+            throw new InterruptedException();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException();
         }
-        throw new InterruptedException();
-	}
+    }
 
     private void processQueue() {
         LOG.info("Started Queue processor");
-        while (! Thread.interrupted()) {
+        while (!Thread.interrupted()) {
             try {
-                processQueueItems();
+                fetchAndProcessQueueItem();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
             }
@@ -449,18 +339,22 @@ public class DistributionSubscriber implements DistributionAgent {
         LOG.info("Stopped Queue processor");
     }
 
-    private void processQueueItems() throws InterruptedException {
+    private void fetchAndProcessQueueItem() throws InterruptedException {
         try {
             // send status stored in a previous run if exists
-            try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
-                sendStoredStatus();
-            }
+            bookKeeper.sendStoredStatus();
             // block until an item is available
             DistributionQueueItem item = blockingPeekQueueItem();
             // and then process it
             try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
                 processQueueItem(item);
             }
+        } catch (IllegalStateException e) {
+            /**
+             * Precondition timed out. We only log this on info level as it is no error
+             */
+            LOG.info(e.getMessage());
+            Thread.sleep(RETRY_DELAY);
         } catch (InterruptedException e) {
             throw e;
         } catch (Throwable t) {
@@ -483,264 +377,20 @@ public class DistributionSubscriber implements DistributionAgent {
 
     private void processQueueItem(DistributionQueueItem queueItem) throws Exception {
         long offset = queueItem.get(RECORD_OFFSET, Long.class);
-        boolean skip;
-        try {
-            skip = isCleared(offset) || cannotProcess(offset);
-        } catch (IllegalStateException e) {
-            /**
-             * This will occur when the precondition times out.
-             */
-            LOG.info(e.getMessage());
-            Thread.sleep(RETRY_DELAY);
-            return;
-        }
         PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
-        String pubAgentName = pkgMsg.getPubAgentName();
+        boolean skip = shouldSkip(offset);
         if (skip) {
-            removePackage(pkgMsg, offset);
+            bookKeeper.removePackage(pkgMsg, offset);
         } else {
             long createdTime = queueItem.get(RECORD_TIMESTAMP, Long.class);
-            importPackage(pkgMsg, offset, createdTime);
-        }
-        queueItemProcessed(pubAgentName);
-    }
-
-    private void removePackage(PackageMessage pkgMsg, long offset) throws Exception {
-        LOG.info(format("Removing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
-        Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
-        try (ResourceResolver resolver = getServiceResolver("bookkeeper")) {
-            if (editable) {
-                storeStatus(resolver, REMOVED, offset, pkgMsg.getPubAgentName());
-            }
-            storeOffset(resolver, offset);
-            resolver.commit();
-            context.stop();
-        }
-    }
-
-    private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws Exception {
-        LOG.info(format("Removing failed distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
-        Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
-        try (ResourceResolver resolver = getServiceResolver("bookkeeper")) {
-            storeStatus(resolver, REMOVED_FAILED, offset, pkgMsg.getPubAgentName());
-            storeOffset(resolver, offset);
-            resolver.commit();
-            context.stop();
+            bookKeeper.importPackage(pkgMsg, offset, createdTime);
         }
-    }
-
-    private void importPackage(PackageMessage pkgMsg, long offset, long createdTime)
-            throws Exception {
-        String pubAgentName = pkgMsg.getPubAgentName();
-        LOG.info(format("Importing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
-        addPackageMDC(pkgMsg);
-        Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
-        try (ResourceResolver importerResolver = getServiceResolver("importer")) {
-
-            /*
-             * We aim at processing the packages exactly once.
-             * Processing the packages exactly once is possible
-             * with the following conditions
-             *
-             * I.  The package importer is configured to disable
-             *     auto-committing changes.
-             *
-             * II. A single commit aggregates three content updates
-             *
-             *     C1. install the package
-             *     C2. store the processing status
-             *     C3. store the offset processed
-             *
-             * Some package importers require auto-saving or issue
-             * partial commits before failing.
-             * For those packages importers, we aim at processing
-             * packages at least once, thanks to the order in which
-             * the content updates are applied.
-             */
-
-            installPackage(importerResolver, pkgMsg);
-            if (editable) {
-                storeStatus(importerResolver, IMPORTED, offset, pubAgentName);
-            }
-            storeOffset(importerResolver, offset);
-            importerResolver.commit();
-
-            context.stop();
-            distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
-            distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
-
-            Event event = DistributionEvent.eventImporterImported(pkgMsg, subAgentName);
-            eventAdmin.postEvent(event);
-        } catch (Throwable e) {
-            distributionMetricsService.getFailedPackageImports().mark();
-            // rethrow fatal exceptions
-            if (e instanceof Error) {
-                throw (Error) e;
-            }
-            int retries = packageRetries.get(pubAgentName);
-            if (errorQueueEnabled && retries >= maxRetries) {
-                LOG.warn(format("Failed to import distribution package %s at offset %s after %s retries, removing the package.", pkgMsg.getPkgId(), offset, retries));
-                removeFailedPackage(pkgMsg, offset);
-            } else {
-                packageRetries.increase(pubAgentName);
-                String msg = format("Error processing distribution package %s. Retry attempts %s/%s.", pkgMsg.getPkgId(), retries, errorQueueEnabled ? Integer.toString(maxRetries) : "infinite");
-                throw new DistributionException(msg, e);
-            }
-        } finally {
-            MDC.clear();
-        }
-    }
-
-    private void storeOffset(ResourceResolver importerResolver, long offset)
-            throws PersistenceException {
-        processedOffsets.store(importerResolver, "offset", offset);
-    }
-
-    private void queueItemProcessed(String pubAgentName) {
-        packageRetries.clear(pubAgentName);
         queueItemsBuffer.remove();
         distributionMetricsService.getItemsBufferSize().decrement();
     }
 
-    private void addPackageMDC(PackageMessage pkgMsg) {
-        MDC.put("module", "distribution");
-        MDC.put("package-id", pkgMsg.getPkgId());
-        String paths = pkgMsg.getPathsList().stream().collect(Collectors.joining(","));
-        MDC.put("paths", paths);
-        MDC.put("pub-sling-id", pkgMsg.getPubSlingId());
-        String pubAgentName = pkgMsg.getPubAgentName();
-        MDC.put("pub-agent-name", pubAgentName);
-        MDC.put("distribution-message-type", pkgMsg.getReqType().name());
-        MDC.put("retries", Integer.toString(packageRetries.get(pubAgentName)));
-        MDC.put("sub-sling-id", subSlingId);
-        MDC.put("sub-agent-name", subAgentName);
-    }
-
-
-    private void installPackage(ResourceResolver resolver, PackageMessage pkgMsg)
-            throws DistributionException, PersistenceException {
-        PackageMessage.ReqType type = pkgMsg.getReqType();
-        switch (type) {
-            case ADD:
-                installAddPackage(resolver, pkgMsg);
-                break;
-            case DELETE:
-                installDeletePackage(resolver, pkgMsg);
-                break;
-            case TEST:
-                break;
-            default: throw new UnsupportedOperationException(format("Unable to process messages with type: %s", type));
-        }
-    }
-
-    private void installAddPackage(ResourceResolver resolver, PackageMessage pkgMsg)
-            throws DistributionException {
-        LOG.info("Importing paths " + pkgMsg.getPathsList());
-        InputStream pkgStream = null;
-        try {
-            pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
-            packageBuilder.installPackage(resolver, pkgStream);
-            extractor.handle(resolver, pkgMsg.getPathsList());
-        } finally {
-            IOUtils.closeQuietly(pkgStream);
-        }
-
-    }
-
-    private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
-            throws PersistenceException {
-        LOG.info("Deleting paths " + pkgMsg.getPathsList());
-        for (String path : pkgMsg.getPathsList()) {
-            Resource resource = resolver.getResource(path);
-            if (resource != null) {
-                resolver.delete(resource);
-            }
-        }
-    }
-
-    private void storeStatus(ResourceResolver resolver, Status status, long offset, String pubAgentName) throws PersistenceException {
-        Map<String, Object> s = new HashMap<>();
-        s.put("pubAgentName", pubAgentName);
-        s.put("statusNumber", status.getNumber());
-        s.put("offset", offset);
-        s.put("sent", false);
-        processedStatuses.store(resolver, s);
-        LOG.info("Stored status {}", s);
-    }
-
-    private void sendStoredStatus() throws InterruptedException {
-        ValueMap status = processedStatuses.load();
-        boolean sent = status.get("sent", true);
-        for (int retry = 0 ; !sent ; retry++) {
-            try {
-                sendStatusMessage(status);
-                markStatusSent();
-                sent = true;
-            } catch (Exception e) {
-                LOG.warn("Cannot send status (retry {})", retry, e);
-                Thread.sleep(RETRY_SEND_DELAY);
-            }
-        }
-    }
-
-    private void markStatusSent() {
-        try (ResourceResolver resolver = getServiceResolver("bookkeeper")) {
-            processedStatuses.store(resolver, "sent", true);
-            resolver.commit();
-        } catch (Exception e) {
-            LOG.warn("Failed to mark status as sent", e);
-        }
-    }
-
-    private void sendStatusMessage(ValueMap status) {
-
-        PackageStatusMessage pkgStatMsg = PackageStatusMessage.newBuilder()
-                .setSubSlingId(subSlingId)
-                .setSubAgentName(subAgentName)
-                .setPubAgentName(status.get("pubAgentName", String.class))
-                .setOffset(status.get("offset", Long.class))
-                .setStatus(Status.valueOf(status.get("statusNumber", Integer.class)))
-                .build();
-
-        sender.send(topics.getStatusTopic(), pkgStatMsg);
-        LOG.info("Sent status message {}", status);
-    }
-
-    private void handleCommandMessage(MessageInfo info, CommandMessage message) {
-        if (subSlingId.equals(message.getSubSlingId()) && subAgentName.equals(message.getSubAgentName())) {
-            if (message.hasClearCommand()) {
-                handleClearCommand(message.getClearCommand().getOffset());
-            } else {
-                LOG.warn("Unsupported command {}", message);
-            }
-        } else {
-            LOG.debug(format("Skip command for subSlingId %s", message.getSubSlingId()));
-        }
-    }
-
-    private boolean isCleared(long offset) {
-        return offset <= clearOffset.longValue();
-    }
-
-    private boolean cannotProcess(long offset) {
-	    return !precondition.canProcess(offset , PRECONDITION_TIMEOUT);
-    }
-
-    private void handleClearCommand(long offset) {
-        if (editable) {
-            // atomically compare and set clearOffset
-            // as the max between the provided offset
-            // and the current clearOffset
-            clearOffset.accumulateAndGet(offset, Math::max);
-            LOG.info("Handled clear command for offset {}", offset);
-        } else {
-            LOG.warn("Unexpected ClearCommand for non editable subscriber");
-        }
-
-    }
-
-    private ResourceResolver getServiceResolver(String subService) throws LoginException {
-        return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, subService));
+    private boolean shouldSkip(long offset) throws IllegalStateException {
+        return commandPoller.isCleared(offset) || !precondition.canProcess(offset, PRECONDITION_TIMEOUT);
     }
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
new file mode 100644
index 0000000..7307fdc
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static java.lang.String.format;
+
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PackageHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(PackageHandler.class);
+    
+    private DistributionPackageBuilder packageBuilder;
+    
+    private ContentPackageExtractor extractor;
+
+    public PackageHandler(DistributionPackageBuilder packageBuilder, ContentPackageExtractor extractor) {
+        this.packageBuilder = packageBuilder;
+        this.extractor = extractor;
+    }
+
+    public void apply(ResourceResolver resolver, PackageMessage pkgMsg)
+            throws DistributionException, PersistenceException {
+        PackageMessage.ReqType type = pkgMsg.getReqType();
+        switch (type) {
+            case ADD:
+                installAddPackage(resolver, pkgMsg);
+                break;
+            case DELETE:
+                installDeletePackage(resolver, pkgMsg);
+                break;
+            case TEST:
+                break;
+            default: throw new UnsupportedOperationException(format("Unable to process messages with type: %s", type));
+        }
+    }
+
+    private void installAddPackage(ResourceResolver resolver, PackageMessage pkgMsg)
+            throws DistributionException {
+        LOG.info("Importing paths " + pkgMsg.getPathsList());
+        InputStream pkgStream = null;
+        try {
+            pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
+            packageBuilder.installPackage(resolver, pkgStream);
+            extractor.handle(resolver, pkgMsg.getPathsList());
+        } finally {
+            IOUtils.closeQuietly(pkgStream);
+        }
+
+    }
+
+    private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
+            throws PersistenceException {
+        LOG.info("Deleting paths " + pkgMsg.getPathsList());
+        for (String path : pkgMsg.getPathsList()) {
+            Resource resource = resolver.getResource(path);
+            if (resource != null) {
+                resolver.delete(resource);
+            }
+        }
+    }
+    
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
index e9ef03b..05f8a88 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
@@ -25,16 +25,13 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.Collections;
+import java.util.function.Consumer;
 
 import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.MessageSender;
-
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
-
 public class AnnouncerTest {
 
     private static final String SUB1_SLING_ID = "sub1sling";
@@ -44,14 +41,13 @@ public class AnnouncerTest {
     @Test
     @SuppressWarnings("unchecked")
     public void testDiscoveryMessage() throws InterruptedException {
-        MessageSender<Messages.DiscoveryMessage> sender = Mockito.mock(MessageSender.class);
-        LocalStore offsetStore = Mockito.mock(LocalStore.class);
-        PackageRetries packageRetries = Mockito.mock(PackageRetries.class);
-        when(offsetStore.load("offset", -1L)).thenReturn(1l);
-        Announcer announcer = new Announcer(SUB1_SLING_ID, SUB1_AGENT_NAME, "discoverytopic", Collections.singleton(PUB1_AGENT_NAME), sender, offsetStore, packageRetries, -1, false, 10000);
+        Consumer<Messages.DiscoveryMessage> sender = Mockito.mock(Consumer.class);
+        BookKeeper bookKeeper = Mockito.mock(BookKeeper.class);
+        when(bookKeeper.loadOffset()).thenReturn(1l);
+        Announcer announcer = new Announcer(SUB1_SLING_ID, SUB1_AGENT_NAME, Collections.singleton(PUB1_AGENT_NAME), sender, bookKeeper, -1, false, 10000);
         Thread.sleep(200);
         ArgumentCaptor<Messages.DiscoveryMessage> msg = forClass(Messages.DiscoveryMessage.class);
-        verify(sender).send(Mockito.eq("discoverytopic"), msg.capture());
+        verify(sender).accept(msg.capture());
         Messages.DiscoveryMessage message = msg.getValue();
         Messages.SubscriberState offset = message.getSubscriberStateList().iterator().next();
         assertThat(message.getSubSlingId(), equalTo(SUB1_SLING_ID));