You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by to...@apache.org on 2014/10/02 11:47:25 UTC

svn commit: r1628917 - in /sling/trunk/contrib/extensions/replication: core/src/main/java/org/apache/sling/replication/agent/impl/ core/src/main/java/org/apache/sling/replication/packaging/impl/exporter/ core/src/main/java/org/apache/sling/replication/...

Author: tommaso
Date: Thu Oct  2 09:47:24 2014
New Revision: 1628917

URL: http://svn.apache.org/r1628917
Log:
SLING-3992 - added coordinating agent(factory), fixed some defaults (scheduling,remote polling)

Added:
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgent.java   (with props)
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java   (with props)
    sling/trunk/contrib/extensions/replication/sample/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.CoordinatingReplicationAgentFactory-pubsync.json
Modified:
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationComponentProvider.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/impl/exporter/RemoteReplicationPackageExporterFactory.java
    sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/trigger/impl/ScheduledReplicationTrigger.java

Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgent.java?rev=1628917&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgent.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgent.java Thu Oct  2 09:47:24 2014
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.agent.impl;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.sling.replication.agent.AgentReplicationException;
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.communication.ReplicationRequest;
+import org.apache.sling.replication.communication.ReplicationResponse;
+import org.apache.sling.replication.event.ReplicationEventFactory;
+import org.apache.sling.replication.event.ReplicationEventType;
+import org.apache.sling.replication.packaging.ReplicationPackage;
+import org.apache.sling.replication.packaging.ReplicationPackageExporter;
+import org.apache.sling.replication.packaging.ReplicationPackageImporter;
+import org.apache.sling.replication.packaging.impl.exporter.RemoteReplicationPackageExporter;
+import org.apache.sling.replication.packaging.impl.exporter.RemoteReplicationPackageExporterFactory;
+import org.apache.sling.replication.packaging.impl.importer.RemoteReplicationPackageImporter;
+import org.apache.sling.replication.packaging.impl.importer.RemoteReplicationPackageImporterFactory;
+import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
+import org.apache.sling.replication.queue.ReplicationQueueException;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
+import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.queue.ReplicationQueueProcessor;
+import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.serialization.ReplicationPackageBuildingException;
+import org.apache.sling.replication.serialization.ReplicationPackageReadingException;
+import org.apache.sling.replication.trigger.ReplicationTrigger;
+import org.apache.sling.replication.trigger.ReplicationTriggerRequestHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link org.apache.sling.replication.agent.ReplicationAgent} responsible for coordinating replication between two or more
+ * remote Sling instances.
+ * <p/>
+ * A coordinate agent is configured by specifying the remote importer endpoints to monitor/pull and the remote exporter
+ * endpoints to push content to (either to one of the instances or to all of them).
+ */
+public class CoordinatingReplicationAgent implements ReplicationAgent {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final ReplicationQueueProvider queueProvider;
+
+    private final ReplicationPackageImporter replicationPackageImporter;
+    private final ReplicationPackageExporter replicationPackageExporter;
+
+    private final ReplicationQueueDistributionStrategy queueDistributionStrategy;
+
+    private final ReplicationEventFactory replicationEventFactory;
+
+    private final List<ReplicationTrigger> triggers;
+
+    private final String name;
+
+    private final boolean useAggregatePaths;
+
+    public CoordinatingReplicationAgent(String name,
+                                        boolean useAggregatePaths,
+                                        RemoteReplicationPackageImporter remoteReplicationPackageImporter,
+                                        RemoteReplicationPackageExporter remoteReplicationPackageExporter,
+                                        ReplicationQueueProvider queueProvider,
+                                        ReplicationQueueDistributionStrategy queueDistributionHandler,
+                                        ReplicationEventFactory replicationEventFactory,
+                                        List<ReplicationTrigger> triggers) {
+        this.name = name;
+        this.replicationPackageImporter = remoteReplicationPackageImporter;
+        this.replicationPackageExporter = remoteReplicationPackageExporter;
+        this.queueProvider = queueProvider;
+        this.queueDistributionStrategy = queueDistributionHandler;
+        this.useAggregatePaths = useAggregatePaths;
+        this.replicationEventFactory = replicationEventFactory;
+        this.triggers = triggers == null ? new ArrayList<ReplicationTrigger>() : triggers;
+    }
+
+    public ReplicationResponse execute(ReplicationRequest replicationRequest)
+            throws AgentReplicationException {
+        try {
+            return schedule(buildPackages(replicationRequest));
+        } catch (Exception e) {
+            log.error("Error executing replication request {}", replicationRequest, e);
+            throw new AgentReplicationException(e);
+        }
+    }
+
+    private List<ReplicationPackage> buildPackages(ReplicationRequest replicationRequest) throws ReplicationPackageBuildingException {
+
+        List<ReplicationPackage> replicationPackages = new ArrayList<ReplicationPackage>();
+
+        if (useAggregatePaths) {
+            List<ReplicationPackage> exportedPackages = replicationPackageExporter.exportPackage(replicationRequest);
+            replicationPackages.addAll(exportedPackages);
+        } else {
+            for (String path : replicationRequest.getPaths()) {
+                ReplicationRequest splitReplicationRequest = new ReplicationRequest(replicationRequest.getTime(),
+                        replicationRequest.getAction(),
+                        path);
+                List<ReplicationPackage> exportedPackages = replicationPackageExporter.exportPackage(splitReplicationRequest);
+                replicationPackages.addAll(exportedPackages);
+            }
+        }
+
+        return replicationPackages;
+    }
+
+    private ReplicationResponse schedule(List<ReplicationPackage> replicationPackages) {
+        // TODO : create a composite replication response otherwise only the last response will be returned
+        ReplicationResponse replicationResponse = new ReplicationResponse();
+
+        for (ReplicationPackage replicationPackage : replicationPackages) {
+            ReplicationResponse currentReplicationResponse = schedule(replicationPackage);
+
+            replicationResponse.setSuccessful(currentReplicationResponse.isSuccessful());
+            replicationResponse.setStatus(currentReplicationResponse.getStatus());
+        }
+
+        return replicationResponse;
+    }
+
+    private ReplicationResponse schedule(ReplicationPackage replicationPackage) {
+        ReplicationResponse replicationResponse = new ReplicationResponse();
+        log.info("scheduling replication of package {}", replicationPackage);
+
+        ReplicationQueueItem replicationQueueItem = new ReplicationQueueItem(replicationPackage.getId(),
+                replicationPackage.getPaths(),
+                replicationPackage.getAction(),
+                replicationPackage.getType());
+
+        // send the replication package to the queue distribution handler
+        try {
+            ReplicationQueueItemState state = queueDistributionStrategy.add(getName(), replicationQueueItem,
+                    queueProvider);
+
+            Dictionary<Object, Object> properties = new Properties();
+            properties.put("replication.package.paths", replicationQueueItem.getPaths());
+            properties.put("replication.agent.name", name);
+            replicationEventFactory.generateEvent(ReplicationEventType.PACKAGE_QUEUED, properties);
+
+            if (state != null) {
+                replicationResponse.setStatus(state.getItemState().toString());
+                replicationResponse.setSuccessful(state.isSuccessful());
+            } else {
+                replicationResponse.setStatus(ReplicationQueueItemState.ItemState.ERROR.toString());
+                replicationResponse.setSuccessful(false);
+            }
+        } catch (Exception e) {
+            log.error("an error happened during queue processing", e);
+
+            replicationResponse.setSuccessful(false);
+        }
+
+        return replicationResponse;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public ReplicationQueue getQueue(String name) throws ReplicationQueueException {
+        ReplicationQueue queue;
+        if (name != null && name.length() > 0) {
+            queue = queueProvider.getQueue(getName(), name);
+        } else {
+            queue = queueProvider.getDefaultQueue(getName());
+        }
+        return queue;
+    }
+
+
+    public void enable() {
+        log.info("enabling agent");
+        // register triggers if any
+
+        for (int i = 0; i < triggers.size(); i++) {
+            ReplicationTrigger trigger = triggers.get(i);
+            String handlerId = getName() + "-" + i;
+            trigger.register(handlerId, new AgentBasedTriggerRequestHandler(this));
+        }
+
+        queueProvider.enableQueueProcessing(getName(), new PackageQueueProcessor());
+    }
+
+    public void disable() {
+        log.info("disabling agent");
+        for (int i = 0; i < triggers.size(); i++) {
+            ReplicationTrigger trigger = triggers.get(i);
+            String handlerId = getName() + "-" + i;
+            trigger.unregister(handlerId);
+        }
+
+        queueProvider.disableQueueProcessing(getName());
+    }
+
+    private boolean processTransportQueue(ReplicationQueueItem queueItem) {
+        boolean success = false;
+        log.debug("reading package with id {}", queueItem.getId());
+        try {
+            ReplicationPackage replicationPackage = replicationPackageExporter.exportPackageById(queueItem.getId());
+            if (replicationPackage != null) {
+                replicationPackageImporter.importPackage(replicationPackage);
+
+                Dictionary<Object, Object> properties = new Properties();
+                properties.put("replication.package.paths", replicationPackage.getPaths());
+                properties.put("replication.agent.name", name);
+                replicationEventFactory.generateEvent(ReplicationEventType.PACKAGE_REPLICATED, properties);
+
+                replicationPackage.delete();
+                success = true;
+            }
+        } catch (ReplicationPackageReadingException e) {
+            log.error("could not process transport queue", e);
+        }
+        return success;
+    }
+
+    class PackageQueueProcessor implements ReplicationQueueProcessor {
+        public boolean process(String queueName, ReplicationQueueItem packageInfo) {
+            log.info("running package queue processor");
+            return processTransportQueue(packageInfo);
+        }
+    }
+
+    public class AgentBasedTriggerRequestHandler implements ReplicationTriggerRequestHandler {
+        private final ReplicationAgent agent;
+
+        public AgentBasedTriggerRequestHandler(ReplicationAgent agent) {
+            this.agent = agent;
+        }
+
+        public void handle(ReplicationRequest request) {
+            try {
+                agent.execute(request);
+            } catch (AgentReplicationException e) {
+                log.error("Error executing handler", e);
+            }
+        }
+    }
+}

Propchange: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java?rev=1628917&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java Thu Oct  2 09:47:24 2014
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.agent.impl;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.agent.ReplicationComponentProvider;
+import org.apache.sling.settings.SlingSettingsService;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An OSGi service factory for {@link org.apache.sling.replication.agent.impl.CoordinatingReplicationAgent}s.
+ */
+@Component(metatype = true,
+        label = "Coordinating Replication Agents Factory",
+        description = "OSGi configuration based ReplicationAgent service factory",
+        name = CoordinatingReplicationAgentFactory.SERVICE_PID,
+        configurationFactory = true,
+        specVersion = "1.1",
+        policy = ConfigurationPolicy.REQUIRE
+)
+public class CoordinatingReplicationAgentFactory implements ReplicationComponentListener {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    static final String SERVICE_PID = "org.apache.sling.replication.agent.impl.CoordinatingReplicationAgentFactory";
+
+    @Property(boolValue = true, label = "Enabled")
+    private static final String ENABLED = "enabled";
+
+    @Property(label = "Name")
+    public static final String NAME = "name";
+
+    @Property(boolValue = true, label = "Replicate using aggregated paths")
+    public static final String USE_AGGREGATE_PATHS = "useAggregatePaths";
+
+    @Property(label = "Package Exporter", cardinality = 100)
+    public static final String PACKAGE_EXPORTER = "packageExporter";
+
+    @Property(label = "Package Importer", cardinality = 100)
+    public static final String PACKAGE_IMPORTER = "packageImporter";
+
+    @Property(label = "Queue Provider", cardinality = 100)
+    public static final String QUEUE_PROVIDER = "queueProvider";
+
+    @Property(label = "Queue Distribution Strategy", cardinality = 100)
+    public static final String QUEUE_DISTRIBUTION_STRATEGY = "queueDistributionStrategy";
+
+    @Reference
+    private SlingSettingsService settingsService;
+
+    @Reference
+    private ReplicationComponentProvider componentProvider;
+
+    private ServiceRegistration agentReg;
+    private ServiceRegistration listenerReg;
+
+    private BundleContext savedContext;
+    private Map<String, Object> savedConfig;
+
+    @Activate
+    public void activate(BundleContext context, Map<String, Object> config) throws Exception {
+        log.debug("activating agent with config {}", config);
+
+        savedContext = context;
+        savedConfig = config;
+
+        // inject configuration
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+
+        boolean enabled = PropertiesUtil.toBoolean(config.get(ENABLED), true);
+        String name = PropertiesUtil.toString(config.get(NAME), null);
+
+        if (enabled) {
+            props.put(ENABLED, true);
+            props.put(NAME, name);
+
+            if (listenerReg == null) {
+                listenerReg = context.registerService(ReplicationComponentListener.class.getName(), this, props);
+            }
+
+            if (agentReg == null) {
+                Map<String, Object> properties = new HashMap<String, Object>();
+                properties.putAll(config);
+
+                properties.put("type", "coordinating");
+                CoordinatingReplicationAgent agent = (CoordinatingReplicationAgent) componentProvider.createComponent(ReplicationAgent.class, properties);
+
+                log.debug("activated agent {}", agent != null ? agent.getName() : null);
+
+                if (agent != null) {
+                    props.put(NAME, agent.getName());
+
+                    // register agent service
+                    agentReg = context.registerService(ReplicationAgent.class.getName(), agent, props);
+                    agent.enable();
+                }
+            }
+        }
+    }
+
+    @Deactivate
+    private void deactivate(BundleContext context) {
+        log.debug("deactivating agent");
+        if (agentReg != null) {
+            ServiceReference reference = agentReg.getReference();
+            CoordinatingReplicationAgent replicationAgent = (CoordinatingReplicationAgent) context.getService(reference);
+            replicationAgent.disable();
+            agentReg.unregister();
+            agentReg = null;
+        }
+        if (listenerReg != null) {
+            listenerReg.unregister();
+            listenerReg = null;
+        }
+    }
+
+    private void refresh(boolean isBinding) {
+        try {
+            if (savedContext != null && savedConfig != null) {
+                if (isBinding && agentReg == null) {
+                    activate(savedContext, savedConfig);
+                } else if (!isBinding && agentReg != null) {
+                    deactivate(savedContext);
+                }
+            }
+
+        } catch (Exception e) {
+            log.error("Cannot refresh agent", e);
+        }
+    }
+
+    public <ComponentType> void componentBind(ComponentType component, String componentName) {
+        refresh(true);
+    }
+
+    public <ComponentType> void componentUnbind(ComponentType component, String componentName) {
+        refresh(false);
+    }
+}

Propchange: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationComponentProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationComponentProvider.java?rev=1628917&r1=1628916&r2=1628917&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationComponentProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationComponentProvider.java Thu Oct  2 09:47:24 2014
@@ -2,6 +2,7 @@ package org.apache.sling.replication.age
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -25,8 +26,10 @@ import org.apache.sling.replication.even
 import org.apache.sling.replication.packaging.ReplicationPackageExporter;
 import org.apache.sling.replication.packaging.ReplicationPackageImporter;
 import org.apache.sling.replication.packaging.impl.exporter.LocalReplicationPackageExporterFactory;
+import org.apache.sling.replication.packaging.impl.exporter.RemoteReplicationPackageExporter;
 import org.apache.sling.replication.packaging.impl.exporter.RemoteReplicationPackageExporterFactory;
 import org.apache.sling.replication.packaging.impl.importer.LocalReplicationPackageImporterFactory;
+import org.apache.sling.replication.packaging.impl.importer.RemoteReplicationPackageImporter;
 import org.apache.sling.replication.packaging.impl.importer.RemoteReplicationPackageImporterFactory;
 import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
 import org.apache.sling.replication.queue.ReplicationQueueProvider;
@@ -162,6 +165,51 @@ public class DefaultReplicationComponent
             return new SimpleReplicationAgent(name, useAggregatePaths, isPassive,
                     packageImporter, packageExporter, queueProvider, queueDistributionStrategy, replicationEventFactory, triggers);
 
+        } else if ("coordinating".equals(factory)) {
+            if (log.isDebugEnabled()) {
+                log.debug("creating coordinating agent");
+                for (Map.Entry<String, Object> e : properties.entrySet()) {
+                    Object value = e.getValue();
+                    log.info(e.getKey() + " -> " + (value != null && value.getClass().isArray() ? Arrays.toString((Object[]) value) : value));
+                }
+            }
+
+            // build exporter
+            Map<String, Object> exporterProperties = extractMap("packageExporter", properties);
+            exporterProperties.put(COMPONENT_TYPE, "remote");
+            RemoteReplicationPackageExporter packageExporter = (RemoteReplicationPackageExporter) createExporter(exporterProperties, componentProvider);
+
+            // build importer
+            Map<String, Object> importerProperties = extractMap("packageImporter", properties);
+            importerProperties.put(COMPONENT_TYPE, "remote");
+            RemoteReplicationPackageImporter packageImporter = (RemoteReplicationPackageImporter) createImporter(importerProperties, componentProvider);
+
+            // build triggers
+            List<ReplicationTrigger> triggers = new ArrayList<ReplicationTrigger>(1);
+            triggers.add(new ScheduledReplicationTrigger(Collections.<String, Object>emptyMap(), scheduler));
+
+            // TODO : eventually enable remote event triggers automatically
+//            String[] exporterEndpoints = (String[]) exporterProperties.get("endpoints");
+//            for (String exporterEndpoint : exporterEndpoints) {
+//            }
+
+            Map<String, Object> queueDistributionStrategyProperties = extractMap("queueDistributionStrategy", properties);
+            ReplicationQueueDistributionStrategy queueDistributionStrategy = createDistributionStrategy(queueDistributionStrategyProperties, componentProvider);
+
+            Map<String, Object> queueProviderProperties = extractMap("queueProvider", properties);
+            ReplicationQueueProvider queueProvider = createQueueProvider(queueProviderProperties, componentProvider);
+
+            String name = PropertiesUtil.toString(properties.get(CoordinatingReplicationAgentFactory.NAME), String.valueOf(new Random().nextInt(1000)));
+
+            boolean useAggregatePaths = PropertiesUtil.toBoolean(properties.get(CompactSimpleReplicationAgentFactory.USE_AGGREGATE_PATHS), true);
+
+            // check configuration is valid
+            if (name == null || packageExporter == null || packageImporter == null || queueProvider == null || queueDistributionStrategy == null) {
+                log.error("could not create the coordinate agent with following bindings {}", Arrays.toString(new Object[]{name, packageExporter, packageImporter, queueProvider, queueDistributionStrategy}));
+            } else {
+                return new CoordinatingReplicationAgent(name, useAggregatePaths,
+                        packageImporter, packageExporter, queueProvider, queueDistributionStrategy, replicationEventFactory, triggers);
+            }
         }
 
         return null;

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1628917&r1=1628916&r2=1628917&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Thu Oct  2 09:47:24 2014
@@ -93,8 +93,8 @@ public class SimpleReplicationAgent impl
             throws AgentReplicationException {
         try {
             return schedule(buildPackages(replicationRequest));
-        } catch (ReplicationPackageBuildingException e) {
-            log.error("Error building packages", e);
+        } catch (Exception e) {
+            log.error("Error executing replication request {}", replicationRequest, e);
             throw new AgentReplicationException(e);
         }
 

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/impl/exporter/RemoteReplicationPackageExporterFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/impl/exporter/RemoteReplicationPackageExporterFactory.java?rev=1628917&r1=1628916&r2=1628917&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/impl/exporter/RemoteReplicationPackageExporterFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/impl/exporter/RemoteReplicationPackageExporterFactory.java Thu Oct  2 09:47:24 2014
@@ -98,7 +98,7 @@ public class RemoteReplicationPackageExp
 
         String[] endpoints = PropertiesUtil.toStringArray(config.get(ReplicationTransportConstants.ENDPOINTS), new String[0]);
 
-        int pollItems = PropertiesUtil.toInteger(config.get(POLL_ITEMS), 1);
+        int pollItems = PropertiesUtil.toInteger(config.get(POLL_ITEMS), Integer.MAX_VALUE);
 
         String endpointStrategyName = PropertiesUtil.toString(config.get(ReplicationTransportConstants.ENDPOINT_STRATEGY), "One");
         TransportEndpointStrategyType transportEndpointStrategyType = TransportEndpointStrategyType.valueOf(endpointStrategyName);

Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/trigger/impl/ScheduledReplicationTrigger.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/trigger/impl/ScheduledReplicationTrigger.java?rev=1628917&r1=1628916&r2=1628917&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/trigger/impl/ScheduledReplicationTrigger.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/trigger/impl/ScheduledReplicationTrigger.java Thu Oct  2 09:47:24 2014
@@ -51,14 +51,13 @@ public class ScheduledReplicationTrigger
     private final Scheduler scheduler;
 
     public ScheduledReplicationTrigger(Map<String, Object> config, Scheduler scheduler) {
-        this(ReplicationActionType.fromName(PropertiesUtil.toString(config.get(ACTION), null)),
-                PropertiesUtil.toString(config.get(PATH), null),
+        this(ReplicationActionType.fromName(PropertiesUtil.toString(config.get(ACTION), ReplicationActionType.POLL.name())),
+                PropertiesUtil.toString(config.get(PATH), "/"),
                 PropertiesUtil.toInteger(config.get(SECONDS), 30),
                 scheduler);
     }
 
     public ScheduledReplicationTrigger(ReplicationActionType replicationAction, String path, int secondsInterval, Scheduler scheduler) {
-
         this.replicationAction = replicationAction;
         this.path = path;
         this.secondsInterval = secondsInterval;

Added: sling/trunk/contrib/extensions/replication/sample/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.CoordinatingReplicationAgentFactory-pubsync.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/sample/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.CoordinatingReplicationAgentFactory-pubsync.json?rev=1628917&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/sample/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.CoordinatingReplicationAgentFactory-pubsync.json (added)
+++ sling/trunk/contrib/extensions/replication/sample/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.CoordinatingReplicationAgentFactory-pubsync.json Thu Oct  2 09:47:24 2014
@@ -0,0 +1,42 @@
+{
+    "jcr:primaryType": "sling:OsgiConfig",
+    "name": "pubsync",
+    "type": "coordinating",
+    "enabled" : false,
+
+    "packageExporter": [
+        "authenticationProvider/type=service",
+        "authenticationProvider/name=publishAdmin",
+
+        "packageBuilder/type=vlt",
+        "packageBuilder/servicename=replicationService",
+
+        "endpoints[0]=http://localhost:4503/libs/sling/replication/services/exporters/reverse",
+        "endpoints[1]=http://localhost:4504/libs/sling/replication/services/exporters/reverse",
+        "endpoints[2]=http://localhost:4505/libs/sling/replication/services/exporters/reverse",
+        "endpoints.strategy=All"
+    ],
+
+    "packageImporter": [
+        "authenticationProvider/type=service",
+        "authenticationProvider/name=publishAdmin",
+
+        "packageBuilder/type=vlt",
+        "packageBuilder/servicename=replicationService",
+
+        "endpoints[0]=http://localhost:4503/libs/sling/replication/services/importers/default",
+        "endpoints[1]=http://localhost:4504/libs/sling/replication/services/importers/default",
+        "endpoints[2]=http://localhost:4505/libs/sling/replication/services/importers/default",
+        "endpoints.strategy=All"
+    ],
+
+    "queueProvider": [
+        "type=service",
+        "name=sjh"
+    ],
+
+    "queueDistributionStrategy": [
+        "type=service",
+        "name=error"
+    ]
+}
\ No newline at end of file