You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/12/18 09:41:31 UTC
[sling-org-apache-sling-distribution-journal] 01/02: GRANITE-23546
- Extract PackageHandler
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch GRANITE-23546
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 4d13e626d9cc4ff9cafb5925dde59904bcc764a2
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Dec 16 17:47:07 2019 +0100
GRANITE-23546 - Extract PackageHandler
---
.../impl/subscriber/DistributionSubscriber.java | 109 ++++++---------------
.../journal/impl/subscriber/PackageHandler.java | 89 +++++++++++++++++
2 files changed, 121 insertions(+), 77 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index fcefc7f..3607b29 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -18,14 +18,6 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
-import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.IMPORTED;
-import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED;
-import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static java.util.Arrays.asList;
@@ -33,9 +25,16 @@ import static java.util.Collections.singletonMap;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;
import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.IMPORTED;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED;
import java.io.Closeable;
-import java.io.InputStream;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashMap;
@@ -52,25 +51,16 @@ import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.impl.shared.AgentState;
-import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionRequestType;
@@ -78,6 +68,25 @@ import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
+import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
+import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
+import org.apache.sling.distribution.journal.impl.shared.AgentState;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -97,19 +106,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
-import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.JournalAvailable;
-import org.apache.sling.distribution.journal.Reset;
-
/**
* A Subscriber SCD agent which consumes messages produced by a {@code DistributionPublisher} agent.
*/
@@ -205,7 +201,7 @@ public class DistributionSubscriber implements DistributionAgent {
private volatile Thread queueProcessor;
- private ContentPackageExtractor extractor;
+ private PackageHandler packageHandler;
@Activate
public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
@@ -313,7 +309,8 @@ public class DistributionSubscriber implements DistributionAgent {
LOG.info(msg);
Dictionary<String, Object> props = createServiceProps(config);
componentReg = context.registerService(DistributionAgent.class, this, props);
- extractor = new ContentPackageExtractor(packaging, config.packageHandling());
+ ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.packageHandling());
+ packageHandler = new PackageHandler(packageBuilder, extractor);
}
private Set<String> getNotEmpty(String[] agentNames) {
@@ -558,7 +555,7 @@ public class DistributionSubscriber implements DistributionAgent {
* the content updates are applied.
*/
- installPackage(importerResolver, pkgMsg);
+ packageHandler.apply(importerResolver, pkgMsg);
if (editable) {
storeStatus(importerResolver, IMPORTED, offset, pubAgentName);
}
@@ -616,48 +613,6 @@ public class DistributionSubscriber implements DistributionAgent {
MDC.put("sub-agent-name", subAgentName);
}
-
- private void installPackage(ResourceResolver resolver, PackageMessage pkgMsg)
- throws DistributionException, PersistenceException {
- PackageMessage.ReqType type = pkgMsg.getReqType();
- switch (type) {
- case ADD:
- installAddPackage(resolver, pkgMsg);
- break;
- case DELETE:
- installDeletePackage(resolver, pkgMsg);
- break;
- case TEST:
- break;
- default: throw new UnsupportedOperationException(format("Unable to process messages with type: %s", type));
- }
- }
-
- private void installAddPackage(ResourceResolver resolver, PackageMessage pkgMsg)
- throws DistributionException {
- LOG.info("Importing paths " + pkgMsg.getPathsList());
- InputStream pkgStream = null;
- try {
- pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
- packageBuilder.installPackage(resolver, pkgStream);
- extractor.handle(resolver, pkgMsg.getPathsList());
- } finally {
- IOUtils.closeQuietly(pkgStream);
- }
-
- }
-
- private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
- throws PersistenceException {
- LOG.info("Deleting paths " + pkgMsg.getPathsList());
- for (String path : pkgMsg.getPathsList()) {
- Resource resource = resolver.getResource(path);
- if (resource != null) {
- resolver.delete(resource);
- }
- }
- }
-
private void storeStatus(ResourceResolver resolver, Status status, long offset, String pubAgentName) throws PersistenceException {
Map<String, Object> s = new HashMap<>();
s.put("pubAgentName", pubAgentName);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
new file mode 100644
index 0000000..7307fdc
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static java.lang.String.format;
+
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PackageHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(PackageHandler.class);
+
+ private DistributionPackageBuilder packageBuilder;
+
+ private ContentPackageExtractor extractor;
+
+ public PackageHandler(DistributionPackageBuilder packageBuilder, ContentPackageExtractor extractor) {
+ this.packageBuilder = packageBuilder;
+ this.extractor = extractor;
+ }
+
+ public void apply(ResourceResolver resolver, PackageMessage pkgMsg)
+ throws DistributionException, PersistenceException {
+ PackageMessage.ReqType type = pkgMsg.getReqType();
+ switch (type) {
+ case ADD:
+ installAddPackage(resolver, pkgMsg);
+ break;
+ case DELETE:
+ installDeletePackage(resolver, pkgMsg);
+ break;
+ case TEST:
+ break;
+ default: throw new UnsupportedOperationException(format("Unable to process messages with type: %s", type));
+ }
+ }
+
+ private void installAddPackage(ResourceResolver resolver, PackageMessage pkgMsg)
+ throws DistributionException {
+ LOG.info("Importing paths " + pkgMsg.getPathsList());
+ InputStream pkgStream = null;
+ try {
+ pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
+ packageBuilder.installPackage(resolver, pkgStream);
+ extractor.handle(resolver, pkgMsg.getPathsList());
+ } finally {
+ IOUtils.closeQuietly(pkgStream);
+ }
+
+ }
+
+ private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
+ throws PersistenceException {
+ LOG.info("Deleting paths " + pkgMsg.getPathsList());
+ for (String path : pkgMsg.getPathsList()) {
+ Resource resource = resolver.getResource(path);
+ if (resource != null) {
+ resolver.delete(resource);
+ }
+ }
+ }
+
+}