You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/12/18 09:41:31 UTC

[sling-org-apache-sling-distribution-journal] 01/02: GRANITE-23546 - Extract PackageHandler

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch GRANITE-23546
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 4d13e626d9cc4ff9cafb5925dde59904bcc764a2
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Dec 16 17:47:07 2019 +0100

    GRANITE-23546 - Extract PackageHandler
---
 .../impl/subscriber/DistributionSubscriber.java    | 109 ++++++---------------
 .../journal/impl/subscriber/PackageHandler.java    |  89 +++++++++++++++++
 2 files changed, 121 insertions(+), 77 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index fcefc7f..3607b29 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -18,14 +18,6 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
-import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.IMPORTED;
-import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED;
-import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
 import static java.lang.String.format;
 import static java.lang.System.currentTimeMillis;
 import static java.util.Arrays.asList;
@@ -33,9 +25,16 @@ import static java.util.Collections.singletonMap;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.IMPORTED;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED;
 
 import java.io.Closeable;
-import java.io.InputStream;
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.HashMap;
@@ -52,25 +51,16 @@ import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.impl.shared.AgentState;
-import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.DistributionRequestState;
 import org.apache.sling.distribution.DistributionRequestType;
@@ -78,6 +68,25 @@ import org.apache.sling.distribution.DistributionResponse;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
+import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
+import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
+import org.apache.sling.distribution.journal.impl.shared.AgentState;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
 import org.apache.sling.distribution.log.spi.DistributionLog;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -97,19 +106,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
-import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.JournalAvailable;
-import org.apache.sling.distribution.journal.Reset;
-
 /**
  * A Subscriber SCD agent which consumes messages produced by a {@code DistributionPublisher} agent.
  */
@@ -205,7 +201,7 @@ public class DistributionSubscriber implements DistributionAgent {
 
     private volatile Thread queueProcessor;
     
-    private ContentPackageExtractor extractor;
+    private PackageHandler packageHandler;
     
     @Activate
     public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
@@ -313,7 +309,8 @@ public class DistributionSubscriber implements DistributionAgent {
         LOG.info(msg);
         Dictionary<String, Object> props = createServiceProps(config);
         componentReg = context.registerService(DistributionAgent.class, this, props);
-        extractor = new ContentPackageExtractor(packaging, config.packageHandling());
+        ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.packageHandling());
+        packageHandler = new PackageHandler(packageBuilder, extractor);
     }
 
     private Set<String> getNotEmpty(String[] agentNames) {
@@ -558,7 +555,7 @@ public class DistributionSubscriber implements DistributionAgent {
              * the content updates are applied.
              */
 
-            installPackage(importerResolver, pkgMsg);
+            packageHandler.apply(importerResolver, pkgMsg);
             if (editable) {
                 storeStatus(importerResolver, IMPORTED, offset, pubAgentName);
             }
@@ -616,48 +613,6 @@ public class DistributionSubscriber implements DistributionAgent {
         MDC.put("sub-agent-name", subAgentName);
     }
 
-
-    private void installPackage(ResourceResolver resolver, PackageMessage pkgMsg)
-            throws DistributionException, PersistenceException {
-        PackageMessage.ReqType type = pkgMsg.getReqType();
-        switch (type) {
-            case ADD:
-                installAddPackage(resolver, pkgMsg);
-                break;
-            case DELETE:
-                installDeletePackage(resolver, pkgMsg);
-                break;
-            case TEST:
-                break;
-            default: throw new UnsupportedOperationException(format("Unable to process messages with type: %s", type));
-        }
-    }
-
-    private void installAddPackage(ResourceResolver resolver, PackageMessage pkgMsg)
-            throws DistributionException {
-        LOG.info("Importing paths " + pkgMsg.getPathsList());
-        InputStream pkgStream = null;
-        try {
-            pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
-            packageBuilder.installPackage(resolver, pkgStream);
-            extractor.handle(resolver, pkgMsg.getPathsList());
-        } finally {
-            IOUtils.closeQuietly(pkgStream);
-        }
-
-    }
-
-    private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
-            throws PersistenceException {
-        LOG.info("Deleting paths " + pkgMsg.getPathsList());
-        for (String path : pkgMsg.getPathsList()) {
-            Resource resource = resolver.getResource(path);
-            if (resource != null) {
-                resolver.delete(resource);
-            }
-        }
-    }
-
     private void storeStatus(ResourceResolver resolver, Status status, long offset, String pubAgentName) throws PersistenceException {
         Map<String, Object> s = new HashMap<>();
         s.put("pubAgentName", pubAgentName);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
new file mode 100644
index 0000000..7307fdc
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static java.lang.String.format;
+
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PackageHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(PackageHandler.class);
+    
+    private DistributionPackageBuilder packageBuilder;
+    
+    private ContentPackageExtractor extractor;
+
+    public PackageHandler(DistributionPackageBuilder packageBuilder, ContentPackageExtractor extractor) {
+        this.packageBuilder = packageBuilder;
+        this.extractor = extractor;
+    }
+
+    public void apply(ResourceResolver resolver, PackageMessage pkgMsg)
+            throws DistributionException, PersistenceException {
+        PackageMessage.ReqType type = pkgMsg.getReqType();
+        switch (type) {
+            case ADD:
+                installAddPackage(resolver, pkgMsg);
+                break;
+            case DELETE:
+                installDeletePackage(resolver, pkgMsg);
+                break;
+            case TEST:
+                break;
+            default: throw new UnsupportedOperationException(format("Unable to process messages with type: %s", type));
+        }
+    }
+
+    private void installAddPackage(ResourceResolver resolver, PackageMessage pkgMsg)
+            throws DistributionException {
+        LOG.info("Importing paths " + pkgMsg.getPathsList());
+        InputStream pkgStream = null;
+        try {
+            pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
+            packageBuilder.installPackage(resolver, pkgStream);
+            extractor.handle(resolver, pkgMsg.getPathsList());
+        } finally {
+            IOUtils.closeQuietly(pkgStream);
+        }
+
+    }
+
+    private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
+            throws PersistenceException {
+        LOG.info("Deleting paths " + pkgMsg.getPathsList());
+        for (String path : pkgMsg.getPathsList()) {
+            Resource resource = resolver.getResource(path);
+            if (resource != null) {
+                resolver.delete(resource);
+            }
+        }
+    }
+    
+}