You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2022/03/18 20:28:49 UTC
[sling-org-apache-sling-distribution-journal] branch master updated: SLING-10585 - Add support for invalidation requests (#101)
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 3e069bc SLING-10585 - Add support for invalidation requests (#101)
3e069bc is described below
commit 3e069bca0f7857f28222032eae3b16eff9b9300c
Author: balasoiuroxana <99...@users.noreply.github.com>
AuthorDate: Fri Mar 18 21:28:44 2022 +0100
SLING-10585 - Add support for invalidation requests (#101)
Co-authored-by: balasoiu <ba...@adobe.com>
---
pom.xml | 4 +-
.../{ImportedEvent.java => AppliedEvent.java} | 4 +-
.../journal/bookkeeper/BookKeeper.java | 49 +++++++++++++++++++---
.../journal/bookkeeper/BookKeeperFactory.java | 7 +++-
.../impl/precondition/PackageStatusWatcher.java | 2 +-
.../impl/publisher/DistributionPublisher.java | 11 +++--
.../impl/publisher/PackageMessageFactory.java | 17 +++++++-
.../impl/subscriber/DistributionSubscriber.java | 9 +++-
.../journal/queue/QueueItemFactory.java | 2 +
.../journal/shared/DistributionMetricsService.java | 21 ++++++++++
.../journal/shared/NoOpInvalidationProcessor.java | 35 ++++++++++++++++
.../journal/bookkeeper/BookKeeperTest.java | 27 ++++++++++--
.../publisher/DistributionPackageFactoryTest.java | 12 +++++-
.../impl/publisher/DistributionPublisherTest.java | 6 +++
.../distribution/journal/msg/InMemoryProvider.java | 5 +++
15 files changed, 186 insertions(+), 25 deletions(-)
diff --git a/pom.xml b/pom.xml
index 7028285..c68a665 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,7 +104,7 @@
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.distribution.api</artifactId>
- <version>0.6.0</version>
+ <version>0.6.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.sling</groupId>
@@ -146,7 +146,7 @@
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.distribution.journal.messages</artifactId>
- <version>0.3.0</version>
+ <version>0.4.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.sling</groupId>
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/ImportedEvent.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/AppliedEvent.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/bookkeeper/ImportedEvent.java
rename to src/main/java/org/apache/sling/distribution/journal/bookkeeper/AppliedEvent.java
index 120e0e5..8d35d34 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/ImportedEvent.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/AppliedEvent.java
@@ -33,14 +33,14 @@ import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.osgi.service.event.Event;
@ParametersAreNonnullByDefault
-class ImportedEvent {
+class AppliedEvent {
public static final String PACKAGE_ID = "distribution.package.id";
private static final String KIND_IMPORTER = "importer";
private PackageMessage pkgMsg;
private String agentName;
- ImportedEvent(PackageMessage pkgMsg, String agentName) {
+ AppliedEvent(PackageMessage pkgMsg, String agentName) {
this.pkgMsg = pkgMsg;
this.agentName = agentName;
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 0d9a696..c5c6b9b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -44,6 +44,8 @@ import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.distribution.ImportPostProcessException;
import org.apache.sling.distribution.ImportPostProcessor;
+import org.apache.sling.distribution.InvalidationProcessException;
+import org.apache.sling.distribution.InvalidationProcessor;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.messages.LogMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage;
@@ -52,6 +54,7 @@ import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Statu
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
import org.apache.sling.distribution.journal.shared.NoOpImportPostProcessor;
+import org.apache.sling.distribution.journal.shared.NoOpInvalidationProcessor;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
@@ -101,18 +104,19 @@ public class BookKeeper implements Closeable {
private final LocalStore processedOffsets;
private final GaugeService<Integer> retriesGauge;
private final ImportPostProcessor importPostProcessor;
+ private final InvalidationProcessor invalidationProcessor;
private int skippedCounter = 0;
public BookKeeper(ResourceResolverFactory resolverFactory, DistributionMetricsService distributionMetricsService,
PackageHandler packageHandler, EventAdmin eventAdmin, Consumer<PackageStatusMessage> sender, Consumer<LogMessage> logSender,
BookKeeperConfig config) {
this(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender,
- logSender, config, new NoOpImportPostProcessor());
+ logSender, config, new NoOpImportPostProcessor(), new NoOpInvalidationProcessor());
}
public BookKeeper(ResourceResolverFactory resolverFactory, DistributionMetricsService distributionMetricsService,
PackageHandler packageHandler, EventAdmin eventAdmin, Consumer<PackageStatusMessage> sender, Consumer<LogMessage> logSender,
- BookKeeperConfig config, ImportPostProcessor importPostProcessor) {
+ BookKeeperConfig config, ImportPostProcessor importPostProcessor, InvalidationProcessor invalidationProcessor) {
this.packageHandler = packageHandler;
this.eventAdmin = eventAdmin;
this.sender = sender;
@@ -128,6 +132,7 @@ public class BookKeeper implements Closeable {
this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS, config.getSubAgentName());
this.processedOffsets = new LocalStore(resolverFactory, config.getPackageNodeName(), config.getSubAgentName());
this.importPostProcessor = importPostProcessor;
+ this.invalidationProcessor = invalidationProcessor;
log.info("Started bookkeeper {}.", config);
}
@@ -153,7 +158,7 @@ public class BookKeeper implements Closeable {
ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
packageHandler.apply(importerResolver, pkgMsg);
if (config.isEditable()) {
- storeStatus(importerResolver, new PackageStatus(PackageStatusMessage.Status.IMPORTED, offset, pkgMsg.getPubAgentName()));
+ storeStatus(importerResolver, new PackageStatus(Status.IMPORTED, offset, pkgMsg.getPubAgentName()));
}
storeOffset(importerResolver, offset);
importerResolver.commit();
@@ -165,14 +170,48 @@ public class BookKeeper implements Closeable {
packageRetries.clear(pkgMsg.getPubAgentName());
- Event event = new ImportedEvent(pkgMsg, config.getSubAgentName()).toEvent();
+ Event event = new AppliedEvent(pkgMsg, config.getSubAgentName()).toEvent();
eventAdmin.postEvent(event);
log.info("Imported distribution package {} at offset={}", pkgMsg, offset);
} catch (DistributionException | LoginException | IOException | RuntimeException | ImportPostProcessException e) {
failure(pkgMsg, offset, e);
}
}
-
+
+ public void invalidateCache(PackageMessage pkgMsg, long offset) throws DistributionException {
+ log.debug("Invalidating the cache for the package {} at offset={}", pkgMsg, offset);
+ try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+ Map<String, Object> props = new HashMap<>();
+ props.put(DISTRIBUTION_TYPE, pkgMsg.getReqType().name());
+ props.put(DISTRIBUTION_PATHS, pkgMsg.getPaths());
+ props.put(DISTRIBUTION_PACKAGE_ID, pkgMsg.getPkgId());
+
+ long invalidationStartTime = currentTimeMillis();
+ distributionMetricsService.getInvalidationProcessRequest().increment();
+
+ invalidationProcessor.process(props);
+
+ if (config.isEditable()) {
+ storeStatus(resolver, new PackageStatus(Status.IMPORTED, offset, pkgMsg.getPubAgentName()));
+ }
+
+ storeOffset(resolver, offset);
+ resolver.commit();
+
+ packageRetries.clear(pkgMsg.getPubAgentName());
+
+ Event event = new AppliedEvent(pkgMsg, config.getSubAgentName()).toEvent();
+ eventAdmin.postEvent(event);
+
+ log.info("Invalidated the cache for the package {} at offset={}", pkgMsg, offset);
+
+ distributionMetricsService.getInvalidationProcessDuration().update((currentTimeMillis() - invalidationStartTime), TimeUnit.MILLISECONDS);
+ distributionMetricsService.getInvalidationProcessSuccess().increment();
+ } catch (LoginException | PersistenceException | InvalidationProcessException e) {
+ failure(pkgMsg, offset, e);
+ }
+ }
+
private void postProcess(PackageMessage pkgMsg) throws ImportPostProcessException {
log.debug("Executing import post processor for package [{}]", pkgMsg);
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
index 3d1d449..b6a159b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
@@ -23,6 +23,7 @@ import java.util.function.Consumer;
import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.ImportPostProcessor;
+import org.apache.sling.distribution.InvalidationProcessor;
import org.apache.sling.distribution.journal.messages.LogMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.BinaryStore;
@@ -52,6 +53,9 @@ public class BookKeeperFactory {
@Reference
ImportPostProcessor importPostProcessor;
+ @Reference
+ InvalidationProcessor invalidationProcessor;
+
public BookKeeper create(
DistributionPackageBuilder packageBuilder,
BookKeeperConfig config,
@@ -68,7 +72,8 @@ public class BookKeeperFactory {
statusSender,
logSender,
config,
- importPostProcessor);
+ importPostProcessor,
+ invalidationProcessor);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index 619641c..fa202fe 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -98,4 +98,4 @@ public class PackageStatusWatcher implements Closeable {
Map<Long, Status> agentStatus = getAgentStatus(pkgStatusMsg.getSubAgentName());
agentStatus.put(statusOffset, pkgStatusMsg.getStatus());
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index f76e20d..94afec6 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -21,9 +21,7 @@ package org.apache.sling.distribution.journal.impl.publisher;
import static java.util.Objects.requireNonNull;
import static org.apache.sling.distribution.DistributionRequestState.ACCEPTED;
-import static org.apache.sling.distribution.DistributionRequestType.ADD;
-import static org.apache.sling.distribution.DistributionRequestType.DELETE;
-import static org.apache.sling.distribution.DistributionRequestType.TEST;
+import static org.apache.sling.distribution.DistributionRequestType.*;
import static org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed;
import static org.apache.sling.distribution.journal.shared.Strings.requireNotBlank;
@@ -145,9 +143,10 @@ public class DistributionPublisher implements DistributionAgent {
public DistributionPublisher() {
log = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO);
- reqTypes.put(ADD, this::sendAndWait);
- reqTypes.put(DELETE, this::sendAndWait);
- reqTypes.put(TEST, this::send);
+ reqTypes.put(ADD, this::sendAndWait);
+ reqTypes.put(DELETE, this::sendAndWait);
+ reqTypes.put(INVALIDATE, this::sendAndWait);
+ reqTypes.put(TEST, this::send);
}
@Activate
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
index 051e037..ed95991 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
@@ -91,6 +91,7 @@ public class PackageMessageFactory {
switch (request.getRequestType()) {
case ADD: return createAdd(packageBuilder, resourceResolver, pubAgentName, request);
case DELETE: return createDelete(packageBuilder, resourceResolver, request, pubAgentName);
+ case INVALIDATE: return createInvalidate(packageBuilder, resourceResolver, request, pubAgentName);
case TEST: return createTest(packageBuilder, resourceResolver, pubAgentName);
default: throw new IllegalArgumentException(format("Unsupported request with requestType=%s", request.getRequestType()));
}
@@ -133,7 +134,21 @@ public class PackageMessageFactory {
disPkg.delete();
return pipePackage;
}
-
+
+ @Nonnull
+ private PackageMessage createInvalidate(DistributionPackageBuilder packageBuilder, ResourceResolver resourceResolver, DistributionRequest request, String pubAgentName) {
+ String pkgId = UUID.randomUUID().toString();
+ return PackageMessage.builder()
+ .pubSlingId(pubSlingId)
+ .pkgId(pkgId)
+ .pubAgentName(pubAgentName)
+ .paths(Arrays.asList(request.getPaths()))
+ .reqType(ReqType.INVALIDATE)
+ .pkgType(packageBuilder.getType())
+ .userId(resourceResolver.getUserID())
+ .build();
+ }
+
@Nonnull
private PackageMessage createDelete(DistributionPackageBuilder packageBuilder, ResourceResolver resourceResolver, DistributionRequest request, String pubAgentName) {
String pkgId = UUID.randomUUID().toString();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 9460068..5c4acfd 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -24,6 +24,7 @@ import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toSet;
import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+import static org.apache.sling.distribution.journal.messages.PackageMessage.ReqType.INVALIDATE;
import static org.apache.sling.distribution.journal.shared.Delay.exponential;
import static org.apache.sling.distribution.journal.shared.Strings.requireNotBlank;
@@ -51,6 +52,7 @@ import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.distribution.ImportPostProcessException;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.FullMessage;
@@ -320,7 +322,7 @@ public class DistributionSubscriber {
LOG.info("Stopped Queue processor");
}
- private void fetchAndProcessQueueItem() throws InterruptedException, IOException, LoginException, DistributionException {
+ private void fetchAndProcessQueueItem() throws InterruptedException, IOException, LoginException, DistributionException, ImportPostProcessException {
blockingSendStoredStatus();
FullMessage<PackageMessage> item = blockingPeekQueueItem();
try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
@@ -359,14 +361,17 @@ public class DistributionSubscriber {
throw new InterruptedException("Shutting down");
}
- private void processQueueItem(FullMessage<PackageMessage> item) throws PersistenceException, LoginException, DistributionException {
+ private void processQueueItem(FullMessage<PackageMessage> item) throws PersistenceException, LoginException, DistributionException, ImportPostProcessException {
MessageInfo info = item.getInfo();
PackageMessage pkgMsg = item.getMessage();
boolean skip = shouldSkip(info.getOffset());
+ PackageMessage.ReqType type = pkgMsg.getReqType();
try {
idleCheck.busy(bookKeeper.getRetries(pkgMsg.getPubAgentName()));
if (skip) {
bookKeeper.removePackage(pkgMsg, info.getOffset());
+ } else if (type == INVALIDATE) {
+ bookKeeper.invalidateCache(pkgMsg, info.getOffset());
} else {
bookKeeper.importPackage(pkgMsg, info.getOffset(), info.getCreateTime());
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/QueueItemFactory.java b/src/main/java/org/apache/sling/distribution/journal/queue/QueueItemFactory.java
index a026efb..3cf32ef 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/QueueItemFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/QueueItemFactory.java
@@ -93,6 +93,8 @@ public final class QueueItemFactory {
return DistributionRequestType.ADD;
case DELETE:
return DistributionRequestType.DELETE;
+ case INVALIDATE:
+ return DistributionRequestType.INVALIDATE;
case TEST:
return DistributionRequestType.TEST;
default:
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
index bda5e89..de68952 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
@@ -97,6 +97,12 @@ public class DistributionMetricsService {
private Counter importPostProcessRequest;
+ private Timer invalidationProcessDuration;
+
+ private Counter invalidationProcessSuccess;
+
+ private Counter invalidationProcessRequest;
+
private BundleContext context;
@Activate
@@ -124,6 +130,9 @@ public class DistributionMetricsService {
importPostProcessDuration = getTimer(getMetricName(PUB_COMPONENT, "import_post_process_duration"));
importPostProcessSuccess = getCounter(getMetricName(SUB_COMPONENT, "import_post_process_success_count"));
importPostProcessRequest = getCounter(getMetricName(SUB_COMPONENT, "import_post_process_request_count"));
+ invalidationProcessDuration = getTimer(getMetricName(PUB_COMPONENT, "invalidation_process_duration"));
+ invalidationProcessSuccess = getCounter(getMetricName(SUB_COMPONENT, "invalidation_process_success_count"));
+ invalidationProcessRequest = getCounter(getMetricName(SUB_COMPONENT, "invalidation_process_request_count"));
}
/**
@@ -371,6 +380,18 @@ public class DistributionMetricsService {
return importPostProcessRequest;
}
+ public Timer getInvalidationProcessDuration() {
+ return invalidationProcessDuration;
+ }
+
+ public Counter getInvalidationProcessSuccess() {
+ return invalidationProcessSuccess;
+ }
+
+ public Counter getInvalidationProcessRequest() {
+ return invalidationProcessRequest;
+ }
+
public class GaugeService<T> implements Gauge<T>, Closeable {
@SuppressWarnings("rawtypes")
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/NoOpInvalidationProcessor.java b/src/main/java/org/apache/sling/distribution/journal/shared/NoOpInvalidationProcessor.java
new file mode 100644
index 0000000..6d513cc
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/NoOpInvalidationProcessor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.shared;
+
+import org.apache.sling.distribution.InvalidationProcessException;
+import org.apache.sling.distribution.InvalidationProcessor;
+import org.osgi.service.component.annotations.Component;
+
+import java.util.Map;
+
+@Component(
+ property = {
+ "type=default"
+ }
+)
+public class NoOpInvalidationProcessor implements InvalidationProcessor {
+ @Override
+ public void process(Map<String, Object> props) throws InvalidationProcessException {}
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index 5bf9d6e..7174b12 100644
--- a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -36,6 +36,7 @@ import org.apache.sling.commons.metrics.Histogram;
import org.apache.sling.commons.metrics.Meter;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.distribution.ImportPostProcessor;
+import org.apache.sling.distribution.InvalidationProcessor;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.messages.LogMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage;
@@ -82,6 +83,9 @@ public class BookKeeperTest {
@Mock
private ImportPostProcessor importPostProcessor;
+ @Mock
+ private InvalidationProcessor invalidationProcessor;
+
@Before
public void before() {
when(distributionMetricsService.getFailedPackageImports())
@@ -98,9 +102,15 @@ public class BookKeeperTest {
.thenReturn(mock(Timer.class));
when(distributionMetricsService.getImportPostProcessSuccess())
.thenReturn(mock(Counter.class));
+ when(distributionMetricsService.getInvalidationProcessRequest())
+ .thenReturn(mock(Counter.class));
+ when(distributionMetricsService.getInvalidationProcessDuration())
+ .thenReturn(mock(Timer.class));
+ when(distributionMetricsService.getInvalidationProcessSuccess())
+ .thenReturn(mock(Counter.class));
BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", "subSlingId", true, 10, PackageHandling.Extract, "package");
bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender, logSender, bkConfig,
- importPostProcessor);
+ importPostProcessor, invalidationProcessor);
}
@Test
@@ -122,20 +132,29 @@ public class BookKeeperTest {
@Test
public void testPackageImport() throws DistributionException {
try {
- bookKeeper.importPackage(buildPackageMessage(), 10, currentTimeMillis());
+ bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, currentTimeMillis());
+ } finally {
+ assertThat(bookKeeper.getRetries(PUB_AGENT_NAME), equalTo(0));
+ }
+ }
+
+ @Test
+ public void testCacheInvalidation() throws DistributionException {
+ try {
+ bookKeeper.invalidateCache(buildPackageMessage(PackageMessage.ReqType.INVALIDATE), 10);
} finally {
assertThat(bookKeeper.getRetries(PUB_AGENT_NAME), equalTo(0));
}
}
- PackageMessage buildPackageMessage() {
+ PackageMessage buildPackageMessage(PackageMessage.ReqType reqType) {
PackageMessage msg = mock(PackageMessage.class);
when(msg.getPkgLength())
.thenReturn(100L);
when(msg.getPubAgentName())
.thenReturn(PUB_AGENT_NAME);
when(msg.getReqType())
- .thenReturn(PackageMessage.ReqType.ADD);
+ .thenReturn(reqType);
when(msg.getPaths())
.thenReturn(singletonList("/content"));
when(msg.getPkgId())
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java
index 90f7f5c..02e568e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPackageFactoryTest.java
@@ -149,6 +149,16 @@ public class DistributionPackageFactoryTest {
assertThat(sent.getPkgBinary(), nullValue());
assertThat(sent.getPkgLength(), equalTo(0L));
assertThat(sent.getPaths(), contains("/test"));
-
+ }
+
+ @Test
+ public void testInvalidate() throws DistributionException, IOException {
+ DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.INVALIDATE, "/test");
+
+ PackageMessage sent = publisher.create(packageBuilder, resourceResolver, "pub1agent1", request);
+ assertThat(sent.getReqType(), equalTo(ReqType.INVALIDATE));
+ assertThat(sent.getPkgBinary(), nullValue());
+ assertThat(sent.getPkgLength(), equalTo(0L));
+ assertThat(sent.getPaths(), contains("/test"));
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 4537f00..a7daea9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -185,6 +185,12 @@ public class DistributionPublisherTest {
}
@Test
+ public void executeRequestINVALIDATEAccepted() throws DistributionException, IOException {
+ DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.INVALIDATE, "/test");
+ executeAndCheck(request);
+ }
+
+ @Test
public void executeRequestTESTAccepted() throws DistributionException, IOException {
DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.TEST, "/test");
executeAndCheck(request);
diff --git a/src/test/java/org/apache/sling/distribution/journal/msg/InMemoryProvider.java b/src/test/java/org/apache/sling/distribution/journal/msg/InMemoryProvider.java
index 5826a1b..adae72f 100644
--- a/src/test/java/org/apache/sling/distribution/journal/msg/InMemoryProvider.java
+++ b/src/test/java/org/apache/sling/distribution/journal/msg/InMemoryProvider.java
@@ -65,6 +65,11 @@ public class InMemoryProvider implements MessagingProvider {
}
@Override
+ public String assignTo(Reset reset, long relativeOffset) {
+ return String.format("%s", reset.name());
+ }
+
+ @Override
public URI getServerUri() {
try {
return new URI("http://localhost:12345/dummy");