You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by mp...@apache.org on 2015/12/22 09:40:03 UTC

svn commit: r1721316 [1/2] - in /sling/trunk/contrib/extensions/distribution: core/src/main/java/org/apache/sling/distribution/agent/impl/ core/src/main/java/org/apache/sling/distribution/packaging/ core/src/main/java/org/apache/sling/distribution/pack...

Author: mpetria
Date: Tue Dec 22 08:40:03 2015
New Revision: 1721316

URL: http://svn.apache.org/viewvc?rev=1721316&view=rev
Log:
SLING-5396: delete remote package after it is enqueued

Added:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/DistributionPackageProcessor.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackage.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackageBuilder.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/SharedDistributionPackage.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/core/DistributionContext.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/core/DistributionPackageProxy.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/DefaultDistributionPackageProxy.java
Removed:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/SharedDistributionPackage.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/AdvancedRemoteDistributionPackageImporter.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackage.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackageBuilder.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/AdvancedHttpDistributionTransport.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/MultipleEndpointDistributionTransport.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/TransportEndpointStrategyType.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/transport/impl/AdvancedHttpDistributionTransportTest.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/transport/impl/MultipleEndpointDistributionTransportTest.java
Modified:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/DistributionPackageExporter.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporter.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporterFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/LocalDistributionPackageExporter.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/LocalDistributionPackageExporterFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/RemoteDistributionPackageExporter.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/RemoteDistributionPackageExporterFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporter.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageWrapper.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/core/DistributionTransport.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporterTest.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/packaging/impl/exporter/LocalDistributionPackageExporterTest.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/packaging/impl/exporter/RemoteDistributionPackageExporterTest.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterTest.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/queue/impl/SelectiveQueueDispatchingStrategyTest.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransportTest.java
    sling/trunk/contrib/extensions/distribution/it/pom.xml
    sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/ReverseDistributionTest.java

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java Tue Dec 22 08:40:03 2015
@@ -54,7 +54,6 @@ import org.apache.sling.distribution.que
 import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
 import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
 import org.apache.sling.distribution.transport.DistributionTransportSecretProvider;
-import org.apache.sling.distribution.transport.impl.TransportEndpointStrategyType;
 import org.apache.sling.distribution.trigger.DistributionTrigger;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.jcr.api.SlingRepository;
@@ -121,10 +120,6 @@ public class ForwardDistributionAgentFac
     public static final String IMPORTER_ENDPOINTS = "packageImporter.endpoints";
 
 
-    @Property(boolValue = false, label = "Use multiple queues", description = "Whether or not to use an individual queue for each importer endpoint. " +
-            "If the queue names are not specified by importer endpoints definition then they are autogenerated.")
-    public static final String USE_MULTIPLE_QUEUES = "useMultipleQueues";
-
     @Property(cardinality = 100, label = "Passive queues", description = "List of queues that should be disabled." +
             "These queues will gather all the packages until they are removed explicitly.")
     public static final String PASSIVE_QUEUES = "passiveQueues";
@@ -226,34 +221,27 @@ public class ForwardDistributionAgentFac
 
         DistributionPackageImporter packageImporter = null;
         Map<String, String> importerEndpointsMap = SettingsUtils.toUriMap(config.get(IMPORTER_ENDPOINTS));
-        boolean useMultipleQueues = PropertiesUtil.toBoolean(config.get(USE_MULTIPLE_QUEUES), false);
         Set<String> processingQueues = new HashSet<String>();
 
-        if (useMultipleQueues) {
-            Set<String> queuesMap = new TreeSet<String>();
-            queuesMap.addAll(importerEndpointsMap.keySet());
-            queuesMap.addAll(Arrays.asList(passiveQueues));
-            String[] queueNames = queuesMap.toArray(new String[queuesMap.size()]);
-
-            if (selectiveQueues != null) {
-                SelectiveQueueDispatchingStrategy dispatchingStrategy = new SelectiveQueueDispatchingStrategy(selectiveQueues, queueNames);
-                Map<String, String> queueAliases = dispatchingStrategy.getMatchingQueues(null);
-                importerEndpointsMap = SettingsUtils.expandUriMap(importerEndpointsMap, queueAliases);
-                exportQueueStrategy = dispatchingStrategy;
-            } else {
-                exportQueueStrategy = new MultipleQueueDispatchingStrategy(queueNames);
-            }
-
-            processingQueues.addAll(importerEndpointsMap.keySet());
-            processingQueues.removeAll(Arrays.asList(passiveQueues));
-
-            packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap, TransportEndpointStrategyType.One);
+        Set<String> queuesMap = new TreeSet<String>();
+        queuesMap.addAll(importerEndpointsMap.keySet());
+        queuesMap.addAll(Arrays.asList(passiveQueues));
+        String[] queueNames = queuesMap.toArray(new String[queuesMap.size()]);
+
+        if (selectiveQueues != null) {
+            SelectiveQueueDispatchingStrategy dispatchingStrategy = new SelectiveQueueDispatchingStrategy(selectiveQueues, queueNames);
+            Map<String, String> queueAliases = dispatchingStrategy.getMatchingQueues(null);
+            importerEndpointsMap = SettingsUtils.expandUriMap(importerEndpointsMap, queueAliases);
+            exportQueueStrategy = dispatchingStrategy;
         } else {
-            exportQueueStrategy = new SingleQueueDispatchingStrategy();
-            processingQueues.addAll(exportQueueStrategy.getQueueNames());
-            packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap, TransportEndpointStrategyType.All);
+            exportQueueStrategy = new MultipleQueueDispatchingStrategy(queueNames);
         }
 
+        processingQueues.addAll(importerEndpointsMap.keySet());
+        processingQueues.removeAll(Arrays.asList(passiveQueues));
+
+        packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap);
+
         DistributionRequestType[] allowedRequests = new DistributionRequestType[]{DistributionRequestType.ADD, DistributionRequestType.DELETE};
 
         String retryStrategy = SettingsUtils.removeEmptyEntry(PropertiesUtil.toString(config.get(RETRY_STRATEGY), null));

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java Tue Dec 22 08:40:03 2015
@@ -49,7 +49,6 @@ import org.apache.sling.distribution.que
 import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
 import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
 import org.apache.sling.distribution.transport.DistributionTransportSecretProvider;
-import org.apache.sling.distribution.transport.impl.TransportEndpointStrategyType;
 import org.apache.sling.distribution.trigger.DistributionTrigger;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.jcr.api.SlingRepository;
@@ -188,8 +187,7 @@ public class ReverseDistributionAgentFac
         int pullItems = PropertiesUtil.toInteger(config.get(PULL_ITEMS), Integer.MAX_VALUE);
 
 
-        DistributionPackageExporter packageExporter = new RemoteDistributionPackageExporter(distributionLog, packageBuilder, transportSecretProvider, exporterEndpoints,
-                TransportEndpointStrategyType.All, pullItems);
+        DistributionPackageExporter packageExporter = new RemoteDistributionPackageExporter(distributionLog, packageBuilder, transportSecretProvider, exporterEndpoints, pullItems);
         DistributionPackageImporter packageImporter = new LocalDistributionPackageImporter(packageBuilder);
         DistributionQueueProvider queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
 

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java Tue Dec 22 08:40:03 2015
@@ -23,6 +23,7 @@ import javax.annotation.Nullable;
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 import javax.jcr.SimpleCredentials;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -52,6 +53,7 @@ import org.apache.sling.distribution.com
 import org.apache.sling.distribution.impl.SimpleDistributionResponse;
 import org.apache.sling.distribution.log.DistributionLog;
 import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
 import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.apache.sling.distribution.queue.impl.DistributionQueueWrapper;
 import org.apache.sling.distribution.serialization.DistributionPackage;
@@ -196,11 +198,7 @@ public class SimpleDistributionAgent imp
 
             agentResourceResolver = getAgentResourceResolver(callingUser);
 
-            List<DistributionPackage> distributionPackages = exportPackages(agentResourceResolver, distributionRequest);
-
-            log.debug("exported packages {}", distributionPackages.size());
-
-            DistributionResponse distributionResponse = scheduleImportPackages(distributionPackages, callingUser);
+            DistributionResponse distributionResponse = exportPackages(agentResourceResolver, distributionRequest, callingUser);
 
             log.info(silent, "returning response {}", distributionResponse);
 
@@ -215,26 +213,19 @@ public class SimpleDistributionAgent imp
         return !queueProcessingEnabled;
     }
 
-    private List<DistributionPackage> exportPackages(ResourceResolver agentResourceResolver, DistributionRequest distributionRequest) throws DistributionException {
-        log.debug("exporting packages with user {}", agentResourceResolver != null ? agentResourceResolver.getUserID() : "dummy");
-
-        List<DistributionPackage> distributionPackages = distributionPackageExporter.exportPackages(agentResourceResolver, distributionRequest);
+    private DistributionResponse exportPackages(ResourceResolver agentResourceResolver, DistributionRequest distributionRequest, String callingUser) throws DistributionException {
+        String actualUser =  agentResourceResolver != null ? agentResourceResolver.getUserID() : "N/A";
+        log.debug("exporting packages with user {} on behalf of {}", actualUser, callingUser);
+        PackageExporterProcessor packageProcessor =  new PackageExporterProcessor(callingUser);
+        distributionPackageExporter.exportPackages(agentResourceResolver, distributionRequest, packageProcessor);
 
         generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_CREATED);
+        List<DistributionResponse> distributionResponses = packageProcessor.getAllResponses();
 
-        return distributionPackages;
-    }
-
-    private DistributionResponse scheduleImportPackages(List<DistributionPackage> distributionPackages, String callingUser) {
-        List<DistributionResponse> distributionResponses = new LinkedList<DistributionResponse>();
-
-        for (DistributionPackage distributionPackage : distributionPackages) {
-            Collection<SimpleDistributionResponse> distributionResponsesForPackage = scheduleImportPackage(distributionPackage, callingUser);
-            distributionResponses.addAll(distributionResponsesForPackage);
-        }
         return distributionResponses.size() == 1 ? distributionResponses.get(0) : new CompositeDistributionResponse(distributionResponses);
     }
 
+
     private Collection<SimpleDistributionResponse> scheduleImportPackage(DistributionPackage distributionPackage, String callingUser) {
         Collection<SimpleDistributionResponse> distributionResponses = new LinkedList<SimpleDistributionResponse>();
 
@@ -581,6 +572,30 @@ public class SimpleDistributionAgent imp
         }
     }
 
+
+    class PackageExporterProcessor implements DistributionPackageProcessor {
+
+        private final String callingUser;
+
+        public List<DistributionResponse> getAllResponses() {
+            return allResponses;
+        }
+
+        private final List<DistributionResponse> allResponses = new ArrayList<DistributionResponse>();
+
+        PackageExporterProcessor(String callingUser) {
+
+            this.callingUser = callingUser;
+        }
+
+        @Override
+        public void process(DistributionPackage distributionPackage) {
+            Collection<SimpleDistributionResponse> responses = scheduleImportPackage(distributionPackage, callingUser);
+
+            allResponses.addAll(responses);
+        }
+    }
+
     public class AgentBasedRequestHandler implements DistributionRequestHandler {
         private final DistributionAgent agent;
 

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java Tue Dec 22 08:40:03 2015
@@ -53,7 +53,6 @@ import org.apache.sling.distribution.que
 import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
 import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
 import org.apache.sling.distribution.transport.DistributionTransportSecretProvider;
-import org.apache.sling.distribution.transport.impl.TransportEndpointStrategyType;
 import org.apache.sling.distribution.trigger.DistributionTrigger;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.jcr.api.SlingRepository;
@@ -123,12 +122,6 @@ public class SyncDistributionAgentFactor
             "The list can be given as a map in case a queue should be configured for each endpoint, e.g. queueName=http://...")
     public static final String IMPORTER_ENDPOINTS = "packageImporter.endpoints";
 
-
-    @Property(boolValue = false, label = "Use multiple queues", description = "Whether or not to use an individual queue for each importer endpoint. " +
-            "If the queue names are not specified by importer endpoints definition then they are autogenerated.")
-    public static final String USE_MULTIPLE_QUEUES = "useMultipleQueues";
-
-
     @Property(options = {
             @PropertyOption(name = "none", value = "none"), @PropertyOption(name = "errorQueue", value = "errorQueue")},
             value = "none",
@@ -222,7 +215,6 @@ public class SyncDistributionAgentFactor
 
         Map<String, String> importerEndpointsMap = SettingsUtils.toUriMap(importerEndpointsValue);
 
-        boolean useMultipleQueues = PropertiesUtil.toBoolean(config.get(USE_MULTIPLE_QUEUES), false);
         int pullItems = PropertiesUtil.toInteger(config.get(PULL_ITEMS), Integer.MAX_VALUE);
 
 
@@ -231,23 +223,17 @@ public class SyncDistributionAgentFactor
         DistributionPackageImporter packageImporter;
         Set<String> processingQueues = new HashSet<String>();
 
-        if (useMultipleQueues) {
-            Set<String> queuesMap = new TreeSet<String>();
-            queuesMap.addAll(importerEndpointsMap.keySet());
-            queuesMap.addAll(Arrays.asList(passiveQueues));
-            processingQueues.addAll(importerEndpointsMap.keySet());
-            processingQueues.removeAll(Arrays.asList(passiveQueues));
-
-            String[] queueNames = queuesMap.toArray(new String[queuesMap.size()]);
-            exportQueueStrategy = new MultipleQueueDispatchingStrategy(queueNames);
-            packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap, TransportEndpointStrategyType.One);
-        } else {
-            exportQueueStrategy = new SingleQueueDispatchingStrategy();
-            processingQueues.addAll(exportQueueStrategy.getQueueNames());
-            packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap, TransportEndpointStrategyType.All);
-        }
+        Set<String> queuesMap = new TreeSet<String>();
+        queuesMap.addAll(importerEndpointsMap.keySet());
+        queuesMap.addAll(Arrays.asList(passiveQueues));
+        processingQueues.addAll(importerEndpointsMap.keySet());
+        processingQueues.removeAll(Arrays.asList(passiveQueues));
+
+        String[] queueNames = queuesMap.toArray(new String[queuesMap.size()]);
+        exportQueueStrategy = new MultipleQueueDispatchingStrategy(queueNames);
+        packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap);
 
-        DistributionPackageExporter packageExporter = new RemoteDistributionPackageExporter(distributionLog, packageBuilder, transportSecretProvider, exporterEndpoints, TransportEndpointStrategyType.All, pullItems);
+        DistributionPackageExporter packageExporter = new RemoteDistributionPackageExporter(distributionLog, packageBuilder, transportSecretProvider, exporterEndpoints, pullItems);
         DistributionQueueProvider queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
         DistributionRequestType[] allowedRequests = new DistributionRequestType[]{DistributionRequestType.PULL};
 

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/DistributionPackageExporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/DistributionPackageExporter.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/DistributionPackageExporter.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/DistributionPackageExporter.java Tue Dec 22 08:40:03 2015
@@ -21,6 +21,7 @@ package org.apache.sling.distribution.pa
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.util.List;
 
 import aQute.bnd.annotation.ConsumerType;
@@ -53,7 +54,7 @@ public interface DistributionPackageExpo
      * @return a {@link java.util.List} of {@link DistributionPackage}s
      */
     @Nonnull
-    List<DistributionPackage> exportPackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest) throws DistributionException;
+    void exportPackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest, @Nonnull DistributionPackageProcessor packageProcessor) throws DistributionException;
 
     /**
      * Retrieves a {@link DistributionPackage} given its identifier, if it already exists.

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/DistributionPackageProcessor.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/DistributionPackageProcessor.java?rev=1721316&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/DistributionPackageProcessor.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/DistributionPackageProcessor.java Tue Dec 22 08:40:03 2015
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.packaging;
+
+
+import org.apache.sling.distribution.serialization.DistributionPackage;
+
+public interface DistributionPackageProcessor {
+
+    void process(DistributionPackage distributionPackage);
+}

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java Tue Dec 22 08:40:03 2015
@@ -21,16 +21,14 @@ package org.apache.sling.distribution.pa
 
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
-import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.apache.sling.distribution.serialization.DistributionPackage;
 import org.apache.sling.distribution.serialization.DistributionPackageInfo;
-import org.apache.sling.distribution.packaging.SharedDistributionPackage;
+import org.apache.sling.distribution.serialization.impl.SharedDistributionPackage;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
 
 /**
@@ -94,6 +92,16 @@ public class DistributionPackageUtils {
             }
         }
     }
+
+    public static void closeSafely(DistributionPackage distributionPackage) {
+        if (distributionPackage != null) {
+            try {
+                distributionPackage.close();
+            } catch (Throwable t) {
+                log.error("error closing package", t);
+            }
+        }
+    }
 
     /**
      * Create a queue item out of a package

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporter.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporter.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporter.java Tue Dec 22 08:40:03 2015
@@ -29,6 +29,7 @@ import org.apache.sling.distribution.age
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.log.DistributionLog;
 import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
 import org.apache.sling.distribution.serialization.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageExporter;
 import org.apache.sling.distribution.serialization.DistributionPackageInfo;
@@ -66,25 +67,24 @@ public class AgentDistributionPackageExp
     }
 
     @Nonnull
-    public List<DistributionPackage> exportPackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest) throws DistributionException {
-
-        List<DistributionPackage> result = new ArrayList<DistributionPackage>();
+    public void exportPackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest, @Nonnull DistributionPackageProcessor packageProcessor) throws DistributionException {
 
         if (DistributionRequestType.TEST.equals(distributionRequest.getRequestType())) {
-            result.add(new SimpleDistributionPackage(distributionRequest, PACKAGE_TYPE));
-            return result;
+            packageProcessor.process(new SimpleDistributionPackage(distributionRequest, PACKAGE_TYPE));
+            return;
         }
 
         if (!DistributionRequestType.PULL.equals(distributionRequest.getRequestType())) {
             throw new DistributionException("request type not supported " + distributionRequest.getRequestType());
         }
 
+        DistributionPackage distributionPackage = null;
+
         try {
             log.debug("getting packages from queue {}", queueName);
 
             DistributionQueue queue = agent.getQueue(queueName);
             DistributionQueueEntry entry = queue.getHead();
-            DistributionPackage distributionPackage;
             if (entry != null) {
                 DistributionQueueItem queueItem = entry.getItem();
                 DistributionPackageInfo info = DistributionPackageUtils.fromQueueItem(queueItem);
@@ -93,9 +93,9 @@ public class AgentDistributionPackageExp
                 if (packageBuilder != null) {
                     distributionPackage = packageBuilder.getPackage(resourceResolver, queueItem.getId());
 
-                    log.info("item {} fetched from the queue", info);
+                    log.debug("item {} fetched from the queue", info);
                     if (distributionPackage != null) {
-                        result.add(new AgentDistributionPackage(distributionPackage, queue));
+                        packageProcessor.process(new AgentDistributionPackage(distributionPackage, queue));
                     } else {
                         log.warn("cannot get package {}", info);
                     }
@@ -106,9 +106,9 @@ public class AgentDistributionPackageExp
 
         } catch (Exception ex) {
             log.error("Error exporting package", ex);
+        } finally {
+            DistributionPackageUtils.closeSafely(distributionPackage);
         }
-
-        return result;
     }
 
     public DistributionPackage getPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull String distributionPackageId) {

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporterFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporterFactory.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporterFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/AgentDistributionPackageExporterFactory.java Tue Dec 22 08:40:03 2015
@@ -34,6 +34,7 @@ import org.apache.sling.distribution.Dis
 import org.apache.sling.distribution.agent.DistributionAgent;
 import org.apache.sling.distribution.component.impl.DistributionComponentConstants;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
 import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
 import org.apache.sling.distribution.serialization.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageExporter;
@@ -82,8 +83,8 @@ public class AgentDistributionPackageExp
     }
 
     @Nonnull
-    public List<DistributionPackage> exportPackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest) throws DistributionException {
-        return packageExporter.exportPackages(resourceResolver, distributionRequest);
+    public void exportPackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest, @Nonnull DistributionPackageProcessor packageProcessor) throws DistributionException {
+        packageExporter.exportPackages(resourceResolver, distributionRequest, packageProcessor);
     }
 
     public DistributionPackage getPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull String distributionPackageId) throws DistributionException {

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/LocalDistributionPackageExporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/LocalDistributionPackageExporter.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/LocalDistributionPackageExporter.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/LocalDistributionPackageExporter.java Tue Dec 22 08:40:03 2015
@@ -24,7 +24,10 @@ import java.util.List;
 
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionResponse;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
+import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
 import org.apache.sling.distribution.serialization.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageExporter;
 import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
@@ -42,14 +45,16 @@ public class LocalDistributionPackageExp
     }
 
     @Nonnull
-    public List<DistributionPackage> exportPackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest) throws DistributionException {
-        List<DistributionPackage> result = new ArrayList<DistributionPackage>();
+    public void exportPackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest, @Nonnull DistributionPackageProcessor packageProcessor) throws DistributionException {
 
         DistributionPackage createdPackage = packageBuilder.createPackage(resourceResolver, distributionRequest);
 
-        result.add(createdPackage);
+        try {
+            packageProcessor.process(createdPackage);
+        } finally {
+            DistributionPackageUtils.closeSafely(createdPackage);
+        }
 
-        return result;
     }
 
     public DistributionPackage getPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull String distributionPackageId) throws DistributionException {

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/LocalDistributionPackageExporterFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/LocalDistributionPackageExporterFactory.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/LocalDistributionPackageExporterFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/LocalDistributionPackageExporterFactory.java Tue Dec 22 08:40:03 2015
@@ -32,6 +32,7 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.component.impl.DistributionComponentConstants;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
 import org.apache.sling.distribution.serialization.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageExporter;
 import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
@@ -71,8 +72,8 @@ public class LocalDistributionPackageExp
     }
 
     @Nonnull
-    public List<DistributionPackage> exportPackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest) throws DistributionException {
-        return exporter.exportPackages(resourceResolver, distributionRequest);
+    public void exportPackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest, @Nonnull DistributionPackageProcessor packageProcessor) throws DistributionException {
+        exporter.exportPackages(resourceResolver, distributionRequest, packageProcessor);
     }
 
     public DistributionPackage getPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull String distributionPackageId) throws DistributionException {

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/RemoteDistributionPackageExporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/RemoteDistributionPackageExporter.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/RemoteDistributionPackageExporter.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/RemoteDistributionPackageExporter.java Tue Dec 22 08:40:03 2015
@@ -24,17 +24,20 @@ import java.util.List;
 
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionRequestType;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
+import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
 import org.apache.sling.distribution.serialization.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageExporter;
 import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
 import org.apache.sling.distribution.transport.DistributionTransportSecretProvider;
+import org.apache.sling.distribution.transport.core.DistributionContext;
 import org.apache.sling.distribution.transport.core.DistributionTransport;
 import org.apache.sling.distribution.transport.impl.DistributionEndpoint;
-import org.apache.sling.distribution.transport.impl.MultipleEndpointDistributionTransport;
+import org.apache.sling.distribution.transport.core.DistributionPackageProxy;
 import org.apache.sling.distribution.transport.impl.SimpleHttpDistributionTransport;
-import org.apache.sling.distribution.transport.impl.TransportEndpointStrategyType;
 
 /**
  * Default implementation of {@link org.apache.sling.distribution.packaging.DistributionPackageExporter}
@@ -45,15 +48,16 @@ public class RemoteDistributionPackageEx
     private final DistributionPackageBuilder packageBuilder;
     private final DistributionTransportSecretProvider secretProvider;
     private final DefaultDistributionLog log;
+    private final int maxPullItems;
 
-    private DistributionTransport transportHandler;
+    private List<DistributionTransport> transportHandlers = new ArrayList<DistributionTransport>();
 
     public RemoteDistributionPackageExporter(DefaultDistributionLog log, DistributionPackageBuilder packageBuilder,
                                              DistributionTransportSecretProvider secretProvider,
                                              String[] endpoints,
-                                             TransportEndpointStrategyType transportEndpointStrategyType,
-                                             int pullItems) {
+                                             int maxPullItems) {
         this.log = log;
+        this.maxPullItems = maxPullItems;
         if (packageBuilder == null) {
             throw new IllegalArgumentException("packageBuilder is required");
         }
@@ -66,24 +70,38 @@ public class RemoteDistributionPackageEx
         this.packageBuilder = packageBuilder;
         this.secretProvider = secretProvider;
 
-        List<DistributionTransport> transportHandlers = new ArrayList<DistributionTransport>();
-
         for (String endpoint : endpoints) {
             if (endpoint != null && endpoint.length() > 0) {
-                transportHandlers.add(new SimpleHttpDistributionTransport(log, new DistributionEndpoint(endpoint), packageBuilder, secretProvider, pullItems));
+                transportHandlers.add(new SimpleHttpDistributionTransport(log, new DistributionEndpoint(endpoint), packageBuilder, secretProvider));
             }
         }
-        transportHandler = new MultipleEndpointDistributionTransport(transportHandlers,
-                transportEndpointStrategyType);
     }
 
     @Nonnull
-    public List<DistributionPackage> exportPackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest) throws DistributionException {
-        List<DistributionPackage> packages = new ArrayList<DistributionPackage>();
-        for (DistributionPackage distributionPackage : transportHandler.retrievePackages(resourceResolver, distributionRequest)) {
-            packages.add(distributionPackage);
+    public void exportPackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest, @Nonnull DistributionPackageProcessor packageProcessor) throws DistributionException {
+        int maxNumberOfPackages = DistributionRequestType.PULL.equals(distributionRequest.getRequestType()) ? maxPullItems : 1;
+        for (DistributionTransport distributionTransport : transportHandlers) {
+            int noPackages = 0;
+
+            DistributionContext distributionContext = new DistributionContext();
+            DistributionPackageProxy retrievedPackage;
+            while (noPackages < maxNumberOfPackages && ((retrievedPackage = distributionTransport.retrievePackage(resourceResolver, distributionRequest, distributionContext)) != null)) {
+
+
+                DistributionPackage distributionPackage = retrievedPackage.getPackage();
+
+                try {
+                    packageProcessor.process(distributionPackage);
+
+                    retrievedPackage.deletePackage();
+
+                } finally {
+                    DistributionPackageUtils.closeSafely(distributionPackage);
+                }
+
+                noPackages++;
+            }
         }
-        return packages;
     }
 
     public DistributionPackage getPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull String distributionPackageId) throws DistributionException {

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/RemoteDistributionPackageExporterFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/RemoteDistributionPackageExporterFactory.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/RemoteDistributionPackageExporterFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/exporter/RemoteDistributionPackageExporterFactory.java Tue Dec 22 08:40:03 2015
@@ -38,11 +38,12 @@ import org.apache.sling.distribution.com
 import org.apache.sling.distribution.component.impl.SettingsUtils;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
 import org.apache.sling.distribution.serialization.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageExporter;
 import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
 import org.apache.sling.distribution.transport.DistributionTransportSecretProvider;
-import org.apache.sling.distribution.transport.impl.TransportEndpointStrategyType;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,21 +75,6 @@ public class RemoteDistributionPackageEx
     public static final String ENDPOINTS = "endpoints";
 
     /**
-     * endpoint strategy property
-     */
-    @Property(options = {
-            @PropertyOption(name = "All",
-                    value = "all endpoints"
-            ),
-            @PropertyOption(name = "One",
-                    value = "one endpoint"
-            )},
-            value = "One",
-            label = "Endpoint Strategy", description = "Specifies whether to export packages from all endpoints or just from one."
-    )
-    public static final String ENDPOINTS_STRATEGY = "endpoints.strategy";
-
-    /**
      * no. of items to poll property
      */
     @Property(label = "Pull Items", description = "number of subsequent pull requests to make", intValue = 1)
@@ -114,10 +100,8 @@ public class RemoteDistributionPackageEx
         String[] endpoints = PropertiesUtil.toStringArray(config.get(ENDPOINTS), new String[0]);
         endpoints = SettingsUtils.removeEmptyEntries(endpoints);
 
-        String endpointStrategyName = PropertiesUtil.toString(config.get(ENDPOINTS_STRATEGY), "One");
         int pollItems = PropertiesUtil.toInteger(config.get(PULL_ITEMS), Integer.MAX_VALUE);
 
-        TransportEndpointStrategyType transportEndpointStrategyType = TransportEndpointStrategyType.valueOf(endpointStrategyName);
 
 
         String exporterName = PropertiesUtil.toString(config.get(NAME), null);
@@ -125,8 +109,7 @@ public class RemoteDistributionPackageEx
         DefaultDistributionLog distributionLog = new DefaultDistributionLog(DistributionComponentKind.EXPORTER, exporterName, RemoteDistributionPackageExporter.class, DefaultDistributionLog.LogLevel.ERROR);
 
 
-        exporter = new RemoteDistributionPackageExporter(distributionLog, packageBuilder, transportSecretProvider, endpoints,
-                transportEndpointStrategyType, pollItems);
+        exporter = new RemoteDistributionPackageExporter(distributionLog, packageBuilder, transportSecretProvider, endpoints, pollItems);
     }
 
 
@@ -136,8 +119,8 @@ public class RemoteDistributionPackageEx
     }
 
     @Nonnull
-    public List<DistributionPackage> exportPackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest) throws DistributionException {
-        return exporter.exportPackages(resourceResolver, distributionRequest);
+    public void exportPackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest, @Nonnull DistributionPackageProcessor packageProcessor) throws DistributionException {
+        exporter.exportPackages(resourceResolver, distributionRequest, packageProcessor);
     }
 
     public DistributionPackage getPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull String distributionPackageId) throws DistributionException {

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporter.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporter.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporter.java Tue Dec 22 08:40:03 2015
@@ -26,15 +26,15 @@ import java.util.Map;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
 import org.apache.sling.distribution.serialization.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageImporter;
 import org.apache.sling.distribution.serialization.DistributionPackageInfo;
 import org.apache.sling.distribution.transport.DistributionTransportSecretProvider;
+import org.apache.sling.distribution.transport.core.DistributionContext;
 import org.apache.sling.distribution.transport.core.DistributionTransport;
 import org.apache.sling.distribution.transport.impl.DistributionEndpoint;
-import org.apache.sling.distribution.transport.impl.MultipleEndpointDistributionTransport;
 import org.apache.sling.distribution.transport.impl.SimpleHttpDistributionTransport;
-import org.apache.sling.distribution.transport.impl.TransportEndpointStrategyType;
 
 /**
  * Remote implementation of {@link org.apache.sling.distribution.packaging.DistributionPackageImporter}
@@ -42,32 +42,38 @@ import org.apache.sling.distribution.tra
 public class RemoteDistributionPackageImporter implements DistributionPackageImporter {
 
 
-    private DistributionTransport transportHandler;
+    Map<String, DistributionTransport> transportHandlers = new HashMap<String, DistributionTransport>();
 
 
     public RemoteDistributionPackageImporter(DefaultDistributionLog log, DistributionTransportSecretProvider distributionTransportSecretProvider,
-                                             Map<String, String> endpointsMap,
-                                             TransportEndpointStrategyType transportEndpointStrategyType) {
+                                             Map<String, String> endpointsMap) {
         if (distributionTransportSecretProvider == null) {
             throw new IllegalArgumentException("distributionTransportSecretProvider is required");
         }
 
-        Map<String, DistributionTransport> transportHandlers = new HashMap<String, DistributionTransport>();
 
         for (Map.Entry<String, String> entry : endpointsMap.entrySet()) {
             String endpointKey = entry.getKey();
             String endpoint = entry.getValue();
             if (endpoint != null && endpoint.length() > 0) {
-                transportHandlers.put(endpointKey, new SimpleHttpDistributionTransport(log, new DistributionEndpoint(endpoint), null, distributionTransportSecretProvider, -1));
+                transportHandlers.put(endpointKey, new SimpleHttpDistributionTransport(log, new DistributionEndpoint(endpoint), null, distributionTransportSecretProvider));
             }
         }
-        transportHandler = new MultipleEndpointDistributionTransport(transportHandlers,
-                transportEndpointStrategyType);
-
     }
 
     public void importPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionPackage distributionPackage) throws DistributionException {
-        transportHandler.deliverPackage(resourceResolver, distributionPackage);
+        DistributionPackageInfo info = distributionPackage.getInfo();
+        String queueName = DistributionPackageUtils.getQueueName(info);
+
+        DistributionTransport distributionTransport = transportHandlers.get(queueName);
+
+        if (distributionTransport != null) {
+            distributionTransport.deliverPackage(resourceResolver, distributionPackage, new DistributionContext());
+        } else {
+            for(DistributionTransport transportHandler: transportHandlers.values()) {
+                transportHandler.deliverPackage(resourceResolver, distributionPackage, new DistributionContext());
+            }
+        }
     }
 
     @Nonnull

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterFactory.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterFactory.java Tue Dec 22 08:40:03 2015
@@ -40,7 +40,6 @@ import org.apache.sling.distribution.ser
 import org.apache.sling.distribution.packaging.DistributionPackageImporter;
 import org.apache.sling.distribution.serialization.DistributionPackageInfo;
 import org.apache.sling.distribution.transport.DistributionTransportSecretProvider;
-import org.apache.sling.distribution.transport.impl.TransportEndpointStrategyType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,22 +70,6 @@ public class RemoteDistributionPackageIm
     @Property(cardinality = 100, label = "Endpoints", description = "The list of endpoints to which the packages will be imported.")
     public static final String ENDPOINTS = "endpoints";
 
-    /**
-     * endpoint strategy property
-     */
-    @Property(options = {
-            @PropertyOption(name = "All",
-                    value = "all endpoints"
-            ),
-            @PropertyOption(name = "One",
-                    value = "one endpoint"
-            )},
-            value = "One",
-            label = "Endpoint Strategy", description = "Specifies whether to import packages to all endpoints or just to one."
-    )
-    public static final String ENDPOINTS_STRATEGY = "endpoints.strategy";
-
-
     @Property(name = "transportSecretProvider.target", label = "Transport Secret Provider", description = "The target reference for the DistributionTransportSecretProvider used to obtain the credentials used for accessing the remote endpoints, " +
             "e.g. use target=(name=...) to bind to services by name.")
     @Reference(name = "transportSecretProvider")
@@ -98,16 +81,13 @@ public class RemoteDistributionPackageIm
     protected void activate(Map<String, Object> config) {
 
         Map<String, String> endpoints = SettingsUtils.toUriMap(config.get(ENDPOINTS));
-        String endpointStrategyName = PropertiesUtil.toString(config.get(ENDPOINTS_STRATEGY), "One");
-
-        TransportEndpointStrategyType transportEndpointStrategyType = TransportEndpointStrategyType.valueOf(endpointStrategyName);
 
         String importerName = PropertiesUtil.toString(config.get(NAME), null);
 
         DefaultDistributionLog distributionLog = new DefaultDistributionLog(DistributionComponentKind.IMPORTER, importerName, RemoteDistributionPackageImporter.class, DefaultDistributionLog.LogLevel.ERROR);
 
 
-        importer = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, endpoints, transportEndpointStrategyType);
+        importer = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, endpoints);
 
     }
 

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java Tue Dec 22 08:40:03 2015
@@ -28,7 +28,7 @@ import java.util.UUID;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.serialization.DistributionPackage;
-import org.apache.sling.distribution.packaging.SharedDistributionPackage;
+import org.apache.sling.distribution.serialization.impl.SharedDistributionPackage;
 import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
 import org.apache.sling.distribution.queue.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueItem;

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackage.java?rev=1721316&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackage.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackage.java Tue Dec 22 08:40:03 2015
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.serialization.impl;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.serialization.DistributionPackage;
+import org.apache.sling.distribution.serialization.DistributionPackageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultSharedDistributionPackage implements SharedDistributionPackage {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    protected static final String REFERENCE_ROOT_NODE = "refs";
+    private final Object lock;
+
+    private final ResourceResolver resourceResolver;
+    private final String packagePath;
+    private final DistributionPackage distributionPackage;
+    private final String packageName;
+
+    public DefaultSharedDistributionPackage(Object lock, ResourceResolver resourceResolver, String packageName, String packagePath, DistributionPackage distributionPackage) {
+        this.resourceResolver = resourceResolver; // used for every read/write of the holder resources
+        this.packageName = packageName; // value returned by getId()
+        this.packagePath = packagePath; // path of the resource that carries the "refs" holder root
+        this.distributionPackage = distributionPackage; // wrapped package that content-related calls delegate to
+        this.lock = lock; // monitor guarding every change to the holder resources
+    }
+
+    public void acquire(@Nonnull String holderName) {
+        // reject null explicitly: the message promises it, a bare length() call would throw an NPE instead
+        if (holderName == null || holderName.length() == 0) {
+            throw new IllegalArgumentException("holder name cannot be null or empty");
+        }
+
+        try {
+            createHolderResource(holderName);
+            log.debug("acquired package {} for holder {}", packagePath, holderName);
+        } catch (PersistenceException e) {
+            // the method declares no checked exception, so a failed write is only logged
+            log.error("cannot acquire package", e);
+        }
+    }
+
+    public void release(@Nonnull String holderName) {
+        // reject null explicitly: the message promises it, a bare length() call would throw an NPE instead
+        if (holderName == null || holderName.length() == 0) {
+            throw new IllegalArgumentException("holder name cannot be null or empty");
+        }
+
+        try {
+            // drop this holder's reference first ...
+            deleteHolderResource(holderName);
+            // ... then remove the bookkeeping resource if that was the last reference
+            boolean doPackageDelete = deleteIfEmpty();
+            if (doPackageDelete) {
+                distributionPackage.delete();
+            }
+
+            log.debug("released package {} from holder {} delete {}", new Object[]{packagePath, holderName, doPackageDelete});
+        } catch (PersistenceException e) {
+            log.error("cannot release package", e);
+        }
+    }
+
+
+    @Nonnull
+    public String getId() {
+        return packageName; // the name given to this shared package, not the wrapped package's own id
+    }
+
+    @Nonnull
+    public String getType() {
+        return distributionPackage.getType(); // same type as the wrapped package
+    }
+
+    @Nonnull
+    public InputStream createInputStream() throws IOException {
+        return distributionPackage.createInputStream(); // content comes straight from the wrapped package
+    }
+
+    public void close() {
+        distributionPackage.close(); // only the wrapped package is closed; holder resources are not touched
+    }
+
+    public void delete() {
+        // unconditional delete: the bookkeeping resource is removed without checking for remaining holders
+        try {
+            deleteHolderRoot();
+        } catch (PersistenceException e) {
+            log.error("cannot delete shared resource", e);
+        }
+        // the wrapped package is deleted even if removing the bookkeeping resource failed
+        distributionPackage.delete();
+    }
+
+    @Nonnull
+    public DistributionPackageInfo getInfo() {
+        return distributionPackage.getInfo(); // info of the wrapped package
+    }
+
+    public DistributionPackage getPackage() {
+        return distributionPackage; // the wrapped package this shared package was created around
+    }
+
+
+    private Resource getProxyResource() {
+        // Resolves the resource stored at the package path, i.e. the parent of the "refs" node.
+        // The resolver is refreshed before every lookup; presumably this makes changes that
+        // were persisted through other resolvers visible - verify against ResourceResolver#refresh.
+        resourceResolver.refresh();
+        return resourceResolver.getResource(packagePath);
+    }
+
+
+    private Resource getHolderRootResource() {
+        Resource resource = getProxyResource();
+
+        // the package resource no longer exists once the package has been deleted;
+        // report "no holder root" instead of failing with a NullPointerException
+        if (resource == null) {
+            return null;
+        }
+        return resource.getChild(REFERENCE_ROOT_NODE);
+    }
+
+    private void createHolderResource(String holderName) throws PersistenceException {
+        // creates a child named holderName below the "refs" node; the whole check-then-create runs under the lock
+        synchronized (lock) {
+            Resource holderRoot = getHolderRootResource();
+
+            if (holderRoot == null) {
+                return; // no "refs" node to attach to: nothing is created and no error is raised
+            }
+
+            Resource holder = holderRoot.getChild(holderName);
+
+            if (holder != null) {
+                return; // a reference is already registered under this holder name
+            }
+
+            resourceResolver.create(holderRoot, holderName, Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object) "sling:Folder"));
+            resourceResolver.commit();
+
+        }
+    }
+
+    private void deleteHolderResource(String holderName) throws PersistenceException {
+        // removes the child named holderName from the "refs" node if both exist; runs under the lock
+        synchronized (lock) {
+            Resource holderRoot = getHolderRootResource();
+
+            if (holderRoot == null) {
+                return; // no "refs" node, so there is no reference to drop
+            }
+
+            Resource holder = holderRoot.getChild(holderName);
+
+            if (holder == null) {
+                return; // no reference is registered under this holder name
+            }
+
+            resourceResolver.delete(holder);
+            resourceResolver.commit();
+        }
+    }
+
+    private void deleteHolderRoot() throws PersistenceException {
+        synchronized (lock) {
+            Resource resource = getProxyResource();
+            if (resource != null) { // null when the package resource was already removed: nothing left to delete
+                resourceResolver.delete(resource);
+                resourceResolver.commit();
+            }
+        }
+    }
+    private boolean deleteIfEmpty() throws PersistenceException {
+        synchronized (lock) {
+            Resource holderRoot = getHolderRootResource();
+            if (holderRoot != null && !holderRoot.hasChildren()) { // "refs" exists but no holder is left below it
+                deleteHolderRoot(); // re-enters the same lock; deletes the resource at the package path, not just "refs"
+                return true;
+            }
+        }
+        // either the "refs" node is missing or at least one holder is still registered
+        return false;
+    }
+
+}

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackageBuilder.java?rev=1721316&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackageBuilder.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackageBuilder.java Tue Dec 22 08:40:03 2015
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.serialization.impl;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.serialization.DistributionPackage;
+import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
+import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultSharedDistributionPackageBuilder implements DistributionPackageBuilder {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+
+    private static final String PN_ORIGINAL_ID = "original.package.id";
+    private static final String PN_ORIGINAL_REQUEST_TYPE = "original.package.request.type";
+    private static final String PN_ORIGINAL_PATHS = "original.package.paths";
+
+    private static final String PACKAGE_NAME_PREFIX = "distrpackage";
+    private final String sharedPackagesRoot;
+    private final String type;
+
+    private final DistributionPackageBuilder distributionPackageBuilder;
+
+    // use a global repolock for syncing access to the shared package root
+    // TODO: this can be made more fine-grained once configurable package roots are allowed
+    private final Object repolock = new Object();
+
+    public DefaultSharedDistributionPackageBuilder(DistributionPackageBuilder distributionPackageExporter) {
+        this.distributionPackageBuilder = distributionPackageExporter; // despite its name, the parameter is the builder being wrapped
+        this.type = distributionPackageBuilder.getType(); // this builder reports the wrapped builder's type
+        this.sharedPackagesRoot = AbstractDistributionPackage.PACKAGES_ROOT + "/" + type + "/shared"; // one bookkeeping root per package type
+    }
+
+    public String getType() {
+        return type; // captured from the wrapped builder in the constructor
+    }
+
+    @Nonnull
+    public DistributionPackage createPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest request) throws DistributionException {
+        // the wrapped builder produces the actual package
+        DistributionPackage distributionPackage = distributionPackageBuilder.createPackage(resourceResolver, request);
+        log.debug("created package {}, registering it as shared package", distributionPackage.getId());
+
+        String packageName;
+        try {
+            // persists the bookkeeping resource that records the original id under a generated name
+            packageName = generateNameFromId(resourceResolver, distributionPackage);
+        } catch (PersistenceException e) {
+            // without the bookkeeping resource the package cannot be shared, so it is cleaned up again
+            DistributionPackageUtils.deleteSafely(distributionPackage);
+            throw new DistributionException(e);
+        }
+
+        String packagePath = getPathFromName(packageName);
+        DistributionPackage sharedDistributionPackage = new DefaultSharedDistributionPackage(repolock, resourceResolver, packageName, packagePath, distributionPackage);
+
+        log.debug("created shared package {} for {}", sharedDistributionPackage.getId(), distributionPackage.getId());
+        return sharedDistributionPackage;
+    }
+
+    // may return null (see below), hence only @CheckForNull and not the contradicting @Nonnull
+    @CheckForNull
+    public DistributionPackage readPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull InputStream stream) throws DistributionException {
+        DistributionPackage distributionPackage = distributionPackageBuilder.readPackage(resourceResolver, stream);
+        log.debug("read package {} from stream", distributionPackage);
+
+        if (distributionPackage == null) {
+            // the wrapped builder returned no package for this stream
+            return null;
+        }
+
+        String packageName;
+        try {
+            // persists the bookkeeping resource that records the original id under a generated name
+            packageName = generateNameFromId(resourceResolver, distributionPackage);
+        } catch (PersistenceException e) {
+            // without the bookkeeping resource the package cannot be shared, so it is cleaned up again
+            DistributionPackageUtils.deleteSafely(distributionPackage);
+            throw new DistributionException(e);
+        }
+
+        String packagePath = getPathFromName(packageName);
+        DistributionPackage sharedDistributionPackage = new DefaultSharedDistributionPackage(repolock, resourceResolver, packageName, packagePath, distributionPackage);
+
+        log.debug("created shared package {} for {}", sharedDistributionPackage.getId(), distributionPackage.getId());
+        return sharedDistributionPackage;
+    }
+
+    @CheckForNull
+    public DistributionPackage getPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull String distributionPackageId) throws DistributionException {
+        // the id handed in is used as the shared package name; the wrapped builder only knows the original id
+        String packageName = distributionPackageId;
+        String originalPackageId = retrieveIdFromName(resourceResolver, packageName);
+        log.debug("shared package {} maps to original package id {}", packageName, originalPackageId);
+
+        if (originalPackageId == null) {
+            // no bookkeeping resource, or no original id recorded, for this name
+            return null;
+        }
+
+        DistributionPackage distributionPackage = distributionPackageBuilder.getPackage(resourceResolver, originalPackageId);
+        log.debug("original package for id {}: {}", originalPackageId, distributionPackage);
+
+        if (distributionPackage == null) {
+            return null;
+        }
+
+        String packagePath = getPathFromName(packageName);
+
+        return new DefaultSharedDistributionPackage(repolock, resourceResolver, packageName, packagePath, distributionPackage);
+    }
+
+    public boolean installPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionPackage distributionPackage) throws DistributionException {
+        // only a shared package carries a wrapped package that can be handed to the wrapped builder
+        if (distributionPackage instanceof DefaultSharedDistributionPackage) {
+            DefaultSharedDistributionPackage sharedPackage = (DefaultSharedDistributionPackage) distributionPackage;
+            // installation itself is left to the wrapped builder, working on the wrapped package
+            return distributionPackageBuilder.installPackage(resourceResolver, sharedPackage.getPackage());
+        }
+
+        return false;
+    }
+
+
+    private String generateNameFromId(ResourceResolver resourceResolver, DistributionPackage distributionPackage) throws PersistenceException {
+        // the generated name does not depend on the wrapped id: prefix, timestamp and a random UUID
+        String name = PACKAGE_NAME_PREFIX + "_" + System.currentTimeMillis() + "_" + UUID.randomUUID();
+
+        Map<String, Object> properties = new HashMap<String, Object>();
+        properties.put(PN_ORIGINAL_ID, distributionPackage.getId());
+
+        // save the info just for debugging purposes
+        if (distributionPackage.getInfo().getRequestType() != null) {
+            properties.put(PN_ORIGINAL_REQUEST_TYPE, distributionPackage.getInfo().getRequestType().toString());
+        }
+        if (distributionPackage.getInfo().getPaths() != null) {
+            properties.put(PN_ORIGINAL_PATHS, distributionPackage.getInfo().getPaths());
+        }
+
+        String packagePath = getPathFromName(name);
+
+        synchronized (repolock) {
+            Resource resource = ResourceUtil.getOrCreateResource(resourceResolver, packagePath,
+                    "sling:Folder", "sling:Folder", false);
+            ModifiableValueMap valueMap = resource.adaptTo(ModifiableValueMap.class);
+            if (valueMap == null) {
+                // adaptTo may return null; fail with the declared exception instead of a NullPointerException
+                throw new PersistenceException("cannot write properties of shared package resource " + packagePath);
+            }
+            valueMap.putAll(properties);
+
+            resourceResolver.create(resource, DefaultSharedDistributionPackage.REFERENCE_ROOT_NODE,
+                    Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object) "sling:Folder"));
+            resourceResolver.commit();
+        }
+
+        return name;
+    }
+
+    private String getPathFromName(String name) {
+        // shared package resources are direct children of the shared packages root
+        return sharedPackagesRoot + "/" + name;
+    }
+
+    private String retrieveIdFromName(ResourceResolver resourceResolver, String name) {
+        String packagePath = getPathFromName(name);
+
+        Resource resource = resourceResolver.getResource(packagePath);
+        if (resource == null) {
+            // unknown name: no bookkeeping resource is stored for it
+            return null;
+        }
+
+        ValueMap properties = resource.adaptTo(ValueMap.class);
+        if (properties == null) {
+            return null;
+        }
+
+        // name the type explicitly: an untyped null argument may bind to get(String, Class) with a
+        // null class, which not every ValueMap implementation is guaranteed to tolerate
+        return properties.get(PN_ORIGINAL_ID, String.class);
+    }
+}

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageWrapper.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageWrapper.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageWrapper.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageWrapper.java Tue Dec 22 08:40:03 2015
@@ -32,38 +32,44 @@ import org.apache.sling.distribution.ser
  */
 public class DistributionPackageWrapper implements DistributionPackage {
 
-    protected final DistributionPackage distributionPackage;
 
-    protected DistributionPackageWrapper(DistributionPackage distributionPackage) {
+    protected final DistributionPackage wrappedPackage;
 
-        this.distributionPackage = distributionPackage;
+    protected DistributionPackageWrapper(DistributionPackage wrappedPackage) {
+
+        this.wrappedPackage = wrappedPackage;
     }
 
     @Nonnull
     public String getId() {
-        return distributionPackage.getId();
+        return wrappedPackage.getId();
     }
 
     @Nonnull
     public String getType() {
-        return distributionPackage.getId();
+        return wrappedPackage.getType();
     }
 
     @Nonnull
     public InputStream createInputStream() throws IOException {
-        return distributionPackage.createInputStream();
+        return wrappedPackage.createInputStream();
     }
 
     public void close() {
-        distributionPackage.close();
+        wrappedPackage.close();
     }
 
     public void delete() {
-        distributionPackage.delete();
+        wrappedPackage.delete();
     }
 
     @Nonnull
     public DistributionPackageInfo getInfo() {
-        return distributionPackage.getInfo();
+        return wrappedPackage.getInfo();
+    }
+
+    public DistributionPackage getWrappedPackage() {
+        return wrappedPackage;
     }
+
 }

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/SharedDistributionPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/SharedDistributionPackage.java?rev=1721316&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/SharedDistributionPackage.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/SharedDistributionPackage.java Tue Dec 22 08:40:03 2015
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.serialization.impl;
+
+import javax.annotation.Nonnull;
+
+import aQute.bnd.annotation.ConsumerType;
+import aQute.bnd.annotation.ProviderType;
+import org.apache.sling.distribution.serialization.DistributionPackage;
+
+/**
+ * A {@link DistributionPackage} that offers basic reference counting.
+ */
+@ProviderType
+public interface SharedDistributionPackage extends DistributionPackage {
+    /**
+     * Acquire a reference to this package and increase the reference count.
+     * @param holderName the name identifying the holder of the reference
+     */
+    void acquire(@Nonnull String holderName);
+
+    /**
+     * Release a reference to this package and decrease the reference count.
+     * When no more references are held, the package's {@link DistributionPackage#delete()} method is called.
+     * @param holderName the name identifying the holder whose reference is released
+     */
+    void release(@Nonnull String holderName);
+}

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java?rev=1721316&r1=1721315&r2=1721316&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java Tue Dec 22 08:40:03 2015
@@ -41,7 +41,7 @@ import org.apache.sling.distribution.com
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.serialization.DistributionPackage;
 import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
-import org.apache.sling.distribution.serialization.impl.ResourceSharedDistributionPackageBuilder;
+import org.apache.sling.distribution.serialization.impl.DefaultSharedDistributionPackageBuilder;
 
 /**
  * A package builder for Apache Jackrabbit FileVault based implementations.
@@ -138,9 +138,9 @@ public class VaultDistributionPackageBui
         }
 
         if ("filevlt".equals(type)) {
-            packageBuilder = new ResourceSharedDistributionPackageBuilder(new FileVaultDistributionPackageBuilder(name, packaging, importMode, aclHandling, packageRoots, packageFilters, tempFsFolder));
+            packageBuilder = new DefaultSharedDistributionPackageBuilder(new FileVaultDistributionPackageBuilder(name, packaging, importMode, aclHandling, packageRoots, packageFilters, tempFsFolder));
         } else {
-            packageBuilder = new ResourceSharedDistributionPackageBuilder(new JcrVaultDistributionPackageBuilder(name, packaging, importMode, aclHandling, packageRoots, packageFilters, tempFsFolder));
+            packageBuilder = new DefaultSharedDistributionPackageBuilder(new JcrVaultDistributionPackageBuilder(name, packaging, importMode, aclHandling, packageRoots, packageFilters, tempFsFolder));
         }
     }