You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2022/03/18 20:28:49 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-10585 - Add support for invalidation requests (#101)

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e069bc  SLING-10585 - Add support for invalidation requests (#101)
3e069bc is described below

commit 3e069bca0f7857f28222032eae3b16eff9b9300c
Author: balasoiuroxana <99...@users.noreply.github.com>
AuthorDate: Fri Mar 18 21:28:44 2022 +0100

    SLING-10585 - Add support for invalidation requests (#101)
    
    Co-authored-by: balasoiu <ba...@adobe.com>
---
 pom.xml                                            |  4 +-
 .../{ImportedEvent.java => AppliedEvent.java}      |  4 +-
 .../journal/bookkeeper/BookKeeper.java             | 49 +++++++++++++++++++---
 .../journal/bookkeeper/BookKeeperFactory.java      |  7 +++-
 .../impl/precondition/PackageStatusWatcher.java    |  2 +-
 .../impl/publisher/DistributionPublisher.java      | 11 +++--
 .../impl/publisher/PackageMessageFactory.java      | 17 +++++++-
 .../impl/subscriber/DistributionSubscriber.java    |  9 +++-
 .../journal/queue/QueueItemFactory.java            |  2 +
 .../journal/shared/DistributionMetricsService.java | 21 ++++++++++
 .../journal/shared/NoOpInvalidationProcessor.java  | 35 ++++++++++++++++
 .../journal/bookkeeper/BookKeeperTest.java         | 27 ++++++++++--
 .../publisher/DistributionPackageFactoryTest.java  | 12 +++++-
 .../impl/publisher/DistributionPublisherTest.java  |  6 +++
 .../distribution/journal/msg/InMemoryProvider.java |  5 +++
 15 files changed, 186 insertions(+), 25 deletions(-)

diff --git a/pom.xml b/pom.xml
index 7028285..c68a665 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,7 +104,7 @@
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.distribution.api</artifactId>
-            <version>0.6.0</version>
+            <version>0.6.1-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.sling</groupId>
@@ -146,7 +146,7 @@
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.distribution.journal.messages</artifactId>
-            <version>0.3.0</version>
+            <version>0.4.1-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.sling</groupId>
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/ImportedEvent.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/AppliedEvent.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/bookkeeper/ImportedEvent.java
rename to src/main/java/org/apache/sling/distribution/journal/bookkeeper/AppliedEvent.java
index 120e0e5..8d35d34 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/ImportedEvent.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/AppliedEvent.java
@@ -33,14 +33,14 @@ import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.osgi.service.event.Event;
 
 @ParametersAreNonnullByDefault
-class ImportedEvent {
+class AppliedEvent {
 
     public static final String PACKAGE_ID = "distribution.package.id";
     private static final String KIND_IMPORTER = "importer";
     private PackageMessage pkgMsg;
     private String agentName;
 
-    ImportedEvent(PackageMessage pkgMsg, String agentName) {
+    AppliedEvent(PackageMessage pkgMsg, String agentName) {
         this.pkgMsg = pkgMsg;
         this.agentName = agentName;
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 0d9a696..c5c6b9b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -44,6 +44,8 @@ import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.distribution.ImportPostProcessException;
 import org.apache.sling.distribution.ImportPostProcessor;
+import org.apache.sling.distribution.InvalidationProcessException;
+import org.apache.sling.distribution.InvalidationProcessor;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.journal.messages.LogMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
@@ -52,6 +54,7 @@ import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Statu
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
 import org.apache.sling.distribution.journal.shared.NoOpImportPostProcessor;
+import org.apache.sling.distribution.journal.shared.NoOpInvalidationProcessor;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
@@ -101,18 +104,19 @@ public class BookKeeper implements Closeable {
     private final LocalStore processedOffsets;
     private final GaugeService<Integer> retriesGauge;
     private final ImportPostProcessor importPostProcessor;
+    private final InvalidationProcessor invalidationProcessor;
     private int skippedCounter = 0;
 
     public BookKeeper(ResourceResolverFactory resolverFactory, DistributionMetricsService distributionMetricsService,
         PackageHandler packageHandler, EventAdmin eventAdmin, Consumer<PackageStatusMessage> sender, Consumer<LogMessage> logSender,
         BookKeeperConfig config) {
         this(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender,
-            logSender, config, new NoOpImportPostProcessor());
+            logSender, config, new NoOpImportPostProcessor(), new NoOpInvalidationProcessor());
     }
     
     public BookKeeper(ResourceResolverFactory resolverFactory, DistributionMetricsService distributionMetricsService,
         PackageHandler packageHandler, EventAdmin eventAdmin, Consumer<PackageStatusMessage> sender, Consumer<LogMessage> logSender,
-        BookKeeperConfig config, ImportPostProcessor importPostProcessor) { 
+        BookKeeperConfig config, ImportPostProcessor importPostProcessor, InvalidationProcessor invalidationProcessor) {
         this.packageHandler = packageHandler;
         this.eventAdmin = eventAdmin;
         this.sender = sender;
@@ -128,6 +132,7 @@ public class BookKeeper implements Closeable {
         this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS, config.getSubAgentName());
         this.processedOffsets = new LocalStore(resolverFactory, config.getPackageNodeName(), config.getSubAgentName());
         this.importPostProcessor = importPostProcessor;
+        this.invalidationProcessor = invalidationProcessor;
         log.info("Started bookkeeper {}.", config);
     }
     
@@ -153,7 +158,7 @@ public class BookKeeper implements Closeable {
                 ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
             packageHandler.apply(importerResolver, pkgMsg);
             if (config.isEditable()) {
-                storeStatus(importerResolver, new PackageStatus(PackageStatusMessage.Status.IMPORTED, offset, pkgMsg.getPubAgentName()));
+                storeStatus(importerResolver, new PackageStatus(Status.IMPORTED, offset, pkgMsg.getPubAgentName()));
             }
             storeOffset(importerResolver, offset);
             importerResolver.commit();
@@ -165,14 +170,48 @@ public class BookKeeper implements Closeable {
 
             packageRetries.clear(pkgMsg.getPubAgentName());
              
-            Event event = new ImportedEvent(pkgMsg, config.getSubAgentName()).toEvent();
+            Event event = new AppliedEvent(pkgMsg, config.getSubAgentName()).toEvent();
             eventAdmin.postEvent(event);
             log.info("Imported distribution package {} at offset={}", pkgMsg, offset);
         } catch (DistributionException | LoginException | IOException | RuntimeException | ImportPostProcessException e) {
             failure(pkgMsg, offset, e);
         }
     }
-    
+
+    public void invalidateCache(PackageMessage pkgMsg, long offset) throws DistributionException {
+        log.debug("Invalidating the cache for the package {} at offset={}", pkgMsg, offset);
+        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+            Map<String, Object> props = new HashMap<>();
+            props.put(DISTRIBUTION_TYPE, pkgMsg.getReqType().name());
+            props.put(DISTRIBUTION_PATHS, pkgMsg.getPaths());
+            props.put(DISTRIBUTION_PACKAGE_ID, pkgMsg.getPkgId());
+
+            long invalidationStartTime = currentTimeMillis();
+            distributionMetricsService.getInvalidationProcessRequest().increment();
+
+            invalidationProcessor.process(props);
+
+            if (config.isEditable()) {
+                storeStatus(resolver, new PackageStatus(Status.IMPORTED, offset, pkgMsg.getPubAgentName()));
+            }
+
+            storeOffset(resolver, offset);
+            resolver.commit();
+
+            packageRetries.clear(pkgMsg.getPubAgentName());
+
+            Event event = new AppliedEvent(pkgMsg, config.getSubAgentName()).toEvent();
+            eventAdmin.postEvent(event);
+
+            log.info("Invalidated the cache for the package {} at offset={}", pkgMsg, offset);
+
+            distributionMetricsService.getInvalidationProcessDuration().update((currentTimeMillis() - invalidationStartTime), TimeUnit.MILLISECONDS);
+            distributionMetricsService.getInvalidationProcessSuccess().increment();
+        } catch (LoginException | PersistenceException | InvalidationProcessException e) {
+            failure(pkgMsg, offset, e);
+        }
+    }
+
     private void postProcess(PackageMessage pkgMsg) throws ImportPostProcessException {
         log.debug("Executing import post processor for package [{}]", pkgMsg);
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
index 3d1d449..b6a159b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
@@ -23,6 +23,7 @@ import java.util.function.Consumer;
 import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.ImportPostProcessor;
+import org.apache.sling.distribution.InvalidationProcessor;
 import org.apache.sling.distribution.journal.messages.LogMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.BinaryStore;
@@ -52,6 +53,9 @@ public class BookKeeperFactory {
     @Reference
     ImportPostProcessor importPostProcessor;
 
+    @Reference
+    InvalidationProcessor invalidationProcessor;
+
     public BookKeeper create(
             DistributionPackageBuilder packageBuilder, 
             BookKeeperConfig config, 
@@ -68,7 +72,8 @@ public class BookKeeperFactory {
                 statusSender,
                 logSender,
                 config,
-                importPostProcessor);
+                importPostProcessor,
+                invalidationProcessor);
     }
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index 619641c..fa202fe 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -98,4 +98,4 @@ public class PackageStatusWatcher implements Closeable {
         Map<Long, Status> agentStatus = getAgentStatus(pkgStatusMsg.getSubAgentName());
         agentStatus.put(statusOffset, pkgStatusMsg.getStatus());
     }
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index f76e20d..94afec6 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -21,9 +21,7 @@ package org.apache.sling.distribution.journal.impl.publisher;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.sling.distribution.DistributionRequestState.ACCEPTED;
-import static org.apache.sling.distribution.DistributionRequestType.ADD;
-import static org.apache.sling.distribution.DistributionRequestType.DELETE;
-import static org.apache.sling.distribution.DistributionRequestType.TEST;
+import static org.apache.sling.distribution.DistributionRequestType.*;
 import static org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed;
 import static org.apache.sling.distribution.journal.shared.Strings.requireNotBlank;
 
@@ -145,9 +143,10 @@ public class DistributionPublisher implements DistributionAgent {
 
     public DistributionPublisher() {
         log = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO);
-        reqTypes.put(ADD,    this::sendAndWait);
-        reqTypes.put(DELETE, this::sendAndWait);
-        reqTypes.put(TEST,   this::send);
+        reqTypes.put(ADD,        this::sendAndWait);
+        reqTypes.put(DELETE,     this::sendAndWait);
+        reqTypes.put(INVALIDATE, this::sendAndWait);
+        reqTypes.put(TEST,       this::send);
     }
 
     @Activate
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
index 051e037..ed95991 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
@@ -91,6 +91,7 @@ public class PackageMessageFactory {
         switch (request.getRequestType()) {
             case ADD: return createAdd(packageBuilder, resourceResolver, pubAgentName, request);
             case DELETE: return createDelete(packageBuilder, resourceResolver, request, pubAgentName);
+            case INVALIDATE: return createInvalidate(packageBuilder, resourceResolver, request, pubAgentName);
             case TEST: return createTest(packageBuilder, resourceResolver, pubAgentName);
             default: throw new IllegalArgumentException(format("Unsupported request with requestType=%s", request.getRequestType()));
         }
@@ -133,7 +134,21 @@ public class PackageMessageFactory {
         disPkg.delete();
         return pipePackage;
     }
-    
+
+    @Nonnull
+    private PackageMessage createInvalidate(DistributionPackageBuilder packageBuilder, ResourceResolver resourceResolver, DistributionRequest request, String pubAgentName) {
+        String pkgId = UUID.randomUUID().toString();
+        return PackageMessage.builder()
+                .pubSlingId(pubSlingId)
+                .pkgId(pkgId)
+                .pubAgentName(pubAgentName)
+                .paths(Arrays.asList(request.getPaths()))
+                .reqType(ReqType.INVALIDATE)
+                .pkgType(packageBuilder.getType())
+                .userId(resourceResolver.getUserID())
+                .build();
+    }
+
     @Nonnull
     private PackageMessage createDelete(DistributionPackageBuilder packageBuilder, ResourceResolver resourceResolver, DistributionRequest request, String pubAgentName) {
         String pkgId = UUID.randomUUID().toString();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 9460068..5c4acfd 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -24,6 +24,7 @@ import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+import static org.apache.sling.distribution.journal.messages.PackageMessage.ReqType.INVALIDATE;
 import static org.apache.sling.distribution.journal.shared.Delay.exponential;
 import static org.apache.sling.distribution.journal.shared.Strings.requireNotBlank;
 
@@ -51,6 +52,7 @@ import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.distribution.ImportPostProcessException;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.journal.FullMessage;
@@ -320,7 +322,7 @@ public class DistributionSubscriber {
         LOG.info("Stopped Queue processor");
     }
 
-    private void fetchAndProcessQueueItem() throws InterruptedException, IOException, LoginException, DistributionException {
+    private void fetchAndProcessQueueItem() throws InterruptedException, IOException, LoginException, DistributionException, ImportPostProcessException {
         blockingSendStoredStatus();
         FullMessage<PackageMessage> item = blockingPeekQueueItem();
         try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
@@ -359,14 +361,17 @@ public class DistributionSubscriber {
         throw new InterruptedException("Shutting down");
     }
 
-    private void processQueueItem(FullMessage<PackageMessage> item) throws PersistenceException, LoginException, DistributionException {
+    private void processQueueItem(FullMessage<PackageMessage> item) throws PersistenceException, LoginException, DistributionException, ImportPostProcessException {
         MessageInfo info = item.getInfo();
         PackageMessage pkgMsg = item.getMessage();
         boolean skip = shouldSkip(info.getOffset());
+        PackageMessage.ReqType type = pkgMsg.getReqType();
         try {
             idleCheck.busy(bookKeeper.getRetries(pkgMsg.getPubAgentName()));
             if (skip) {
                 bookKeeper.removePackage(pkgMsg, info.getOffset());
+            } else if (type == INVALIDATE) {
+                bookKeeper.invalidateCache(pkgMsg, info.getOffset());
             } else {
                 bookKeeper.importPackage(pkgMsg, info.getOffset(), info.getCreateTime());
             }
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/QueueItemFactory.java b/src/main/java/org/apache/sling/distribution/journal/queue/QueueItemFactory.java
index a026efb..3cf32ef 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/QueueItemFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/QueueItemFactory.java
@@ -93,6 +93,8 @@ public final class QueueItemFactory {
             return DistributionRequestType.ADD;
         case DELETE:
             return DistributionRequestType.DELETE;
+        case INVALIDATE:
+            return DistributionRequestType.INVALIDATE;
         case TEST:
             return DistributionRequestType.TEST;
         default:
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
index bda5e89..de68952 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
@@ -97,6 +97,12 @@ public class DistributionMetricsService {
 
     private Counter importPostProcessRequest;
 
+    private Timer invalidationProcessDuration;
+
+    private Counter invalidationProcessSuccess;
+
+    private Counter invalidationProcessRequest;
+
     private BundleContext context;
 
     @Activate
@@ -124,6 +130,9 @@ public class DistributionMetricsService {
         importPostProcessDuration = getTimer(getMetricName(PUB_COMPONENT, "import_post_process_duration"));
         importPostProcessSuccess = getCounter(getMetricName(SUB_COMPONENT, "import_post_process_success_count"));
         importPostProcessRequest = getCounter(getMetricName(SUB_COMPONENT, "import_post_process_request_count"));
+        invalidationProcessDuration = getTimer(getMetricName(PUB_COMPONENT, "invalidation_process_duration"));
+        invalidationProcessSuccess = getCounter(getMetricName(SUB_COMPONENT, "invalidation_process_success_count"));
+        invalidationProcessRequest = getCounter(getMetricName(SUB_COMPONENT, "invalidation_process_request_count"));
     }
 
     /**
@@ -371,6 +380,18 @@ public class DistributionMetricsService {
         return importPostProcessRequest;
     }
 
+    public Timer getInvalidationProcessDuration() {
+        return invalidationProcessDuration;
+    }
+
+    public Counter getInvalidationProcessSuccess() {
+        return invalidationProcessSuccess;
+    }
+
+    public Counter getInvalidationProcessRequest() {
+        return invalidationProcessRequest;
+    }
+
     public class GaugeService<T> implements Gauge<T>, Closeable {
         
         @SuppressWarnings("rawtypes")
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/NoOpInvalidationProcessor.java b/src/main/java/org/apache/sling/distribution/journal/shared/NoOpInvalidationProcessor.java
new file mode 100644
index 0000000..6d513cc
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/NoOpInvalidationProcessor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.shared;
+
+import org.apache.sling.distribution.InvalidationProcessException;
+import org.apache.sling.distribution.InvalidationProcessor;
+import org.osgi.service.component.annotations.Component;
+
+import java.util.Map;
+
+@Component(
+    property = {
+        "type=default"
+    }
+)
+public class NoOpInvalidationProcessor implements InvalidationProcessor {
+        @Override
+        public void process(Map<String, Object> props) throws InvalidationProcessException {}
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index 5bf9d6e..7174b12 100644
--- a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -36,6 +36,7 @@ import org.apache.sling.commons.metrics.Histogram;
 import org.apache.sling.commons.metrics.Meter;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.distribution.ImportPostProcessor;
+import org.apache.sling.distribution.InvalidationProcessor;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.journal.messages.LogMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
@@ -82,6 +83,9 @@ public class BookKeeperTest {
     @Mock
     private ImportPostProcessor importPostProcessor;
 
+    @Mock
+    private InvalidationProcessor invalidationProcessor;
+
     @Before
     public void before() {
         when(distributionMetricsService.getFailedPackageImports())
@@ -98,9 +102,15 @@ public class BookKeeperTest {
                 .thenReturn(mock(Timer.class));
         when(distributionMetricsService.getImportPostProcessSuccess())
                 .thenReturn(mock(Counter.class));
+        when(distributionMetricsService.getInvalidationProcessRequest())
+                .thenReturn(mock(Counter.class));
+        when(distributionMetricsService.getInvalidationProcessDuration())
+                .thenReturn(mock(Timer.class));
+        when(distributionMetricsService.getInvalidationProcessSuccess())
+                .thenReturn(mock(Counter.class));
         BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", "subSlingId", true, 10, PackageHandling.Extract, "package");
         bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender, logSender, bkConfig,
-            importPostProcessor);
+            importPostProcessor, invalidationProcessor);
     }
 
     @Test
@@ -122,20 +132,29 @@ public class BookKeeperTest {
     @Test
     public void testPackageImport() throws DistributionException {
         try {
-            bookKeeper.importPackage(buildPackageMessage(), 10, currentTimeMillis());
+            bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, currentTimeMillis());
+        } finally {
+            assertThat(bookKeeper.getRetries(PUB_AGENT_NAME), equalTo(0));
+        }
+    }
+
+    @Test
+    public void testCacheInvalidation() throws DistributionException {
+        try {
+            bookKeeper.invalidateCache(buildPackageMessage(PackageMessage.ReqType.INVALIDATE), 10);
         } finally {
             assertThat(bookKeeper.getRetries(PUB_AGENT_NAME), equalTo(0));
         }
     }
 
-    PackageMessage buildPackageMessage() {
+    PackageMessage buildPackageMessage(PackageMessage.ReqType reqType) {
         PackageMessage msg = mock(PackageMessage.class);
         when(msg.getPkgLength())
                 .thenReturn(100L);
         when(msg.getPubAgentName())
                 .thenReturn(PUB_AGENT_NAME);
         when(msg.getReqType())
-                .thenReturn(PackageMessage.ReqType.ADD);
+                .thenReturn(reqType);
         when(msg.getPaths())
                 .thenReturn(singletonList("/content"));
         when(msg.getPkgId())
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java
index 90f7f5c..02e568e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java
@@ -149,6 +149,16 @@ public class DistributionPackageFactoryTest {
         assertThat(sent.getPkgBinary(), nullValue());
         assertThat(sent.getPkgLength(), equalTo(0L));
         assertThat(sent.getPaths(), contains("/test"));
-        
+    }
+
+    @Test
+    public void testInvalidate() throws DistributionException, IOException {
+        DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.INVALIDATE, "/test");
+
+        PackageMessage sent = publisher.create(packageBuilder, resourceResolver, "pub1agent1", request);
+        assertThat(sent.getReqType(), equalTo(ReqType.INVALIDATE));
+        assertThat(sent.getPkgBinary(), nullValue());
+        assertThat(sent.getPkgLength(), equalTo(0L));
+        assertThat(sent.getPaths(), contains("/test"));
     }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 4537f00..a7daea9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -185,6 +185,12 @@ public class DistributionPublisherTest {
     }
 
     @Test
+    public void executeRequestINVALIDATEAccepted() throws DistributionException, IOException {
+        DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.INVALIDATE, "/test");
+        executeAndCheck(request);
+    }
+
+    @Test
     public void executeRequestTESTAccepted() throws DistributionException, IOException {
         DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.TEST, "/test");
         executeAndCheck(request);
diff --git a/src/test/java/org/apache/sling/distribution/journal/msg/InMemoryProvider.java b/src/test/java/org/apache/sling/distribution/journal/msg/InMemoryProvider.java
index 5826a1b..adae72f 100644
--- a/src/test/java/org/apache/sling/distribution/journal/msg/InMemoryProvider.java
+++ b/src/test/java/org/apache/sling/distribution/journal/msg/InMemoryProvider.java
@@ -65,6 +65,11 @@ public class InMemoryProvider implements MessagingProvider {
     }
 
     @Override
+    public String assignTo(Reset reset, long relativeOffset) {
+        return String.format("%s", reset.name());
+    }
+
+    @Override
     public URI getServerUri() {
         try {
             return new URI("http://localhost:12345/dummy");