You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by mp...@apache.org on 2015/01/05 11:40:34 UTC

svn commit: r1649499 - in /sling/trunk/contrib/extensions/distribution: core/src/main/java/org/apache/sling/distribution/agent/impl/ core/src/main/java/org/apache/sling/distribution/component/impl/ core/src/main/java/org/apache/sling/distribution/packa...

Author: mpetria
Date: Mon Jan  5 10:40:33 2015
New Revision: 1649499

URL: http://svn.apache.org/r1649499
Log:
SLING-4009: implementing multiple queue dispatch strategy to have one queue per receiving endpoint

Added:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/AbstractDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
    sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/MultipleForwardDistributionTest.java
    sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/agents/publish-multiple.json
    sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/defaults/agents/forward.json
    sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/defaults/agents/reverse.json
Modified:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/component/impl/DistributionComponentUtils.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/component/impl/SettingsUtils.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/DistributionPackageInfo.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporter.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/SingleQueueDispatchingStrategy.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/MultipleEndpointDistributionTransport.java
    sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterTest.java
    sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionIntegrationTestBase.java
    sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionUtils.java
    sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/agents/publish-reverse.json
    sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/agents/publish.json
    sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.publish/agents/reverse.json

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/AbstractDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/AbstractDistributionAgentFactory.java?rev=1649499&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/AbstractDistributionAgentFactory.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/AbstractDistributionAgentFactory.java Mon Jan  5 10:40:33 2015
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.agent.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.distribution.agent.DistributionAgent;
+import org.apache.sling.distribution.component.impl.DistributionComponentUtils;
+import org.apache.sling.distribution.event.impl.DistributionEventFactory;
+import org.apache.sling.distribution.packaging.DistributionPackageExporter;
+import org.apache.sling.distribution.packaging.DistributionPackageImporter;
+import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
+import org.apache.sling.distribution.queue.impl.SingleQueueDispatchingStrategy;
+import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
+import org.apache.sling.distribution.resources.DistributionConstants;
+import org.apache.sling.distribution.resources.impl.OsgiUtils;
+import org.apache.sling.distribution.trigger.DistributionTrigger;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.settings.SlingSettingsService;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * An abstract OSGi service factory for registering {@link org.apache.sling.distribution.agent.impl.SimpleDistributionAgent}s
+ */
+public abstract class AbstractDistributionAgentFactory {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    public static final String NAME = DistributionComponentUtils.PN_NAME;
+
+    private static final String ENABLED = "enabled";
+
+    private ServiceRegistration componentReg;
+    private BundleContext savedContext;
+    private Map<String, Object> savedConfig;
+    private String agentName;
+    List<DistributionTrigger> triggers = new CopyOnWriteArrayList<DistributionTrigger>();
+
+    private SimpleDistributionAgent agent;
+
+    protected void activate(BundleContext context, Map<String, Object> config) {
+        log.info("activating with config {}", OsgiUtils.osgiPropertyMapToString(config));
+
+
+        savedContext = context;
+        savedConfig = config;
+
+        // inject configuration
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+
+        boolean enabled = PropertiesUtil.toBoolean(config.get(ENABLED), true);
+
+        if (enabled) {
+            props.put(ENABLED, true);
+
+            agentName = PropertiesUtil.toString(config.get(NAME), null);
+            props.put(NAME, agentName);
+            props.put(DistributionConstants.PN_IS_RESOURCE, config.get(DistributionConstants.PN_IS_RESOURCE));
+
+            if (componentReg == null) {
+
+                try {
+
+                   agent = createAgent(agentName, context, config);
+                }
+                catch (IllegalArgumentException e) {
+                    log.warn("cannot create agent", e);
+                }
+
+
+                if (agent != null) {
+
+                    // register agent service
+                    componentReg = context.registerService(DistributionAgent.class.getName(), agent, props);
+                    agent.enable();
+                }
+
+                log.info("activated agent {}", agentName);
+
+            }
+        }
+    }
+
+    protected void bindDistributionTrigger(DistributionTrigger distributionTrigger, Map<String, Object> config) {
+        triggers.add(distributionTrigger);
+        if (agent != null) {
+            agent.enableTrigger(distributionTrigger);
+        }
+
+    }
+
+    protected void unbindDistributionTrigger(DistributionTrigger distributionTrigger, Map<String, Object> config) {
+        triggers.remove(distributionTrigger);
+
+        if (agent != null) {
+            agent.disableTrigger(distributionTrigger);
+        }
+    }
+
+    protected void deactivate(BundleContext context) {
+        if (componentReg != null) {
+            ServiceReference reference = componentReg.getReference();
+            Object service = context.getService(reference);
+            if (service instanceof SimpleDistributionAgent) {
+                ((SimpleDistributionAgent) service).disable();
+
+            }
+
+            componentReg.unregister();
+            componentReg = null;
+            agent = null;
+        }
+
+        log.info("deactivated agent {}", agentName);
+
+
+    }
+
+
+    protected abstract SimpleDistributionAgent createAgent(String agentName, BundleContext context, Map<String, Object> config);
+
+}

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java?rev=1649499&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java Mon Jan  5 10:40:33 2015
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.agent.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.jackrabbit.vault.packaging.Packaging;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.distribution.agent.DistributionAgent;
+import org.apache.sling.distribution.component.impl.DistributionComponentUtils;
+import org.apache.sling.distribution.component.impl.SettingsUtils;
+import org.apache.sling.distribution.event.impl.DistributionEventFactory;
+import org.apache.sling.distribution.packaging.DistributionPackageExporter;
+import org.apache.sling.distribution.packaging.DistributionPackageImporter;
+import org.apache.sling.distribution.packaging.impl.exporter.LocalDistributionPackageExporter;
+import org.apache.sling.distribution.packaging.impl.exporter.RemoteDistributionPackageExporter;
+import org.apache.sling.distribution.packaging.impl.importer.RemoteDistributionPackageImporter;
+import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
+import org.apache.sling.distribution.queue.impl.MultipleQueueDispatchingStrategy;
+import org.apache.sling.distribution.queue.impl.SingleQueueDispatchingStrategy;
+import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
+import org.apache.sling.distribution.resources.DistributionConstants;
+import org.apache.sling.distribution.resources.impl.OsgiUtils;
+import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
+import org.apache.sling.distribution.transport.DistributionTransportSecretProvider;
+import org.apache.sling.distribution.transport.impl.TransportEndpointStrategyType;
+import org.apache.sling.distribution.trigger.DistributionTrigger;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.settings.SlingSettingsService;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * An OSGi service factory for {@link org.apache.sling.distribution.agent.DistributionAgent}s which references already existing OSGi services.
+ */
+@Component(metatype = true,
+        label = "Sling Distribution - Forward Agents Factory",
+        description = "OSGi configuration factory for forward agents",
+        configurationFactory = true,
+        specVersion = "1.1",
+        policy = ConfigurationPolicy.REQUIRE
+)
+@Reference(name = "triggers", referenceInterface = DistributionTrigger.class,
+        policy = ReferencePolicy.DYNAMIC, cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
+        bind = "bindDistributionTrigger", unbind = "unbindDistributionTrigger")
+public class ForwardDistributionAgentFactory extends AbstractDistributionAgentFactory {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Property(label = "Name")
+    public static final String NAME = DistributionComponentUtils.PN_NAME;
+
+    @Property(boolValue = true, label = "Enabled")
+    private static final String ENABLED = "enabled";
+
+
+    @Property(label = "Service Name")
+    public static final String SERVICE_NAME = "serviceName";
+
+    /**
+     * endpoints property
+     */
+    @Property(cardinality = -1)
+    public static final String IMPORTER_ENDPOINTS = "packageImporter.endpoints";
+
+    @Reference
+    private Packaging packaging;
+
+    @Property(name = "requestAuthorizationStrategy.target")
+    @Reference(name = "requestAuthorizationStrategy")
+    private DistributionRequestAuthorizationStrategy requestAuthorizationStrategy;
+
+
+    @Property(name = "transportSecretProvider.target")
+    @Reference(name = "transportSecretProvider")
+    DistributionTransportSecretProvider transportSecretProvider;
+
+
+    @Property(name = "packageBuilder.target")
+    @Reference(name = "packageBuilder")
+    private DistributionPackageBuilder packageBuilder;
+
+    @Reference
+    private DistributionEventFactory distributionEventFactory;
+
+    @Reference
+    private SlingSettingsService settingsService;
+
+    @Reference
+    private JobManager jobManager;
+
+    @Reference
+    private ResourceResolverFactory resourceResolverFactory;
+
+
+    @Activate
+    protected void activate(BundleContext context, Map<String, Object> config) {
+        super.activate(context, config);
+    }
+
+    protected void bindDistributionTrigger(DistributionTrigger distributionTrigger, Map<String, Object> config) {
+        super.bindDistributionTrigger(distributionTrigger, config);
+
+    }
+
+    protected void unbindDistributionTrigger(DistributionTrigger distributionTrigger, Map<String, Object> config) {
+        super.unbindDistributionTrigger(distributionTrigger, config);
+    }
+
+    @Deactivate
+    protected void deactivate(BundleContext context) {
+        super.deactivate(context);
+    }
+
+    @Override
+    protected SimpleDistributionAgent createAgent(String agentName, BundleContext context, Map<String, Object> config) {
+        String serviceName = PropertiesUtil.toString(config.get(SERVICE_NAME), null);
+
+        Map<String, String> importerEndpointsMap = SettingsUtils.toUriMap(config.get(IMPORTER_ENDPOINTS));
+
+
+        DistributionPackageExporter packageExporter = new LocalDistributionPackageExporter(packageBuilder);
+        DistributionPackageImporter packageImporter = new RemoteDistributionPackageImporter(transportSecretProvider, importerEndpointsMap, TransportEndpointStrategyType.One);
+        DistributionQueueProvider queueProvider =  new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
+
+        String[] queueNames = importerEndpointsMap.keySet().toArray(new String[0]);
+        DistributionQueueDispatchingStrategy dispatchingStrategy = new MultipleQueueDispatchingStrategy(queueNames);
+
+        return new SimpleDistributionAgent(agentName, false, serviceName,
+                packageImporter, packageExporter, requestAuthorizationStrategy,
+                queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, triggers);
+
+
+    }
+}

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java Mon Jan  5 10:40:33 2015
@@ -78,7 +78,7 @@ import java.util.concurrent.CopyOnWriteA
 @Reference(name = "triggers", referenceInterface = DistributionTrigger.class,
         policy = ReferencePolicy.DYNAMIC, cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
         bind = "bindDistributionTrigger", unbind = "unbindDistributionTrigger")
-public class QueueDistributionAgentFactory {
+public class QueueDistributionAgentFactory extends AbstractDistributionAgentFactory {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     @Property(label = "Name")
@@ -116,91 +116,34 @@ public class QueueDistributionAgentFacto
     @Reference
     private ResourceResolverFactory resourceResolverFactory;
 
-    private ServiceRegistration componentReg;
-    private BundleContext savedContext;
-    private Map<String, Object> savedConfig;
-    private String agentName;
-    List<DistributionTrigger> triggers = new CopyOnWriteArrayList<DistributionTrigger>();
-
-    private SimpleDistributionAgent agent;
-
     @Activate
     protected void activate(BundleContext context, Map<String, Object> config) {
-        log.info("activating with config {}", OsgiUtils.osgiPropertyMapToString(config));
-
-
-        savedContext = context;
-        savedConfig = config;
-
-        // inject configuration
-        Dictionary<String, Object> props = new Hashtable<String, Object>();
-
-        boolean enabled = PropertiesUtil.toBoolean(config.get(ENABLED), true);
-
-        if (enabled) {
-            props.put(ENABLED, true);
-
-            agentName = PropertiesUtil.toString(config.get(NAME), null);
-            props.put(NAME, agentName);
-            props.put(DistributionConstants.PN_IS_RESOURCE, config.get(DistributionConstants.PN_IS_RESOURCE));
-
-            if (componentReg == null) {
-
-                String serviceName = PropertiesUtil.toString(config.get(SERVICE_NAME), null);
-
-                try {
-                    DistributionQueueProvider queueProvider =  new JobHandlingDistributionQueueProvider(agentName, jobManager, savedContext);
-                    DistributionQueueDispatchingStrategy dispatchingStrategy = new SingleQueueDispatchingStrategy();
-
-                    agent = new SimpleDistributionAgent(agentName, true, serviceName,
-                            null, packageExporter, requestAuthorizationStrategy,
-                            queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, triggers);
-                }
-                catch (IllegalArgumentException e) {
-                    log.warn("cannot create agent", e);
-                }
-
-                log.debug("activated agent {}", agentName);
-
-                if (agent != null) {
-
-                    // register agent service
-                    componentReg = context.registerService(DistributionAgent.class.getName(), agent, props);
-                    agent.enable();
-                }
-            }
-        }
+        super.activate(context, config);
     }
 
-    private void bindDistributionTrigger(DistributionTrigger distributionTrigger, Map<String, Object> config) {
-        triggers.add(distributionTrigger);
-        if (agent != null) {
-            agent.enableTrigger(distributionTrigger);
-        }
+    protected void bindDistributionTrigger(DistributionTrigger distributionTrigger, Map<String, Object> config) {
+        super.bindDistributionTrigger(distributionTrigger, config);
 
     }
 
-    private void unbindDistributionTrigger(DistributionTrigger distributionTrigger, Map<String, Object> config) {
-        triggers.remove(distributionTrigger);
-
-        if (agent != null) {
-            agent.disableTrigger(distributionTrigger);
-        }
+    protected void unbindDistributionTrigger(DistributionTrigger distributionTrigger, Map<String, Object> config) {
+        super.unbindDistributionTrigger(distributionTrigger, config);
     }
 
     @Deactivate
     protected void deactivate(BundleContext context) {
-        if (componentReg != null) {
-            ServiceReference reference = componentReg.getReference();
-            Object service = context.getService(reference);
-            if (service instanceof SimpleDistributionAgent) {
-                ((SimpleDistributionAgent) service).disable();
-            }
-
-            componentReg.unregister();
-            componentReg = null;
-            agent = null;
-        }
+        super.deactivate(context);
+    }
+
+    @Override
+    protected SimpleDistributionAgent createAgent(String agentName, BundleContext context, Map<String, Object> config) {
 
+        String serviceName = PropertiesUtil.toString(config.get(SERVICE_NAME), null);
+        DistributionQueueProvider queueProvider =  new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
+        DistributionQueueDispatchingStrategy dispatchingStrategy = new SingleQueueDispatchingStrategy();
+
+        return new SimpleDistributionAgent(agentName, true, serviceName,
+                null, packageExporter, requestAuthorizationStrategy,
+                queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, triggers);
     }
 }

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java?rev=1649499&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java Mon Jan  5 10:40:33 2015
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.agent.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.jackrabbit.vault.packaging.Packaging;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.distribution.component.impl.DistributionComponentUtils;
+import org.apache.sling.distribution.component.impl.SettingsUtils;
+import org.apache.sling.distribution.event.impl.DistributionEventFactory;
+import org.apache.sling.distribution.packaging.DistributionPackageExporter;
+import org.apache.sling.distribution.packaging.DistributionPackageImporter;
+import org.apache.sling.distribution.packaging.impl.exporter.LocalDistributionPackageExporter;
+import org.apache.sling.distribution.packaging.impl.exporter.RemoteDistributionPackageExporter;
+import org.apache.sling.distribution.packaging.impl.importer.LocalDistributionPackageImporter;
+import org.apache.sling.distribution.packaging.impl.importer.RemoteDistributionPackageImporter;
+import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
+import org.apache.sling.distribution.queue.impl.MultipleQueueDispatchingStrategy;
+import org.apache.sling.distribution.queue.impl.SingleQueueDispatchingStrategy;
+import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
+import org.apache.sling.distribution.serialization.DistributionPackageBuilder;
+import org.apache.sling.distribution.transport.DistributionTransportSecretProvider;
+import org.apache.sling.distribution.transport.impl.TransportEndpointStrategyType;
+import org.apache.sling.distribution.trigger.DistributionTrigger;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.settings.SlingSettingsService;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * An OSGi service factory for {@link org.apache.sling.distribution.agent.DistributionAgent}s which references already existing OSGi services.
+ */
+@Component(metatype = true,
+        label = "Sling Distribution - Reverse Agents Factory",
+        description = "OSGi configuration factory for reverse agents",
+        configurationFactory = true,
+        specVersion = "1.1",
+        policy = ConfigurationPolicy.REQUIRE
+)
+@Reference(name = "triggers", referenceInterface = DistributionTrigger.class,
+        policy = ReferencePolicy.DYNAMIC, cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
+        bind = "bindDistributionTrigger", unbind = "unbindDistributionTrigger")
+public class ReverseDistributionAgentFactory extends AbstractDistributionAgentFactory {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Property(label = "Name")
+    public static final String NAME = DistributionComponentUtils.PN_NAME;
+
+    @Property(boolValue = true, label = "Enabled")
+    private static final String ENABLED = "enabled";
+
+
+    @Property(label = "Service Name")
+    public static final String SERVICE_NAME = "serviceName";
+
+    /**
+     * endpoints property
+     */
+    @Property(cardinality = -1)
+    public static final String EXPORTER_ENDPOINTS = "packageExporter.endpoints";
+
+
+    /**
+     * no. of items to poll property
+     */
+    @Property(name = "pull items", description = "number of subsequent pull requests to make", intValue = 1)
+    public static final String PULL_ITEMS = "pull.items";
+
+    @Reference
+    private Packaging packaging;
+
+    @Property(name = "requestAuthorizationStrategy.target")
+    @Reference(name = "requestAuthorizationStrategy")
+    private DistributionRequestAuthorizationStrategy requestAuthorizationStrategy;
+
+
+    @Property(name = "transportSecretProvider.target")
+    @Reference(name = "transportSecretProvider")
+    DistributionTransportSecretProvider transportSecretProvider;
+
+
+    @Property(name = "packageBuilder.target")
+    @Reference(name = "packageBuilder")
+    private DistributionPackageBuilder packageBuilder;
+
+    @Reference
+    private DistributionEventFactory distributionEventFactory;
+
+    @Reference
+    private SlingSettingsService settingsService;
+
+    @Reference
+    private JobManager jobManager;
+
+    @Reference
+    private ResourceResolverFactory resourceResolverFactory;
+
+
+    @Activate
+    protected void activate(BundleContext context, Map<String, Object> config) {
+        super.activate(context, config);
+    }
+
+    protected void bindDistributionTrigger(DistributionTrigger distributionTrigger, Map<String, Object> config) {
+        super.bindDistributionTrigger(distributionTrigger, config);
+
+    }
+
+    protected void unbindDistributionTrigger(DistributionTrigger distributionTrigger, Map<String, Object> config) {
+        super.unbindDistributionTrigger(distributionTrigger, config);
+    }
+
+    @Deactivate
+    protected void deactivate(BundleContext context) {
+        super.deactivate(context);
+    }
+
+    @Override
+    protected SimpleDistributionAgent createAgent(String agentName, BundleContext context, Map<String, Object> config) {
+        String serviceName = PropertiesUtil.toString(config.get(SERVICE_NAME), null);
+
+        String[] exporterEndpoints = PropertiesUtil.toStringArray(config.get(EXPORTER_ENDPOINTS), new String[0]);
+
+        int pollItems = PropertiesUtil.toInteger(config.get(PULL_ITEMS), Integer.MAX_VALUE);
+
+
+        DistributionPackageExporter packageExporter = new RemoteDistributionPackageExporter(packageBuilder, transportSecretProvider, exporterEndpoints,
+                TransportEndpointStrategyType.All, pollItems);
+        DistributionPackageImporter packageImporter = new LocalDistributionPackageImporter(packageBuilder);
+        DistributionQueueProvider queueProvider =  new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
+
+        DistributionQueueDispatchingStrategy dispatchingStrategy = new SingleQueueDispatchingStrategy();
+
+        return new SimpleDistributionAgent(agentName, false, serviceName,
+                packageImporter, packageExporter, requestAuthorizationStrategy,
+                queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, triggers);
+
+
+    }
+}

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java Mon Jan  5 10:40:33 2015
@@ -343,6 +343,7 @@ public class SimpleDistributionAgent imp
 
             if (distributionPackage != null) {
                 distributionPackage.getInfo().fillInfo(queueItem.getPackageInfo());
+                distributionPackage.getInfo().setQueue(queueName);
 
                 distributionPackageImporter.importPackage(agentResourceResolver, distributionPackage);
                 distributionEventFactory.generateAgentPackageEvent(DistributionEventType.AGENT_PACKAGE_DISTRIBUTED, name, distributionPackage.getInfo());

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentFactory.java?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentFactory.java Mon Jan  5 10:40:33 2015
@@ -67,7 +67,7 @@ import org.slf4j.LoggerFactory;
 @Reference(name = "triggers", referenceInterface = DistributionTrigger.class,
         policy = ReferencePolicy.DYNAMIC, cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
         bind = "bindDistributionTrigger", unbind = "unbindDistributionTrigger")
-public class SimpleDistributionAgentFactory {
+public class SimpleDistributionAgentFactory extends AbstractDistributionAgentFactory {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     @Property(label = "Name")
@@ -109,99 +109,38 @@ public class SimpleDistributionAgentFact
     @Reference
     private ResourceResolverFactory resourceResolverFactory;
 
-    private ServiceRegistration componentReg;
-    private BundleContext savedContext;
-    private Map<String, Object> savedConfig;
-    private String agentName;
-    List<DistributionTrigger> triggers = new CopyOnWriteArrayList<DistributionTrigger>();
-
     private SimpleDistributionAgent agent;
 
     @Activate
     protected void activate(BundleContext context, Map<String, Object> config) {
-        log.info("activating with config {}", OsgiUtils.osgiPropertyMapToString(config));
-
-
-        savedContext = context;
-        savedConfig = config;
-
-        // inject configuration
-        Dictionary<String, Object> props = new Hashtable<String, Object>();
-
-        boolean enabled = PropertiesUtil.toBoolean(config.get(ENABLED), true);
-
-        if (enabled) {
-            props.put(ENABLED, true);
-
-            agentName = PropertiesUtil.toString(config.get(NAME), null);
-            props.put(NAME, agentName);
-            props.put(DistributionConstants.PN_IS_RESOURCE, config.get(DistributionConstants.PN_IS_RESOURCE));
-
-            if (componentReg == null) {
-
-                String serviceName = PropertiesUtil.toString(config.get(SERVICE_NAME), null);
-
-                boolean isPassive = PropertiesUtil.toBoolean(config.get(IS_PASSIVE), false);
-
-                try {
-
-                    DistributionQueueProvider queueProvider =  new JobHandlingDistributionQueueProvider(agentName, jobManager, savedContext);
-                    DistributionQueueDispatchingStrategy dispatchingStrategy = new SingleQueueDispatchingStrategy();
-                    agent = new SimpleDistributionAgent(agentName, isPassive, serviceName,
-                            packageImporter, packageExporter, requestAuthorizationStrategy,
-                            queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, triggers);
-                }
-                catch (IllegalArgumentException e) {
-                    log.warn("cannot create agent", e);
-                }
-
-
-                if (agent != null) {
-
-                    // register agent service
-                    componentReg = context.registerService(DistributionAgent.class.getName(), agent, props);
-                    agent.enable();
-                }
-
-                log.info("activated agent {}", agentName);
-
-            }
-        }
+        super.activate(context, config);
     }
 
-    private void bindDistributionTrigger(DistributionTrigger distributionTrigger, Map<String, Object> config) {
-        triggers.add(distributionTrigger);
-        if (agent != null) {
-            agent.enableTrigger(distributionTrigger);
-        }
+    protected void bindDistributionTrigger(DistributionTrigger distributionTrigger, Map<String, Object> config) {
+       super.bindDistributionTrigger(distributionTrigger, config);
 
     }
 
-    private void unbindDistributionTrigger(DistributionTrigger distributionTrigger, Map<String, Object> config) {
-        triggers.remove(distributionTrigger);
-
-        if (agent != null) {
-            agent.disableTrigger(distributionTrigger);
-        }
+    protected void unbindDistributionTrigger(DistributionTrigger distributionTrigger, Map<String, Object> config) {
+        super.unbindDistributionTrigger(distributionTrigger, config);
     }
 
     @Deactivate
     protected void deactivate(BundleContext context) {
-        if (componentReg != null) {
-            ServiceReference reference = componentReg.getReference();
-            Object service = context.getService(reference);
-            if (service instanceof SimpleDistributionAgent) {
-                ((SimpleDistributionAgent) service).disable();
-
-            }
-
-            componentReg.unregister();
-            componentReg = null;
-            agent = null;
-        }
-
-        log.info("deactivated agent {}", agentName);
+        super.deactivate(context);
+    }
 
+    @Override
+    protected SimpleDistributionAgent createAgent(String agentName, BundleContext context, Map<String, Object> config) {
+        String serviceName = PropertiesUtil.toString(config.get(SERVICE_NAME), null);
+
+        boolean isPassive = PropertiesUtil.toBoolean(config.get(IS_PASSIVE), false);
+
+        DistributionQueueProvider queueProvider =  new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
+        DistributionQueueDispatchingStrategy dispatchingStrategy = new SingleQueueDispatchingStrategy();
+        return new SimpleDistributionAgent(agentName, isPassive, serviceName,
+                packageImporter, packageExporter, requestAuthorizationStrategy,
+                queueProvider, dispatchingStrategy, distributionEventFactory, resourceResolverFactory, triggers);
 
     }
 }

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java Mon Jan  5 10:40:33 2015
@@ -33,6 +33,7 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.distribution.agent.DistributionAgent;
 import org.apache.sling.distribution.component.impl.DistributionComponentUtils;
+import org.apache.sling.distribution.component.impl.SettingsUtils;
 import org.apache.sling.distribution.event.impl.DistributionEventFactory;
 import org.apache.sling.distribution.packaging.DistributionPackageExporter;
 import org.apache.sling.distribution.packaging.DistributionPackageImporter;
@@ -170,10 +171,11 @@ public class SyncDistributionAgentFactor
 
 
                     String[] exporterEndpoints = PropertiesUtil.toStringArray(config.get(EXPORTER_ENDPOINTS), new String[0]);
-                    String[] importerEndpoints = PropertiesUtil.toStringArray(config.get(IMPORTER_ENDPOINTS), new String[0]);
+                    Map<String, String> importerEndpointsMap = SettingsUtils.toUriMap(config.get(IMPORTER_ENDPOINTS));
+
 
                     DistributionPackageExporter packageExporter = new RemoteDistributionPackageExporter(packageBuilder, transportSecretProvider, exporterEndpoints, TransportEndpointStrategyType.All, 1);
-                    DistributionPackageImporter packageImporter = new RemoteDistributionPackageImporter(transportSecretProvider, importerEndpoints, TransportEndpointStrategyType.All);
+                    DistributionPackageImporter packageImporter = new RemoteDistributionPackageImporter(transportSecretProvider, importerEndpointsMap, TransportEndpointStrategyType.All);
                     DistributionQueueProvider queueProvider =  new JobHandlingDistributionQueueProvider(agentName, jobManager, savedContext);
                     DistributionQueueDispatchingStrategy dispatchingStrategy = new SingleQueueDispatchingStrategy();
 

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/component/impl/DistributionComponentUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/component/impl/DistributionComponentUtils.java?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/component/impl/DistributionComponentUtils.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/component/impl/DistributionComponentUtils.java Mon Jan  5 10:40:33 2015
@@ -27,8 +27,10 @@ import java.util.Map;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.distribution.agent.DistributionAgent;
 import org.apache.sling.distribution.agent.impl.DistributionRequestAuthorizationStrategy;
+import org.apache.sling.distribution.agent.impl.ForwardDistributionAgentFactory;
 import org.apache.sling.distribution.agent.impl.PrivilegeDistributionRequestAuthorizationStrategy;
 import org.apache.sling.distribution.agent.impl.QueueDistributionAgentFactory;
+import org.apache.sling.distribution.agent.impl.ReverseDistributionAgentFactory;
 import org.apache.sling.distribution.agent.impl.SimpleDistributionAgentFactory;
 import org.apache.sling.distribution.agent.impl.SyncDistributionAgentFactory;
 import org.apache.sling.distribution.packaging.DistributionPackageExporter;
@@ -108,6 +110,8 @@ public class DistributionComponentUtils
         // register "core" factories kind, type -> ComponentFactoryClass
         registerFactory("agent", "simple", SimpleDistributionAgentFactory.class);
         registerFactory("agent", "sync", SyncDistributionAgentFactory.class);
+        registerFactory("agent", "forward", ForwardDistributionAgentFactory.class);
+        registerFactory("agent", "reverse", ReverseDistributionAgentFactory.class);
         registerFactory("agent", "queue", QueueDistributionAgentFactory.class);
 
         registerFactory("exporter", "local", LocalDistributionPackageExporterFactory.class);

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/component/impl/SettingsUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/component/impl/SettingsUtils.java?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/component/impl/SettingsUtils.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/component/impl/SettingsUtils.java Mon Jan  5 10:40:33 2015
@@ -19,11 +19,14 @@
 package org.apache.sling.distribution.component.impl;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.distribution.transport.core.DistributionTransport;
 
 //TODO: Consider removing it
 /**
@@ -155,4 +158,25 @@ public class SettingsUtils {
             return false;
         }
     }
+
+
+    public static <AType> Map<String, AType> toMap(List<AType> aList, String prefix) {
+        Map<String, AType> result = new TreeMap<String, AType>();
+        for (int i=0; i<aList.size(); i++) {
+            result.put(prefix+i, aList.get(i));
+        }
+
+        return result;
+    }
+
+
+    public static Map<String, String> toUriMap(Object obj) {
+        Map<String, String> uriMap = PropertiesUtil.toMap(obj, new String[0]);
+
+        if (uriMap.size() == 0) {
+            String[] endpoints = PropertiesUtil.toStringArray(obj, new String[0]);
+            uriMap = toMap(Arrays.asList(endpoints), "endpoint");
+        }
+        return uriMap;
+    }
 }

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/DistributionPackageInfo.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/DistributionPackageInfo.java?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/DistributionPackageInfo.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/DistributionPackageInfo.java Mon Jan  5 10:40:33 2015
@@ -31,6 +31,7 @@ import org.apache.sling.distribution.Dis
 public final class DistributionPackageInfo {
 
     private URI origin;
+    private String queue;
     private DistributionRequestType requestType;
     private String[] paths;
 
@@ -112,4 +113,12 @@ public final class DistributionPackageIn
                 ", paths=" + Arrays.toString(paths) +
                 '}';
     }
+
+    public String getQueue() {
+        return queue;
+    }
+
+    public void setQueue(String queue) {
+        this.queue = queue;
+    }
 }

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporter.java?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporter.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporter.java Mon Jan  5 10:40:33 2015
@@ -21,7 +21,9 @@ package org.apache.sling.distribution.pa
 import javax.annotation.Nonnull;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.packaging.DistributionPackage;
@@ -48,7 +50,7 @@ public class RemoteDistributionPackageIm
 
 
     public RemoteDistributionPackageImporter(DistributionTransportSecretProvider distributionTransportSecretProvider,
-                                             String[] endpoints,
+                                             Map<String, String> endpointsMap,
                                              TransportEndpointStrategyType transportEndpointStrategyType) {
         this.distributionTransportSecretProvider = distributionTransportSecretProvider;
 
@@ -57,11 +59,13 @@ public class RemoteDistributionPackageIm
         }
 
 
-        List<DistributionTransport> transportHandlers = new ArrayList<DistributionTransport>();
+        Map<String, DistributionTransport> transportHandlers = new HashMap<String, DistributionTransport>();
 
-        for (String endpoint : endpoints) {
+        for (Map.Entry<String, String> entry : endpointsMap.entrySet()) {
+            String endpointKey = entry.getKey();
+            String endpoint = entry.getValue();
             if (endpoint != null && endpoint.length() > 0) {
-                transportHandlers.add(new SimpleHttpDistributionTransport(new DistributionEndpoint(endpoint), null, -1));
+                transportHandlers.put(endpointKey, new SimpleHttpDistributionTransport(new DistributionEndpoint(endpoint), null, -1));
             }
         }
         transportHandler = new MultipleEndpointDistributionTransport(transportHandlers,

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterFactory.java?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterFactory.java Mon Jan  5 10:40:33 2015
@@ -33,6 +33,7 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.distribution.component.impl.DistributionComponentManager;
 import org.apache.sling.distribution.component.impl.DistributionComponentUtils;
+import org.apache.sling.distribution.component.impl.SettingsUtils;
 import org.apache.sling.distribution.packaging.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageImportException;
 import org.apache.sling.distribution.packaging.DistributionPackageImporter;
@@ -89,7 +90,7 @@ public class RemoteDistributionPackageIm
     @Activate
     protected void activate(Map<String, Object> config) {
 
-        String[] endpoints = PropertiesUtil.toStringArray(config.get(ENDPOINTS), new String[0]);
+        Map<String, String> endpoints = SettingsUtils.toUriMap(config.get(ENDPOINTS));
         String endpointStrategyName = PropertiesUtil.toString(config.get(ENDPOINTS_STRATEGY), "One");
 
         TransportEndpointStrategyType transportEndpointStrategyType = TransportEndpointStrategyType.valueOf(endpointStrategyName);

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java?rev=1649499&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java Mon Jan  5 10:40:33 2015
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.queue.impl;
+
+import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.SharedDistributionPackage;
+import org.apache.sling.distribution.queue.DistributionQueue;
+import org.apache.sling.distribution.queue.DistributionQueueException;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
+import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The default strategy for delivering packages to queues. Each package can be dispatched to multiple queues.
+ */
+public class MultipleQueueDispatchingStrategy implements DistributionQueueDispatchingStrategy {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String[] queueNames;
+
+    public MultipleQueueDispatchingStrategy(String[] queueNames) {
+
+        this.queueNames = queueNames;
+    }
+
+    public Iterable<DistributionQueueItemStatus> add(@Nonnull DistributionPackage distributionPackage, @Nonnull DistributionQueueProvider queueProvider) throws DistributionQueueException {
+        DistributionQueueItem queueItem = getItem(distributionPackage);
+        List<DistributionQueueItemStatus> result = new ArrayList<DistributionQueueItemStatus>();
+
+        for (String queueName: queueNames) {
+            DistributionQueue queue = queueProvider.getQueue(queueName);
+            if (distributionPackage instanceof SharedDistributionPackage) {
+                ((SharedDistributionPackage) distributionPackage).acquire(queueName);
+            }
+            DistributionQueueItemStatus status = new DistributionQueueItemStatus(DistributionQueueItemStatus.ItemState.ERROR, queue.getName());
+            if (queue.add(queueItem)) {
+                 status = queue.getStatus(queueItem);
+            }
+            else {
+                if (distributionPackage instanceof SharedDistributionPackage) {
+                    ((SharedDistributionPackage) distributionPackage).release(queueName);
+                }
+            }
+
+            result.add(status);
+        }
+
+        return result;
+
+    }
+
+    @Nonnull
+    public List<String> getQueueNames() {
+
+        return Arrays.asList(queueNames);
+    }
+
+
+    private DistributionQueueItem getItem(DistributionPackage distributionPackage) {
+        DistributionQueueItem distributionQueueItem = new DistributionQueueItem(distributionPackage.getId(),
+                distributionPackage.getType(),
+                distributionPackage.getInfo());
+
+        return distributionQueueItem;
+    }
+
+}

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/SingleQueueDispatchingStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/SingleQueueDispatchingStrategy.java?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/SingleQueueDispatchingStrategy.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/SingleQueueDispatchingStrategy.java Mon Jan  5 10:40:33 2015
@@ -35,32 +35,10 @@ import org.slf4j.LoggerFactory;
  * The default strategy for delivering packages to queues. Each agent just manages a single queue,
  * no failure / stuck handling where each package is put regardless of anything.
  */
-public class SingleQueueDispatchingStrategy implements DistributionQueueDispatchingStrategy {
+public class SingleQueueDispatchingStrategy extends MultipleQueueDispatchingStrategy {
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    public Iterable<DistributionQueueItemStatus> add(@Nonnull DistributionPackage distributionPackage, @Nonnull DistributionQueueProvider queueProvider) throws DistributionQueueException {
-        DistributionQueueItem queueItem = getItem(distributionPackage);
-        DistributionQueue queue = queueProvider.getQueue(DEFAULT_QUEUE_NAME);
-        if (queue.add(queueItem)) {
-            return Arrays.asList(queue.getStatus(queueItem));
-        } else {
-            return Arrays.asList(new DistributionQueueItemStatus(DistributionQueueItemStatus.ItemState.ERROR, queue.getName()));
-        }
-    }
-
-    @Nonnull
-    public List<String> getQueueNames() {
-        return Arrays.asList(DEFAULT_QUEUE_NAME);
-    }
-
-
-    private DistributionQueueItem getItem(DistributionPackage distributionPackage) {
-        DistributionQueueItem distributionQueueItem = new DistributionQueueItem(distributionPackage.getId(),
-                distributionPackage.getType(),
-                distributionPackage.getInfo());
-
-        return distributionQueueItem;
+    public SingleQueueDispatchingStrategy() {
+        super(new String[] { DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME });
     }
 
 }

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/MultipleEndpointDistributionTransport.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/MultipleEndpointDistributionTransport.java?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/MultipleEndpointDistributionTransport.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/MultipleEndpointDistributionTransport.java Mon Jan  5 10:40:33 2015
@@ -20,14 +20,20 @@ package org.apache.sling.distribution.tr
 
 import javax.annotation.Nonnull;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.component.impl.SettingsUtils;
 import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
 import org.apache.sling.distribution.transport.core.DistributionTransport;
 import org.apache.sling.distribution.transport.core.DistributionTransportException;
 import org.apache.sling.distribution.transport.DistributionTransportSecret;
+import org.apache.sling.distribution.trigger.impl.PersistingJcrEventDistributionTrigger;
 
 /**
  * {@link org.apache.sling.distribution.transport.core.DistributionTransport} supporting delivery / retrieval from multiple
@@ -35,58 +41,93 @@ import org.apache.sling.distribution.tra
  */
 public class MultipleEndpointDistributionTransport implements DistributionTransport {
 
-    private final List<DistributionTransport> transportHelpers;
+    private final Map<String, DistributionTransport> transportHelpers;
     private final TransportEndpointStrategyType endpointStrategyType;
-    private int lastSuccessfulEndpointId = 0;
 
-    public MultipleEndpointDistributionTransport(List<DistributionTransport> transportHelpers,
+
+    public MultipleEndpointDistributionTransport(Map<String, DistributionTransport> transportHelpers,
                                                  TransportEndpointStrategyType endpointStrategyType) {
-        this.transportHelpers = transportHelpers;
+        this.transportHelpers = new TreeMap<String, DistributionTransport>();
+        this.transportHelpers.putAll(transportHelpers);
         this.endpointStrategyType = endpointStrategyType;
     }
 
-    private List<DistributionPackage> doTransport(ResourceResolver resourceResolver, DistributionRequest distributionRequest,
-                                                  DistributionPackage distributionPackage, DistributionTransportSecret secret) throws DistributionTransportException {
+    public MultipleEndpointDistributionTransport(List<DistributionTransport> transportHelpers,
+                                                 TransportEndpointStrategyType endpointStrategyType) {
+        this(SettingsUtils.toMap(transportHelpers, "endpoint"), endpointStrategyType);
+    }
+
+    public void deliverPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionPackage distributionPackage,
+                                                      @Nonnull DistributionTransportSecret secret) throws DistributionTransportException {
 
-        int offset = 0;
         if (endpointStrategyType.equals(TransportEndpointStrategyType.One)) {
-            offset = lastSuccessfulEndpointId;
+            DistributionPackageInfo info = distributionPackage.getInfo();
+            String queueName = info == null ? null : info.getQueue();
+
+
+            DistributionTransport distributionTransport = getDefaultTransport();
+            if (queueName != null) {
+                distributionTransport = transportHelpers.get(queueName);
+            }
+
+            if (distributionTransport != null) {
+                distributionTransport.deliverPackage(resourceResolver, distributionPackage, secret);
+            }
+
+
+        } else if  (endpointStrategyType.equals(TransportEndpointStrategyType.All)) {
+            for (DistributionTransport distributionTransport: transportHelpers.values()) {
+                distributionTransport.deliverPackage(resourceResolver, distributionPackage, secret);
+
+            }
+
         }
+    }
 
-        int length = transportHelpers.size();
+    @Nonnull
+    public List<DistributionPackage> retrievePackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest,
+                                                      @Nonnull DistributionTransportSecret secret) throws DistributionTransportException {
         List<DistributionPackage> result = new ArrayList<DistributionPackage>();
 
-        for (int i = 0; i < length; i++) {
-            int currentId = (offset + i) % length;
 
-            DistributionTransport transportHelper = transportHelpers.get(currentId);
-            if (distributionPackage != null) {
-                transportHelper.deliverPackage(resourceResolver, distributionPackage, secret);
-            } else if (distributionRequest != null) {
-                Iterable<DistributionPackage> distributionPackages = transportHelper.retrievePackages(resourceResolver, distributionRequest, secret);
-                for (DistributionPackage retrievedPackage : distributionPackages) {
+        if (endpointStrategyType.equals(TransportEndpointStrategyType.One)) {
+
+            DistributionTransport distributionTransport = getDefaultTransport();
+
+            if (distributionTransport != null) {
+                Iterable<DistributionPackage> retrievedPackages = distributionTransport.retrievePackages(resourceResolver, distributionRequest, secret);
+
+                for (DistributionPackage retrievedPackage : retrievedPackages) {
                     result.add(retrievedPackage);
                 }
             }
 
-            lastSuccessfulEndpointId = currentId;
-            if (endpointStrategyType.equals(TransportEndpointStrategyType.One))
-                break;
+
+        } else if  (endpointStrategyType.equals(TransportEndpointStrategyType.All)) {
+            for (DistributionTransport distributionTransport: transportHelpers.values()) {
+                Iterable<DistributionPackage> retrievedPackages = distributionTransport.retrievePackages(resourceResolver, distributionRequest, secret);
+
+                for (DistributionPackage retrievedPackage : retrievedPackages) {
+                    result.add(retrievedPackage);
+                }
+
+            }
         }
 
         return result;
     }
 
-    public void deliverPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionPackage distributionPackage,
-                                                      @Nonnull DistributionTransportSecret secret) throws DistributionTransportException {
-        doTransport(resourceResolver, null, distributionPackage, secret);
-    }
+    DistributionTransport getDefaultTransport() {
+        DistributionTransport[] handlers = transportHelpers.values().toArray(new DistributionTransport[0]);
 
-    @Nonnull
-    public List<DistributionPackage> retrievePackages(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest,
-                                                      @Nonnull DistributionTransportSecret secret) throws DistributionTransportException {
-        return doTransport(resourceResolver, distributionRequest, null, secret);
+        if (handlers != null && handlers.length > 0) {
+            return handlers[0];
+        }
+
+        return null;
     }
 
 
+
+
 }

Modified: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterTest.java?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterTest.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/packaging/impl/importer/RemoteDistributionPackageImporterTest.java Mon Jan  5 10:40:33 2015
@@ -24,6 +24,9 @@ import org.apache.sling.distribution.tra
 import org.apache.sling.distribution.transport.impl.TransportEndpointStrategyType;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import static org.mockito.Mockito.mock;
 
 /**
@@ -34,7 +37,7 @@ public class RemoteDistributionPackageIm
     @Test
     public void testDummyImport() throws Exception {
         DistributionTransportSecretProvider distributionTransportSecretProvider = mock(DistributionTransportSecretProvider.class);
-        String[] endpoints = new String[0];
+        Map<String, String> endpoints = new HashMap<String, String>();
         for (TransportEndpointStrategyType strategy : TransportEndpointStrategyType.values()) {
             RemoteDistributionPackageImporter remotedistributionPackageImporter = new RemoteDistributionPackageImporter(
                     distributionTransportSecretProvider, endpoints, strategy);

Modified: sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionIntegrationTestBase.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionIntegrationTestBase.java?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionIntegrationTestBase.java (original)
+++ sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionIntegrationTestBase.java Mon Jan  5 10:40:33 2015
@@ -31,6 +31,7 @@ import static org.apache.sling.distribut
 import static org.apache.sling.distribution.it.DistributionUtils.authorAgentConfigUrl;
 import static org.apache.sling.distribution.it.DistributionUtils.exporterUrl;
 import static org.apache.sling.distribution.it.DistributionUtils.importerUrl;
+import static org.apache.sling.distribution.it.DistributionUtils.setArrayProperties;
 
 /**
  * Integration test base class for distribution
@@ -59,20 +60,27 @@ public abstract class DistributionIntegr
             String remoteImporterUrl = publish.getServerBaseUrl() + importerUrl("default");
 
 
-            authorClient.setProperties(authorAgentConfigUrl("publish") + "/packageImporter",
-                    "endpoints", remoteImporterUrl);
+            authorClient.setProperties(authorAgentConfigUrl("publish"),
+                    "packageImporter.endpoints", remoteImporterUrl);
 
 
             Thread.sleep(3000);
 
             assertExists(authorClient, agentUrl("publish"));
 
+
+            assertExists(authorClient, authorAgentConfigUrl("publish-multiple"));
+            setArrayProperties(author, authorAgentConfigUrl("publish-multiple"),
+                    "packageImporter.endpoints", remoteImporterUrl, remoteImporterUrl + "badaddress");
+            assertExists(authorClient, agentUrl("publish-multiple"));
+
+
+
             assertExists(authorClient, authorAgentConfigUrl("publish-reverse"));
 
             String remoteExporterUrl = publish.getServerBaseUrl() + exporterUrl("reverse");
 
-            authorClient.setProperties(authorAgentConfigUrl("publish-reverse") + "/packageExporter",
-                    "endpoints", remoteExporterUrl);
+            authorClient.setProperties(authorAgentConfigUrl("publish-reverse"), "packageExporter.endpoints", remoteExporterUrl);
 
             Thread.sleep(3000);
             assertExists(authorClient, agentUrl("publish-reverse"));

Modified: sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionUtils.java?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionUtils.java (original)
+++ sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionUtils.java Mon Jan  5 10:40:33 2015
@@ -80,7 +80,16 @@ public class DistributionUtils {
         ).assertStatus(status).getContent();
     }
 
+    public static void setArrayProperties(SlingInstance slingInstance, String resource, String property, String... values) throws IOException {
+        List<String> parameters = new ArrayList<String>();
+        for (String value : values) {
+            parameters.add(property);
+            parameters.add(value);
+        }
 
+        assertPostResourceWithParameters(slingInstance, 200, resource, parameters.toArray(new String[0]));
+
+    }
 
     public static void assertResponseContains(SlingInstance slingInstance,
                                               String resource, String... parameters) throws IOException {

Added: sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/MultipleForwardDistributionTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/MultipleForwardDistributionTest.java?rev=1649499&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/MultipleForwardDistributionTest.java (added)
+++ sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/MultipleForwardDistributionTest.java Mon Jan  5 10:40:33 2015
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.it;
+
+import org.apache.sling.distribution.DistributionRequestType;
+import org.junit.Test;
+
+import static org.apache.sling.distribution.it.DistributionUtils.assertExists;
+import static org.apache.sling.distribution.it.DistributionUtils.assertNotExists;
+import static org.apache.sling.distribution.it.DistributionUtils.createRandomNode;
+import static org.apache.sling.distribution.it.DistributionUtils.distribute;
+import static org.apache.sling.distribution.it.DistributionUtils.distributeDeep;
+
+/**
+ * Integration test for forward distribution
+ */
+public class MultipleForwardDistributionTest extends DistributionIntegrationTestBase {
+
+    @Test
+    public void testAddContent() throws Exception {
+        String nodePath = createRandomNode(authorClient, "/content/forward_add_" + System.nanoTime());
+        assertExists(authorClient, nodePath);
+        distribute(author, "publish-multiple", DistributionRequestType.ADD, nodePath);
+        assertExists(publishClient, nodePath);
+    }
+
+    @Test
+    public void testDeleteContent() throws Exception {
+        String nodePath = createRandomNode(publishClient, "/content/forward_del_" + System.nanoTime());
+        assertExists(publishClient, nodePath);
+        distribute(author, "publish-multiple", DistributionRequestType.DELETE, nodePath);
+        assertNotExists(publishClient, nodePath);
+    }
+
+}

Added: sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/agents/publish-multiple.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/agents/publish-multiple.json?rev=1649499&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/agents/publish-multiple.json (added)
+++ sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/agents/publish-multiple.json Mon Jan  5 10:40:33 2015
@@ -0,0 +1,11 @@
+{
+    "type" : "forward",
+
+    "serviceName" : "distributionService",
+
+
+    "packageImporter.endpoints" : [
+        "endpoint1=http://localhost:4503/libs/sling/distribution/services/importers/default",
+        "endpoint2=http://localhost:4503/libs/sling/distribution/services/importers/default"
+    ]
+}
\ No newline at end of file

Modified: sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/agents/publish-reverse.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/agents/publish-reverse.json?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/agents/publish-reverse.json (original)
+++ sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/agents/publish-reverse.json Mon Jan  5 10:40:33 2015
@@ -1,18 +1,11 @@
 {
-    "type" : "simple",
+    "type" : "reverse",
 
     "serviceName" : "distributionService",
 
-    "packageExporter":  {
-        "type": "remote",
-        "endpoints" : [
+    "packageExporter.endpoints": [
             "http://localhost:4503/libs/sling/distribution/services/exporters/reverse"
-        ]
-    },
-
-    "packageImporter": {
-        "type" : "local"
-    },
+    ],
 
     "triggers" : {
         "scheduled" : {

Modified: sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/agents/publish.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/agents/publish.json?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/agents/publish.json (original)
+++ sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/agents/publish.json Mon Jan  5 10:40:33 2015
@@ -1,17 +1,10 @@
 {
-    "type" : "simple",
+    "type" : "forward",
 
     "serviceName" : "distributionService",
 
-    "packageExporter":  {
-        "type": "local"
-    },
 
-    "packageImporter": {
-        "type" : "remote",
-
-        "endpoints" : [
-            "http://localhost:4503/libs/sling/distribution/services/importers/default"
-        ]
-    }
+    "packageImporter.endpoints" : [
+        "http://localhost:4503/libs/sling/distribution/services/importers/default"
+    ]
 }
\ No newline at end of file

Added: sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/defaults/agents/forward.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/defaults/agents/forward.json?rev=1649499&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/defaults/agents/forward.json (added)
+++ sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/defaults/agents/forward.json Mon Jan  5 10:40:33 2015
@@ -0,0 +1,31 @@
+{
+    "jcr:primaryType" : "nt:unstructured",
+
+    "kind" : "agent",
+
+    "isResource" : "true",
+
+    "requestAuthorizationStrategy" : {
+        "kind" : "requestAuthorization",
+        "type" : "service",
+        "name" : "privilegeRead"
+    },
+
+    "packageBuilder" : {
+        "kind" : "packageBuilder",
+
+        "type" : "service",
+        "name" : "vlt"
+    },
+
+    "transportSecretProvider" : {
+        "kind" : "transportSecretProvider",
+        "type" : "service",
+        "name" : "publishAdmin"
+    },
+
+    "triggers" : {
+      "kind" : "trigger",
+      "type" : "list"
+    }
+}
\ No newline at end of file

Added: sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/defaults/agents/reverse.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/defaults/agents/reverse.json?rev=1649499&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/defaults/agents/reverse.json (added)
+++ sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.author/defaults/agents/reverse.json Mon Jan  5 10:40:33 2015
@@ -0,0 +1,31 @@
+{
+    "jcr:primaryType" : "nt:unstructured",
+
+    "kind" : "agent",
+
+    "isResource" : "true",
+
+    "requestAuthorizationStrategy" : {
+        "kind" : "requestAuthorization",
+        "type" : "service",
+        "name" : "privilegeRead"
+    },
+
+    "packageBuilder" : {
+        "kind" : "packageBuilder",
+
+        "type" : "service",
+        "name" : "vlt"
+    },
+
+    "transportSecretProvider" : {
+        "kind" : "transportSecretProvider",
+        "type" : "service",
+        "name" : "publishAdmin"
+    },
+
+    "triggers" : {
+      "kind" : "trigger",
+      "type" : "list"
+    }
+}
\ No newline at end of file

Modified: sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.publish/agents/reverse.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.publish/agents/reverse.json?rev=1649499&r1=1649498&r2=1649499&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.publish/agents/reverse.json (original)
+++ sling/trunk/contrib/extensions/distribution/sample/src/main/resources/SLING-CONTENT/libs/sling/distribution/settings.publish/agents/reverse.json Mon Jan  5 10:40:33 2015
@@ -7,9 +7,5 @@
 
     "packageExporter":  {
         "type": "local"
-    },
-
-    "packageImporter": {
-        "type" : "local"
     }
 }
\ No newline at end of file