You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/12/23 13:47:41 UTC
[sling-org-apache-sling-distribution-journal] branch master
updated: SLING-8932 - Redesign DistributionSubscriber to offload
responsibilities (#17)
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new e88d623 SLING-8932 - Redesign DistributionSubscriber to offload responsibilities (#17)
e88d623 is described below
commit e88d623e506f3e4a7e29cd982f583571948add48
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Mon Dec 23 14:47:35 2019 +0100
SLING-8932 - Redesign DistributionSubscriber to offload responsibilities (#17)
* SLING-8932 - Redesign DistributionSubscriber to offload responsibilities
* SLING-8932 - Fixing issues from review
* SLING-8932 - Use old logic for sendStoredStatus
---
.../journal/impl/subscriber/Announcer.java | 38 +-
.../journal/impl/subscriber/BookKeeper.java | 335 +++++++++++++
.../journal/impl/subscriber/CommandPoller.java | 104 ++++
.../impl/subscriber/DistributionSubscriber.java | 544 ++++-----------------
.../journal/impl/subscriber/PackageHandler.java | 89 ++++
.../journal/impl/subscriber/AnnouncerTest.java | 16 +-
6 files changed, 645 insertions(+), 481 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index a121693..ffc27bb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -24,29 +24,25 @@ import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
-import org.apache.sling.distribution.journal.MessageSender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@ParametersAreNonnullByDefault
class Announcer implements Runnable, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(Announcer.class);
- private final String topicName;
+ private final BookKeeper bookKeeper;
- private final LocalStore offsetStore;
-
- private final MessageSender<DiscoveryMessage> sender;
+ private final Consumer<DiscoveryMessage> sender;
private final String subSlingId;
@@ -54,8 +50,6 @@ class Announcer implements Runnable, Closeable {
private final Set<String> pubAgentNames;
- private final PackageRetries packageRetries;
-
private final boolean editable;
private final int maxRetries;
@@ -64,21 +58,17 @@ class Announcer implements Runnable, Closeable {
public Announcer(String subSlingId,
String subAgentName,
- String topicName,
Set<String> pubAgentNames,
- MessageSender<DiscoveryMessage> disSender,
- LocalStore offsetStore,
- PackageRetries packageRetries,
+ Consumer<DiscoveryMessage> disSender,
+ BookKeeper bookKeeper,
int maxRetries,
boolean editable,
int announceDelay) {
this.subSlingId = Objects.requireNonNull(subSlingId);
this.subAgentName = Objects.requireNonNull(subAgentName);
- this.topicName = Objects.requireNonNull(topicName);
this.pubAgentNames = Objects.requireNonNull(pubAgentNames);
this.sender = Objects.requireNonNull(disSender);
- this.offsetStore = Objects.requireNonNull(offsetStore);
- this.packageRetries = Objects.requireNonNull(packageRetries);
+ this.bookKeeper = Objects.requireNonNull(bookKeeper);
this.maxRetries = maxRetries;
this.editable = editable;
executor = Executors.newSingleThreadScheduledExecutor();
@@ -90,7 +80,7 @@ class Announcer implements Runnable, Closeable {
LOG.debug("Sending discovery message for agent {}", subAgentName);
try {
- long offset = offsetStore.load("offset", -1L);
+ long offset = bookKeeper.loadOffset();
SubscriberConfiguration subscriberConfiguration = SubscriberConfiguration.newBuilder()
.setEditable(editable)
@@ -102,18 +92,18 @@ class Announcer implements Runnable, Closeable {
.setSubAgentName(subAgentName)
.setSubscriberConfiguration(subscriberConfiguration);
for (String pubAgentName : pubAgentNames) {
- int retries = packageRetries.get(pubAgentName);
- disMsgBuilder.addSubscriberState(createOffset(pubAgentName, offset, retries));
+ disMsgBuilder.addSubscriberState(subscriberState(pubAgentName, offset));
}
- sender.send(topicName, disMsgBuilder.build());
+ sender.accept(disMsgBuilder.build());
} catch (Throwable e) {
String msg = String.format("Failed to send discovery message for agent %s, %s", subAgentName, e.getMessage());
LOG.info(msg, e);
}
}
- private SubscriberState createOffset(String pubAgentName, long offset, int retries) {
+ private SubscriberState subscriberState(String pubAgentName, long offset) {
+ int retries = bookKeeper.getRetries(pubAgentName);
return Messages.SubscriberState.newBuilder()
.setPubAgentName(pubAgentName)
.setRetries(retries)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
new file mode 100644
index 0000000..e8136c2
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Collections.singletonMap;
+import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.IMPORTED;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
+import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Keeps track of offset and processed status and manages
+ * coordinates the import/retry handling.
+ *
+ * The offset store is identified by the agentName only.
+ *
+ * With non clustered publish instances deployment, each
+ * instance stores the offset in its own node store, thus
+ * avoiding mix ups. Moreover, when cloning an instance
+ * from a node store, the cloned instance will implicitly
+ * recover the offsets and start from the last processed
+ * offset.
+ *
+ * With clustered publish instances deployment, only one
+ * Subscriber agent must run on the cluster in order to
+ * avoid mix ups.
+ *
+ * The clustered and non clustered publish instances use
+ * cases can be supported by only running the Subscriber
+ * agent on the leader instance.
+ */
+public class BookKeeper implements Closeable {
+ private static final String SUBSERVICE_IMPORTER = "importer";
+ private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
+ private static final int RETRY_SEND_DELAY = 1000;
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+ private final ResourceResolverFactory resolverFactory;
+ private final DistributionMetricsService distributionMetricsService;
+ private final PackageHandler packageHandler;
+ private final EventAdmin eventAdmin;
+ private final Consumer<PackageStatusMessage> sender;
+ private final boolean editable;
+ private final int maxRetries;
+ private final boolean errorQueueEnabled;
+
+ private final PackageRetries packageRetries = new PackageRetries();
+ private final LocalStore statusStore;
+ private final LocalStore processedOffsets;
+ private final String subAgentName;
+ private final String subSlingId;
+ private GaugeService<Integer> retriesGauge;
+
+ public BookKeeper(ResourceResolverFactory resolverFactory,
+ DistributionMetricsService distributionMetricsService,
+ PackageHandler packageHandler,
+ EventAdmin eventAdmin,
+ Consumer<PackageStatusMessage> sender,
+ String subAgentName,
+ String subSlingId,
+ boolean editable,
+ int maxRetries) {
+ this.packageHandler = packageHandler;
+ this.eventAdmin = eventAdmin;
+ String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
+ this.retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
+ this.resolverFactory = resolverFactory;
+ this.distributionMetricsService = distributionMetricsService;
+ this.sender = sender;
+ this.subAgentName = subAgentName;
+ this.subSlingId = subSlingId;
+ this.editable = editable;
+ this.maxRetries = maxRetries;
+ // Error queues are enabled when the number
+ // of retry attempts is limited ; disabled otherwise
+ this.errorQueueEnabled = (maxRetries >= 0);
+ this.statusStore = new LocalStore(resolverFactory, "statuses", subAgentName);
+ this.processedOffsets = new LocalStore(resolverFactory, "packages", subAgentName);
+ }
+
+ /**
+ * We aim at processing the packages exactly once. Processing the packages
+ * exactly once is possible with the following conditions
+ *
+ * I. The package importer is configured to disable auto-committing changes.
+ *
+ * II. A single commit aggregates three content updates
+ *
+ * C1. install the package
+ * C2. store the processing status
+ * C3. store the offset processed
+ *
+ * Some package importers require auto-saving or issue partial commits before
+ * failing. For those packages importers, we aim at processing packages at least
+ * once, thanks to the order in which the content updates are applied.
+ */
+ public void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
+ log.info(format("Importing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(),
+ pkgMsg.getReqType(), offset));
+ addPackageMDC(pkgMsg);
+ try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
+ ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
+ packageHandler.apply(importerResolver, pkgMsg);
+ if (editable) {
+ storeStatus(importerResolver, new PackageStatus(IMPORTED, offset, pkgMsg.getPubAgentName()));
+ }
+ storeOffset(importerResolver, offset);
+ importerResolver.commit();
+ distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
+ distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
+ packageRetries.clear(pkgMsg.getPubAgentName());
+ Event event = DistributionEvent.eventImporterImported(pkgMsg, subAgentName);
+ eventAdmin.postEvent(event);
+ } catch (LoginException | IOException | RuntimeException e) {
+ failure(pkgMsg, offset, e);
+ } finally {
+ MDC.clear();
+ }
+ }
+
+ private void addPackageMDC(PackageMessage pkgMsg) {
+ MDC.put("module", "distribution");
+ MDC.put("package-id", pkgMsg.getPkgId());
+ String paths = pkgMsg.getPathsList().stream().collect(Collectors.joining(","));
+ MDC.put("paths", paths);
+ MDC.put("pub-sling-id", pkgMsg.getPubSlingId());
+ String pubAgentName = pkgMsg.getPubAgentName();
+ MDC.put("pub-agent-name", pubAgentName);
+ MDC.put("distribution-message-type", pkgMsg.getReqType().name());
+ MDC.put("retries", Integer.toString(packageRetries.get(pubAgentName)));
+ MDC.put("sub-sling-id", subSlingId);
+ MDC.put("sub-agent-name", subAgentName);
+ }
+
+ /**
+ * Should be called on a exception while importing a package.
+ *
+ * When we use an error queue and the max retries is reached the package is removed.
+ * In all other cases a DistributionException is thrown that signals that we should retry the
+ * package.
+ *
+ * @param pkgMsg
+ * @param offset
+ * @param e
+ * @throws DistributionException if the package should be retried
+ */
+ private void failure(PackageMessage pkgMsg, long offset, Exception e) throws DistributionException {
+ distributionMetricsService.getFailedPackageImports().mark();
+
+ String pubAgentName = pkgMsg.getPubAgentName();
+ int retries = packageRetries.get(pubAgentName);
+ if (errorQueueEnabled && retries >= maxRetries) {
+ log.warn(format("Failed to import distribution package %s at offset %s after %s retries, removing the package.", pkgMsg.getPkgId(), offset, retries));
+ removeFailedPackage(pkgMsg, offset);
+ } else {
+ packageRetries.increase(pubAgentName);
+ String msg = format("Error processing distribution package %s. Retry attempts %s/%s.", pkgMsg.getPkgId(), retries, errorQueueEnabled ? Integer.toString(maxRetries) : "infinite");
+ throw new DistributionException(msg, e);
+ }
+ }
+
+ public void removePackage(PackageMessage pkgMsg, long offset) throws Exception {
+ log.info(format("Removing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
+ Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
+ try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+ if (editable) {
+ storeStatus(resolver, new PackageStatus(REMOVED, offset, pkgMsg.getPubAgentName()));
+ }
+ storeOffset(resolver, offset);
+ resolver.commit();
+ }
+ packageRetries.clear(pkgMsg.getPubAgentName());
+ context.stop();
+ }
+
+ public void sendStoredStatus() throws InterruptedException, IOException {
+ try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
+ PackageStatus status = new PackageStatus(statusStore.load());
+ boolean sent = status.sent;
+ for (int retry = 0 ; !sent ; retry++) {
+ try {
+ sendStatusMessage(status, retry);
+ markStatusSent();
+ sent = true;
+ } catch (Exception e) {
+ log.warn("Cannot send status (retry {})", retry, e);
+ Thread.sleep(RETRY_SEND_DELAY);
+ }
+ }
+ }
+ }
+
+ private void sendStatusMessage(PackageStatus status, int retry) throws InterruptedException {
+ PackageStatusMessage pkgStatMsg = PackageStatusMessage.newBuilder()
+ .setSubSlingId(subSlingId)
+ .setSubAgentName(subAgentName)
+ .setPubAgentName(status.pubAgentName)
+ .setOffset(status.offset)
+ .setStatus(status.status)
+ .build();
+ sender.accept(pkgStatMsg);
+ log.info("Sent status message {}", pkgStatMsg);
+ }
+
+ public void markStatusSent() {
+ try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+ statusStore.store(resolver, "sent", true);
+ resolver.commit();
+ } catch (Exception e) {
+ log.warn("Failed to mark status as sent", e);
+ }
+ }
+
+ public long loadOffset() {
+ return processedOffsets.load("offset", -1L);
+ }
+
+ public int getRetries(String pubAgentName) {
+ return packageRetries.get(pubAgentName);
+ }
+
+ public PackageRetries getPackageRetries() {
+ return packageRetries;
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOUtils.closeQuietly(retriesGauge);
+ }
+
+ private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
+ log.info(format("Removing failed distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
+ Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
+ try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+ storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
+ storeOffset(resolver, offset);
+ resolver.commit();
+ } catch (Exception e) {
+ throw new DistributionException("Error removing failed package", e);
+ }
+ context.stop();
+ }
+
+ private void storeStatus(ResourceResolver resolver, PackageStatus packageStatus) throws PersistenceException {
+ Map<String, Object> statusMap = packageStatus.asMap();
+ statusStore.store(resolver, statusMap);
+ log.info("Stored status {}", statusMap);
+ }
+
+ private void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException {
+ processedOffsets.store(resolver, "offset", offset);
+ }
+
+ private ResourceResolver getServiceResolver(String subService) throws LoginException {
+ return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, SUBSERVICE_BOOKKEEPER));
+ }
+
+ class PackageStatus {
+ final Status status;
+ final Long offset;
+ final String pubAgentName;
+ final Boolean sent;
+
+ PackageStatus(Status status, long offset, String pubAgentName) {
+ this.status = status;
+ this.offset = offset;
+ this.pubAgentName = pubAgentName;
+ this.sent = false;
+ }
+
+ PackageStatus(ValueMap statusMap) {
+ Integer statusNum = statusMap.get("statusNumber", Integer.class);
+ this.status = statusNum !=null ? Status.valueOf(statusNum) : null;
+ this.offset = statusMap.get("offset", Long.class);
+ this.pubAgentName = statusMap.get("pubAgentName", String.class);
+ this.sent = statusMap.get("sent", true);
+ }
+
+ Map<String, Object> asMap() {
+ Map<String, Object> s = new HashMap<>();
+ s.put("pubAgentName", pubAgentName);
+ s.put("statusNumber", status.getNumber());
+ s.put("offset", offset);
+ s.put("sent", sent);
+ return s;
+ }
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
new file mode 100644
index 0000000..10186a7
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static java.lang.String.format;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CommandPoller implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
+
+ private final String subSlingId;
+ private final String subAgentName;
+ private final boolean editable;
+ private final Closeable commandPoller;
+ private final AtomicLong clearOffset = new AtomicLong(-1);
+
+ public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName, boolean editable) {
+ this.subSlingId = subSlingId;
+ this.subAgentName = subAgentName;
+ this.editable = editable;
+ if (editable) {
+
+ /*
+ * We currently only support commands requiring editable mode.
+ * As an optimisation, we don't register a poller for non
+ * editable subscribers.
+ *
+ * When supporting commands independent from editable mode,
+ * this optimisation will be removed.
+ */
+
+ commandPoller = messagingProvider.createPoller(
+ topics.getCommandTopic(),
+ Reset.earliest,
+ create(CommandMessage.class, this::handleCommandMessage));
+ } else {
+ commandPoller = null;
+ }
+ }
+
+ public boolean isCleared(long offset) {
+ return offset <= clearOffset.longValue();
+ }
+
+ private void handleCommandMessage(MessageInfo info, CommandMessage message) {
+ if (!subSlingId.equals(message.getSubSlingId()) || !subAgentName.equals(message.getSubAgentName())) {
+ LOG.debug(format("Skip command for subSlingId %s", message.getSubSlingId()));
+ return;
+ }
+
+ if (message.hasClearCommand()) {
+ handleClearCommand(message.getClearCommand().getOffset());
+ } else {
+ LOG.warn("Unsupported command {}", message);
+ }
+ }
+
+ private void handleClearCommand(long offset) {
+ if (editable) {
+ updateClearOffsetIfLarger(offset);
+ LOG.info("Handled clear command for offset {}", offset);
+ } else {
+ LOG.warn("Unexpected ClearCommand for non editable subscriber");
+ }
+
+ }
+
+ private long updateClearOffsetIfLarger(long offset) {
+ return clearOffset.accumulateAndGet(offset, Math::max);
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(commandPoller);
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index fcefc7f..8ac9e56 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -18,27 +18,19 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
-import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.IMPORTED;
-import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED;
-import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
import static java.lang.String.format;
-import static java.lang.System.currentTimeMillis;
import static java.util.Arrays.asList;
-import static java.util.Collections.singletonMap;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;
-import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
import java.io.Closeable;
-import java.io.InputStream;
import java.util.Collections;
import java.util.Dictionary;
-import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
@@ -46,43 +38,40 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
+import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.impl.shared.AgentState;
-import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.jackrabbit.vault.packaging.Packaging;
-import org.apache.sling.api.resource.LoginException;
-import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
-import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
+import org.apache.sling.distribution.journal.impl.shared.AgentState;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
-import org.apache.sling.serviceusermapping.ServiceUserMapped;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
@@ -90,44 +79,28 @@ import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
-import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.JournalAvailable;
-import org.apache.sling.distribution.journal.Reset;
+import com.google.protobuf.GeneratedMessage;
/**
- * A Subscriber SCD agent which consumes messages produced by a {@code DistributionPublisher} agent.
+ * A Subscriber SCD agent which consumes messages produced by a
+ * {@code DistributionPublisher} agent.
*/
-@Component(
- service = {},
- immediate = true,
- property = {"announceDelay=10000"},
- configurationPid = "org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
+@Component(service = {}, immediate = true, property = {
+ "announceDelay=10000" }, configurationPid = "org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
@Designate(ocd = SubscriberConfiguration.class, factory = true)
@ParametersAreNonnullByDefault
public class DistributionSubscriber implements DistributionAgent {
private static final int PRECONDITION_TIMEOUT = 60;
- private static final int RETRY_SEND_DELAY = 1000;
static int RETRY_DELAY = 5000;
static int QUEUE_FETCH_DELAY = 1000;
private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
-
+
private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = Collections.emptySet();
@Reference(name = "packageBuilder")
@@ -147,7 +120,7 @@ public class DistributionSubscriber implements DistributionAgent {
@Reference
private EventAdmin eventAdmin;
-
+
@Reference
private JournalAvailable journalAvailable;
@@ -156,61 +129,37 @@ public class DistributionSubscriber implements DistributionAgent {
@Reference
private DistributionMetricsService distributionMetricsService;
-
- @Reference
- private ServiceUserMapped mappedUser;
@Reference
private Packaging packaging;
-
- private ServiceRegistration<DistributionAgent> componentReg;
-
- private final PackageRetries packageRetries = new PackageRetries();
- private GaugeService<Integer> retriesGauge;
+ private ServiceRegistration<DistributionAgent> componentReg;
private Closeable packagePoller;
- private Closeable commandPoller;
-
- private LocalStore processedOffsets;
-
- private LocalStore processedStatuses;
+ private CommandPoller commandPoller;
+ private BookKeeper bookKeeper;
- private final AtomicLong clearOffset = new AtomicLong(-1);
-
- // Use a bounded internal buffer to allow reading further packages while working on one at a time
+ // Use a bounded internal buffer to allow reading further packages while working
+ // on one at a time
private final BlockingQueue<DistributionQueueItem> queueItemsBuffer = new LinkedBlockingQueue<>(8);
private Set<String> queueNames = Collections.emptySet();
- private MessageSender<PackageStatusMessage> sender;
-
private Announcer announcer;
private String subAgentName;
- private String subSlingId;
-
private String pkgType;
- private int maxRetries;
-
- private boolean errorQueueEnabled;
-
- private boolean editable;
-
private volatile boolean running = true;
private volatile Thread queueProcessor;
-
- private ContentPackageExtractor extractor;
-
+
@Activate
public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
-
- subSlingId = requireNonNull(slingSettings.getSlingId());
+ String subSlingId = requireNonNull(slingSettings.getSlingId());
subAgentName = requireNonNull(config.name());
requireNonNull(config);
requireNonNull(context);
@@ -223,103 +172,46 @@ public class DistributionSubscriber implements DistributionAgent {
requireNonNull(precondition);
queueNames = getNotEmpty(config.agentNames());
+ int maxRetries = config.maxRetries();
+ boolean editable = config.editable();
- maxRetries = config.maxRetries();
- // Error queues are enabled when the number
- // of retry attempts is limited ; disabled otherwise
- errorQueueEnabled = (maxRetries >= 0);
-
- editable = config.editable();
-
- // The offset store is identified by the agentName only.
- //
- // With non clustered publish instances deployment, each
- // instance stores the offset in its own node store, thus
- // avoiding mix ups. Moreover, when cloning an instance
- // from a node store, the cloned instance will implicitly
- // recover the offsets and start from the last processed
- // offset.
- //
- // With clustered publish instances deployment, only one
- // Subscriber agent must run on the cluster in order to
- // avoid mix ups.
- //
- // The clustered and non clustered publish instances use
- // cases can be supported by only running the Subscriber
- // agent on the leader instance.
- processedOffsets = new LocalStore(resolverFactory, "packages", subAgentName);
- long startOffset = processedOffsets.load("offset", -1L) + 1;
-
- processedStatuses = new LocalStore(resolverFactory, "statuses", subAgentName);
-
+ ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.packageHandling());
+ PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
+ bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin,
+ sender(topics.getStatusTopic()), subAgentName, subSlingId, editable, maxRetries);
+
+ long startOffset = bookKeeper.loadOffset() + 1;
String assign = messagingProvider.assignTo(startOffset);
- packagePoller = messagingProvider.createPoller(
- topics.getPackageTopic(),
- Reset.earliest,
- assign,
+ packagePoller = messagingProvider.createPoller(topics.getPackageTopic(), Reset.earliest, assign,
create(PackageMessage.class, this::handlePackageMessage));
- if (editable) {
-
- /*
- * We currently only support commands requiring editable mode.
- * As an optimisation, we don't register a poller for non
- * editable subscribers.
- *
- * When supporting commands independent from editable mode,
- * this optimisation will be removed.
- */
-
- commandPoller = messagingProvider.createPoller(
- topics.getCommandTopic(),
- Reset.earliest,
- create(CommandMessage.class, this::handleCommandMessage));
- }
-
+ commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, editable);
queueProcessor = startBackgroundThread(this::processQueue,
format("Queue Processor for Subscriber agent %s", subAgentName));
- sender = messagingProvider.createSender();
-
- String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + config.name();
- retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
-
int announceDelay = PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
- MessageSender<DiscoveryMessage> disSender = messagingProvider.createSender();
- announcer = new Announcer(subSlingId,
- subAgentName,
- topics.getDiscoveryTopic(),
- queueNames,
- disSender,
- processedOffsets,
- packageRetries,
- maxRetries,
- config.editable(),
- announceDelay
- );
+ announcer = new Announcer(subSlingId, subAgentName, queueNames, sender(topics.getDiscoveryTopic()), bookKeeper,
+ maxRetries, config.editable(), announceDelay);
pkgType = requireNonNull(packageBuilder.getType());
-
- String msg = format("Started Subscriber agent %s at offset %s, subscribed to agent names %s with package builder %s editable %s maxRetries %s errorQueueEnabled %s",
- subAgentName,
- startOffset,
- queueNames,
- pkgType,
- config.editable(),
- maxRetries,
- errorQueueEnabled);
+ boolean errorQueueEnabled = (maxRetries >= 0);
+ String msg = format(
+ "Started Subscriber agent %s at offset %s, subscribed to agent names %s with package builder %s editable %s maxRetries %s errorQueueEnabled %s",
+ subAgentName, startOffset, queueNames, pkgType, config.editable(), maxRetries, errorQueueEnabled);
LOG.info(msg);
Dictionary<String, Object> props = createServiceProps(config);
componentReg = context.registerService(DistributionAgent.class, this, props);
- extractor = new ContentPackageExtractor(packaging, config.packageHandling());
+ }
+
+ private <T extends GeneratedMessage> Consumer<T> sender(String topic) {
+ MessageSender<T> sender = messagingProvider.createSender();
+ return msg -> sender.send(topic, msg);
}
private Set<String> getNotEmpty(String[] agentNames) {
- return asList(agentNames).stream()
- .filter(StringUtils::isNotBlank)
- .collect(toSet());
+ return asList(agentNames).stream().filter(StringUtils::isNotBlank).collect(toSet());
}
private Dictionary<String, Object> createServiceProps(SubscriberConfiguration config) {
@@ -338,8 +230,8 @@ public class DistributionSubscriber implements DistributionAgent {
@Deactivate
public void deactivate() {
- IOUtils.closeQuietly(retriesGauge);
IOUtils.closeQuietly(announcer);
+ IOUtils.closeQuietly(bookKeeper);
componentReg.unregister();
IOUtils.closeQuietly(packagePoller);
IOUtils.closeQuietly(commandPoller);
@@ -348,10 +240,9 @@ public class DistributionSubscriber implements DistributionAgent {
if (interrupter != null) {
interrupter.interrupt();
}
- String msg = String.format("Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s",
- subAgentName,
- queueNames,
- pkgType);
+ String msg = String.format(
+ "Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s",
+ subAgentName, queueNames, pkgType);
LOG.info(msg);
}
@@ -363,9 +254,9 @@ public class DistributionSubscriber implements DistributionAgent {
@Override
public DistributionQueue getQueue(@Nonnull String queueName) {
- DistributionQueueItem head = queueItemsBuffer.stream()
- .filter(item -> isIn(queueName, item)).findFirst().orElse(null);
- return new SubQueue(queueName, head, packageRetries);
+ DistributionQueueItem head = queueItemsBuffer.stream().filter(item -> isIn(queueName, item)).findFirst()
+ .orElse(null);
+ return new SubQueue(queueName, head, bookKeeper.getPackageRetries());
}
private boolean isIn(String queueName, DistributionQueueItem queueItem) {
@@ -378,7 +269,7 @@ public class DistributionSubscriber implements DistributionAgent {
public DistributionLog getLog() {
return this::emptyDistributionLog;
}
-
+
private List<String> emptyDistributionLog() {
return Collections.emptyList();
}
@@ -391,8 +282,7 @@ public class DistributionSubscriber implements DistributionAgent {
@Nonnull
@Override
- public DistributionResponse execute(ResourceResolver resourceResolver,
- DistributionRequest request) {
+ public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest request) {
return executeUnsupported(request);
}
@@ -405,43 +295,43 @@ public class DistributionSubscriber implements DistributionAgent {
}
private void handlePackageMessage(MessageInfo info, PackageMessage message) {
- if (! queueNames.contains(message.getPubAgentName())) {
+ if (!queueNames.contains(message.getPubAgentName())) {
LOG.info(String.format("Skipping package for Publisher agent %s (not subscribed)", message.getPubAgentName()));
return;
}
- if (! pkgType.equals(message.getPkgType())) {
+ if (!pkgType.equals(message.getPkgType())) {
LOG.warn(String.format("Skipping package with type %s", message.getPkgType()));
return;
}
DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
- try {
- enqueue(queueItem);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException();
- }
+ enqueue(queueItem);
}
/**
- * We block here if the buffer is full in order to limit the number of
- * binary packages fetched in memory. Note that each queued item contains
- * the binary package to be imported.
+ * We block here if the buffer is full in order to limit the number of binary
+ * packages fetched in memory. Note that each queued item contains the binary
+ * package to be imported.
*/
- private void enqueue(DistributionQueueItem queueItem) throws InterruptedException {
- while (running) {
- if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) {
- distributionMetricsService.getItemsBufferSize().increment();
- return;
+ private void enqueue(DistributionQueueItem queueItem) {
+ try {
+ while (running) {
+ if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) {
+ distributionMetricsService.getItemsBufferSize().increment();
+ return;
+ }
}
+ throw new InterruptedException();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException();
}
- throw new InterruptedException();
- }
+ }
private void processQueue() {
LOG.info("Started Queue processor");
- while (! Thread.interrupted()) {
+ while (!Thread.interrupted()) {
try {
- processQueueItems();
+ fetchAndProcessQueueItem();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -449,18 +339,22 @@ public class DistributionSubscriber implements DistributionAgent {
LOG.info("Stopped Queue processor");
}
- private void processQueueItems() throws InterruptedException {
+ private void fetchAndProcessQueueItem() throws InterruptedException {
try {
// send status stored in a previous run if exists
- try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
- sendStoredStatus();
- }
+ bookKeeper.sendStoredStatus();
// block until an item is available
DistributionQueueItem item = blockingPeekQueueItem();
// and then process it
try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
processQueueItem(item);
}
+ } catch (IllegalStateException e) {
+ /**
+ * Precondition timed out. We only log this on info level as it is no error
+ */
+ LOG.info(e.getMessage());
+ Thread.sleep(RETRY_DELAY);
} catch (InterruptedException e) {
throw e;
} catch (Throwable t) {
@@ -483,264 +377,20 @@ public class DistributionSubscriber implements DistributionAgent {
private void processQueueItem(DistributionQueueItem queueItem) throws Exception {
long offset = queueItem.get(RECORD_OFFSET, Long.class);
- boolean skip;
- try {
- skip = isCleared(offset) || cannotProcess(offset);
- } catch (IllegalStateException e) {
- /**
- * This will occur when the precondition times out.
- */
- LOG.info(e.getMessage());
- Thread.sleep(RETRY_DELAY);
- return;
- }
PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
- String pubAgentName = pkgMsg.getPubAgentName();
+ boolean skip = shouldSkip(offset);
if (skip) {
- removePackage(pkgMsg, offset);
+ bookKeeper.removePackage(pkgMsg, offset);
} else {
long createdTime = queueItem.get(RECORD_TIMESTAMP, Long.class);
- importPackage(pkgMsg, offset, createdTime);
- }
- queueItemProcessed(pubAgentName);
- }
-
- private void removePackage(PackageMessage pkgMsg, long offset) throws Exception {
- LOG.info(format("Removing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
- Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
- try (ResourceResolver resolver = getServiceResolver("bookkeeper")) {
- if (editable) {
- storeStatus(resolver, REMOVED, offset, pkgMsg.getPubAgentName());
- }
- storeOffset(resolver, offset);
- resolver.commit();
- context.stop();
- }
- }
-
- private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws Exception {
- LOG.info(format("Removing failed distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
- Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
- try (ResourceResolver resolver = getServiceResolver("bookkeeper")) {
- storeStatus(resolver, REMOVED_FAILED, offset, pkgMsg.getPubAgentName());
- storeOffset(resolver, offset);
- resolver.commit();
- context.stop();
+ bookKeeper.importPackage(pkgMsg, offset, createdTime);
}
- }
-
- private void importPackage(PackageMessage pkgMsg, long offset, long createdTime)
- throws Exception {
- String pubAgentName = pkgMsg.getPubAgentName();
- LOG.info(format("Importing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
- addPackageMDC(pkgMsg);
- Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
- try (ResourceResolver importerResolver = getServiceResolver("importer")) {
-
- /*
- * We aim at processing the packages exactly once.
- * Processing the packages exactly once is possible
- * with the following conditions
- *
- * I. The package importer is configured to disable
- * auto-committing changes.
- *
- * II. A single commit aggregates three content updates
- *
- * C1. install the package
- * C2. store the processing status
- * C3. store the offset processed
- *
- * Some package importers require auto-saving or issue
- * partial commits before failing.
- * For those packages importers, we aim at processing
- * packages at least once, thanks to the order in which
- * the content updates are applied.
- */
-
- installPackage(importerResolver, pkgMsg);
- if (editable) {
- storeStatus(importerResolver, IMPORTED, offset, pubAgentName);
- }
- storeOffset(importerResolver, offset);
- importerResolver.commit();
-
- context.stop();
- distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
- distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
-
- Event event = DistributionEvent.eventImporterImported(pkgMsg, subAgentName);
- eventAdmin.postEvent(event);
- } catch (Throwable e) {
- distributionMetricsService.getFailedPackageImports().mark();
- // rethrow fatal exceptions
- if (e instanceof Error) {
- throw (Error) e;
- }
- int retries = packageRetries.get(pubAgentName);
- if (errorQueueEnabled && retries >= maxRetries) {
- LOG.warn(format("Failed to import distribution package %s at offset %s after %s retries, removing the package.", pkgMsg.getPkgId(), offset, retries));
- removeFailedPackage(pkgMsg, offset);
- } else {
- packageRetries.increase(pubAgentName);
- String msg = format("Error processing distribution package %s. Retry attempts %s/%s.", pkgMsg.getPkgId(), retries, errorQueueEnabled ? Integer.toString(maxRetries) : "infinite");
- throw new DistributionException(msg, e);
- }
- } finally {
- MDC.clear();
- }
- }
-
- private void storeOffset(ResourceResolver importerResolver, long offset)
- throws PersistenceException {
- processedOffsets.store(importerResolver, "offset", offset);
- }
-
- private void queueItemProcessed(String pubAgentName) {
- packageRetries.clear(pubAgentName);
queueItemsBuffer.remove();
distributionMetricsService.getItemsBufferSize().decrement();
}
- private void addPackageMDC(PackageMessage pkgMsg) {
- MDC.put("module", "distribution");
- MDC.put("package-id", pkgMsg.getPkgId());
- String paths = pkgMsg.getPathsList().stream().collect(Collectors.joining(","));
- MDC.put("paths", paths);
- MDC.put("pub-sling-id", pkgMsg.getPubSlingId());
- String pubAgentName = pkgMsg.getPubAgentName();
- MDC.put("pub-agent-name", pubAgentName);
- MDC.put("distribution-message-type", pkgMsg.getReqType().name());
- MDC.put("retries", Integer.toString(packageRetries.get(pubAgentName)));
- MDC.put("sub-sling-id", subSlingId);
- MDC.put("sub-agent-name", subAgentName);
- }
-
-
- private void installPackage(ResourceResolver resolver, PackageMessage pkgMsg)
- throws DistributionException, PersistenceException {
- PackageMessage.ReqType type = pkgMsg.getReqType();
- switch (type) {
- case ADD:
- installAddPackage(resolver, pkgMsg);
- break;
- case DELETE:
- installDeletePackage(resolver, pkgMsg);
- break;
- case TEST:
- break;
- default: throw new UnsupportedOperationException(format("Unable to process messages with type: %s", type));
- }
- }
-
- private void installAddPackage(ResourceResolver resolver, PackageMessage pkgMsg)
- throws DistributionException {
- LOG.info("Importing paths " + pkgMsg.getPathsList());
- InputStream pkgStream = null;
- try {
- pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
- packageBuilder.installPackage(resolver, pkgStream);
- extractor.handle(resolver, pkgMsg.getPathsList());
- } finally {
- IOUtils.closeQuietly(pkgStream);
- }
-
- }
-
- private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
- throws PersistenceException {
- LOG.info("Deleting paths " + pkgMsg.getPathsList());
- for (String path : pkgMsg.getPathsList()) {
- Resource resource = resolver.getResource(path);
- if (resource != null) {
- resolver.delete(resource);
- }
- }
- }
-
- private void storeStatus(ResourceResolver resolver, Status status, long offset, String pubAgentName) throws PersistenceException {
- Map<String, Object> s = new HashMap<>();
- s.put("pubAgentName", pubAgentName);
- s.put("statusNumber", status.getNumber());
- s.put("offset", offset);
- s.put("sent", false);
- processedStatuses.store(resolver, s);
- LOG.info("Stored status {}", s);
- }
-
- private void sendStoredStatus() throws InterruptedException {
- ValueMap status = processedStatuses.load();
- boolean sent = status.get("sent", true);
- for (int retry = 0 ; !sent ; retry++) {
- try {
- sendStatusMessage(status);
- markStatusSent();
- sent = true;
- } catch (Exception e) {
- LOG.warn("Cannot send status (retry {})", retry, e);
- Thread.sleep(RETRY_SEND_DELAY);
- }
- }
- }
-
- private void markStatusSent() {
- try (ResourceResolver resolver = getServiceResolver("bookkeeper")) {
- processedStatuses.store(resolver, "sent", true);
- resolver.commit();
- } catch (Exception e) {
- LOG.warn("Failed to mark status as sent", e);
- }
- }
-
- private void sendStatusMessage(ValueMap status) {
-
- PackageStatusMessage pkgStatMsg = PackageStatusMessage.newBuilder()
- .setSubSlingId(subSlingId)
- .setSubAgentName(subAgentName)
- .setPubAgentName(status.get("pubAgentName", String.class))
- .setOffset(status.get("offset", Long.class))
- .setStatus(Status.valueOf(status.get("statusNumber", Integer.class)))
- .build();
-
- sender.send(topics.getStatusTopic(), pkgStatMsg);
- LOG.info("Sent status message {}", status);
- }
-
- private void handleCommandMessage(MessageInfo info, CommandMessage message) {
- if (subSlingId.equals(message.getSubSlingId()) && subAgentName.equals(message.getSubAgentName())) {
- if (message.hasClearCommand()) {
- handleClearCommand(message.getClearCommand().getOffset());
- } else {
- LOG.warn("Unsupported command {}", message);
- }
- } else {
- LOG.debug(format("Skip command for subSlingId %s", message.getSubSlingId()));
- }
- }
-
- private boolean isCleared(long offset) {
- return offset <= clearOffset.longValue();
- }
-
- private boolean cannotProcess(long offset) {
- return !precondition.canProcess(offset , PRECONDITION_TIMEOUT);
- }
-
- private void handleClearCommand(long offset) {
- if (editable) {
- // atomically compare and set clearOffset
- // as the max between the provided offset
- // and the current clearOffset
- clearOffset.accumulateAndGet(offset, Math::max);
- LOG.info("Handled clear command for offset {}", offset);
- } else {
- LOG.warn("Unexpected ClearCommand for non editable subscriber");
- }
-
- }
-
- private ResourceResolver getServiceResolver(String subService) throws LoginException {
- return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, subService));
+ private boolean shouldSkip(long offset) throws IllegalStateException {
+ return commandPoller.isCleared(offset) || !precondition.canProcess(offset, PRECONDITION_TIMEOUT);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
new file mode 100644
index 0000000..7307fdc
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static java.lang.String.format;
+
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PackageHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(PackageHandler.class);
+
+ private DistributionPackageBuilder packageBuilder;
+
+ private ContentPackageExtractor extractor;
+
+ public PackageHandler(DistributionPackageBuilder packageBuilder, ContentPackageExtractor extractor) {
+ this.packageBuilder = packageBuilder;
+ this.extractor = extractor;
+ }
+
+ public void apply(ResourceResolver resolver, PackageMessage pkgMsg)
+ throws DistributionException, PersistenceException {
+ PackageMessage.ReqType type = pkgMsg.getReqType();
+ switch (type) {
+ case ADD:
+ installAddPackage(resolver, pkgMsg);
+ break;
+ case DELETE:
+ installDeletePackage(resolver, pkgMsg);
+ break;
+ case TEST:
+ break;
+ default: throw new UnsupportedOperationException(format("Unable to process messages with type: %s", type));
+ }
+ }
+
+ private void installAddPackage(ResourceResolver resolver, PackageMessage pkgMsg)
+ throws DistributionException {
+ LOG.info("Importing paths " + pkgMsg.getPathsList());
+ InputStream pkgStream = null;
+ try {
+ pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
+ packageBuilder.installPackage(resolver, pkgStream);
+ extractor.handle(resolver, pkgMsg.getPathsList());
+ } finally {
+ IOUtils.closeQuietly(pkgStream);
+ }
+
+ }
+
+ private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
+ throws PersistenceException {
+ LOG.info("Deleting paths " + pkgMsg.getPathsList());
+ for (String path : pkgMsg.getPathsList()) {
+ Resource resource = resolver.getResource(path);
+ if (resource != null) {
+ resolver.delete(resource);
+ }
+ }
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
index e9ef03b..05f8a88 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
@@ -25,16 +25,13 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Collections;
+import java.util.function.Consumer;
import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.MessageSender;
-
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
-import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
-
public class AnnouncerTest {
private static final String SUB1_SLING_ID = "sub1sling";
@@ -44,14 +41,13 @@ public class AnnouncerTest {
@Test
@SuppressWarnings("unchecked")
public void testDiscoveryMessage() throws InterruptedException {
- MessageSender<Messages.DiscoveryMessage> sender = Mockito.mock(MessageSender.class);
- LocalStore offsetStore = Mockito.mock(LocalStore.class);
- PackageRetries packageRetries = Mockito.mock(PackageRetries.class);
- when(offsetStore.load("offset", -1L)).thenReturn(1l);
- Announcer announcer = new Announcer(SUB1_SLING_ID, SUB1_AGENT_NAME, "discoverytopic", Collections.singleton(PUB1_AGENT_NAME), sender, offsetStore, packageRetries, -1, false, 10000);
+ Consumer<Messages.DiscoveryMessage> sender = Mockito.mock(Consumer.class);
+ BookKeeper bookKeeper = Mockito.mock(BookKeeper.class);
+ when(bookKeeper.loadOffset()).thenReturn(1l);
+ Announcer announcer = new Announcer(SUB1_SLING_ID, SUB1_AGENT_NAME, Collections.singleton(PUB1_AGENT_NAME), sender, bookKeeper, -1, false, 10000);
Thread.sleep(200);
ArgumentCaptor<Messages.DiscoveryMessage> msg = forClass(Messages.DiscoveryMessage.class);
- verify(sender).send(Mockito.eq("discoverytopic"), msg.capture());
+ verify(sender).accept(msg.capture());
Messages.DiscoveryMessage message = msg.getValue();
Messages.SubscriberState offset = message.getSubscriberStateList().iterator().next();
assertThat(message.getSubSlingId(), equalTo(SUB1_SLING_ID));