You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by to...@apache.org on 2014/10/02 11:47:25 UTC
svn commit: r1628917 - in /sling/trunk/contrib/extensions/replication:
core/src/main/java/org/apache/sling/replication/agent/impl/
core/src/main/java/org/apache/sling/replication/packaging/impl/exporter/
core/src/main/java/org/apache/sling/replication/...
Author: tommaso
Date: Thu Oct 2 09:47:24 2014
New Revision: 1628917
URL: http://svn.apache.org/r1628917
Log:
SLING-3992 - added coordinating agent(factory), fixed some defaults (scheduling,remote polling)
Added:
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgent.java (with props)
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java (with props)
sling/trunk/contrib/extensions/replication/sample/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.CoordinatingReplicationAgentFactory-pubsync.json
Modified:
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationComponentProvider.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/impl/exporter/RemoteReplicationPackageExporterFactory.java
sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/trigger/impl/ScheduledReplicationTrigger.java
Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgent.java?rev=1628917&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgent.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgent.java Thu Oct 2 09:47:24 2014
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.agent.impl;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.sling.replication.agent.AgentReplicationException;
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.communication.ReplicationRequest;
+import org.apache.sling.replication.communication.ReplicationResponse;
+import org.apache.sling.replication.event.ReplicationEventFactory;
+import org.apache.sling.replication.event.ReplicationEventType;
+import org.apache.sling.replication.packaging.ReplicationPackage;
+import org.apache.sling.replication.packaging.ReplicationPackageExporter;
+import org.apache.sling.replication.packaging.ReplicationPackageImporter;
+import org.apache.sling.replication.packaging.impl.exporter.RemoteReplicationPackageExporter;
+import org.apache.sling.replication.packaging.impl.exporter.RemoteReplicationPackageExporterFactory;
+import org.apache.sling.replication.packaging.impl.importer.RemoteReplicationPackageImporter;
+import org.apache.sling.replication.packaging.impl.importer.RemoteReplicationPackageImporterFactory;
+import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
+import org.apache.sling.replication.queue.ReplicationQueueException;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
+import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.queue.ReplicationQueueProcessor;
+import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.serialization.ReplicationPackageBuildingException;
+import org.apache.sling.replication.serialization.ReplicationPackageReadingException;
+import org.apache.sling.replication.trigger.ReplicationTrigger;
+import org.apache.sling.replication.trigger.ReplicationTriggerRequestHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link org.apache.sling.replication.agent.ReplicationAgent} responsible for coordinating replication between two or more
+ * remote Sling instances.
+ * <p/>
+ * A coordinate agent is configured by specifying the remote importer endpoints to monitor/pull and the remote exporter
+ * endpoints to push content to (either to one of the instances or to all of them).
+ */
+public class CoordinatingReplicationAgent implements ReplicationAgent {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final ReplicationQueueProvider queueProvider;
+
+ private final ReplicationPackageImporter replicationPackageImporter;
+ private final ReplicationPackageExporter replicationPackageExporter;
+
+ private final ReplicationQueueDistributionStrategy queueDistributionStrategy;
+
+ private final ReplicationEventFactory replicationEventFactory;
+
+ private final List<ReplicationTrigger> triggers;
+
+ private final String name;
+
+ private final boolean useAggregatePaths;
+
+ public CoordinatingReplicationAgent(String name,
+ boolean useAggregatePaths,
+ RemoteReplicationPackageImporter remoteReplicationPackageImporter,
+ RemoteReplicationPackageExporter remoteReplicationPackageExporter,
+ ReplicationQueueProvider queueProvider,
+ ReplicationQueueDistributionStrategy queueDistributionHandler,
+ ReplicationEventFactory replicationEventFactory,
+ List<ReplicationTrigger> triggers) {
+ this.name = name;
+ this.replicationPackageImporter = remoteReplicationPackageImporter;
+ this.replicationPackageExporter = remoteReplicationPackageExporter;
+ this.queueProvider = queueProvider;
+ this.queueDistributionStrategy = queueDistributionHandler;
+ this.useAggregatePaths = useAggregatePaths;
+ this.replicationEventFactory = replicationEventFactory;
+ this.triggers = triggers == null ? new ArrayList<ReplicationTrigger>() : triggers;
+ }
+
+ public ReplicationResponse execute(ReplicationRequest replicationRequest)
+ throws AgentReplicationException {
+ try {
+ return schedule(buildPackages(replicationRequest));
+ } catch (Exception e) {
+ log.error("Error executing replication request {}", replicationRequest, e);
+ throw new AgentReplicationException(e);
+ }
+ }
+
+ private List<ReplicationPackage> buildPackages(ReplicationRequest replicationRequest) throws ReplicationPackageBuildingException {
+
+ List<ReplicationPackage> replicationPackages = new ArrayList<ReplicationPackage>();
+
+ if (useAggregatePaths) {
+ List<ReplicationPackage> exportedPackages = replicationPackageExporter.exportPackage(replicationRequest);
+ replicationPackages.addAll(exportedPackages);
+ } else {
+ for (String path : replicationRequest.getPaths()) {
+ ReplicationRequest splitReplicationRequest = new ReplicationRequest(replicationRequest.getTime(),
+ replicationRequest.getAction(),
+ path);
+ List<ReplicationPackage> exportedPackages = replicationPackageExporter.exportPackage(splitReplicationRequest);
+ replicationPackages.addAll(exportedPackages);
+ }
+ }
+
+ return replicationPackages;
+ }
+
+ private ReplicationResponse schedule(List<ReplicationPackage> replicationPackages) {
+ // TODO : create a composite replication response otherwise only the last response will be returned
+ ReplicationResponse replicationResponse = new ReplicationResponse();
+
+ for (ReplicationPackage replicationPackage : replicationPackages) {
+ ReplicationResponse currentReplicationResponse = schedule(replicationPackage);
+
+ replicationResponse.setSuccessful(currentReplicationResponse.isSuccessful());
+ replicationResponse.setStatus(currentReplicationResponse.getStatus());
+ }
+
+ return replicationResponse;
+ }
+
+ private ReplicationResponse schedule(ReplicationPackage replicationPackage) {
+ ReplicationResponse replicationResponse = new ReplicationResponse();
+ log.info("scheduling replication of package {}", replicationPackage);
+
+ ReplicationQueueItem replicationQueueItem = new ReplicationQueueItem(replicationPackage.getId(),
+ replicationPackage.getPaths(),
+ replicationPackage.getAction(),
+ replicationPackage.getType());
+
+ // send the replication package to the queue distribution handler
+ try {
+ ReplicationQueueItemState state = queueDistributionStrategy.add(getName(), replicationQueueItem,
+ queueProvider);
+
+ Dictionary<Object, Object> properties = new Properties();
+ properties.put("replication.package.paths", replicationQueueItem.getPaths());
+ properties.put("replication.agent.name", name);
+ replicationEventFactory.generateEvent(ReplicationEventType.PACKAGE_QUEUED, properties);
+
+ if (state != null) {
+ replicationResponse.setStatus(state.getItemState().toString());
+ replicationResponse.setSuccessful(state.isSuccessful());
+ } else {
+ replicationResponse.setStatus(ReplicationQueueItemState.ItemState.ERROR.toString());
+ replicationResponse.setSuccessful(false);
+ }
+ } catch (Exception e) {
+ log.error("an error happened during queue processing", e);
+
+ replicationResponse.setSuccessful(false);
+ }
+
+ return replicationResponse;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public ReplicationQueue getQueue(String name) throws ReplicationQueueException {
+ ReplicationQueue queue;
+ if (name != null && name.length() > 0) {
+ queue = queueProvider.getQueue(getName(), name);
+ } else {
+ queue = queueProvider.getDefaultQueue(getName());
+ }
+ return queue;
+ }
+
+
+ public void enable() {
+ log.info("enabling agent");
+ // register triggers if any
+
+ for (int i = 0; i < triggers.size(); i++) {
+ ReplicationTrigger trigger = triggers.get(i);
+ String handlerId = getName() + "-" + i;
+ trigger.register(handlerId, new AgentBasedTriggerRequestHandler(this));
+ }
+
+ queueProvider.enableQueueProcessing(getName(), new PackageQueueProcessor());
+ }
+
+ public void disable() {
+ log.info("disabling agent");
+ for (int i = 0; i < triggers.size(); i++) {
+ ReplicationTrigger trigger = triggers.get(i);
+ String handlerId = getName() + "-" + i;
+ trigger.unregister(handlerId);
+ }
+
+ queueProvider.disableQueueProcessing(getName());
+ }
+
+ private boolean processTransportQueue(ReplicationQueueItem queueItem) {
+ boolean success = false;
+ log.debug("reading package with id {}", queueItem.getId());
+ try {
+ ReplicationPackage replicationPackage = replicationPackageExporter.exportPackageById(queueItem.getId());
+ if (replicationPackage != null) {
+ replicationPackageImporter.importPackage(replicationPackage);
+
+ Dictionary<Object, Object> properties = new Properties();
+ properties.put("replication.package.paths", replicationPackage.getPaths());
+ properties.put("replication.agent.name", name);
+ replicationEventFactory.generateEvent(ReplicationEventType.PACKAGE_REPLICATED, properties);
+
+ replicationPackage.delete();
+ success = true;
+ }
+ } catch (ReplicationPackageReadingException e) {
+ log.error("could not process transport queue", e);
+ }
+ return success;
+ }
+
+ class PackageQueueProcessor implements ReplicationQueueProcessor {
+ public boolean process(String queueName, ReplicationQueueItem packageInfo) {
+ log.info("running package queue processor");
+ return processTransportQueue(packageInfo);
+ }
+ }
+
+ public class AgentBasedTriggerRequestHandler implements ReplicationTriggerRequestHandler {
+ private final ReplicationAgent agent;
+
+ public AgentBasedTriggerRequestHandler(ReplicationAgent agent) {
+ this.agent = agent;
+ }
+
+ public void handle(ReplicationRequest request) {
+ try {
+ agent.execute(request);
+ } catch (AgentReplicationException e) {
+ log.error("Error executing handler", e);
+ }
+ }
+ }
+}
Propchange: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java?rev=1628917&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java (added)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java Thu Oct 2 09:47:24 2014
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.agent.impl;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.agent.ReplicationComponentProvider;
+import org.apache.sling.settings.SlingSettingsService;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An OSGi service factory for {@link org.apache.sling.replication.agent.impl.CoordinatingReplicationAgent}s.
+ * <p/>
+ * On activation (when enabled) the factory registers itself as a {@link ReplicationComponentListener} and, if the
+ * component provider is able to create the agent, registers the agent as a {@link ReplicationAgent} service and
+ * enables it. Component bind/unbind notifications re-run activation or deactivation with the saved configuration.
+ */
+@Component(metatype = true,
+        label = "Coordinating Replication Agents Factory",
+        description = "OSGi configuration based ReplicationAgent service factory",
+        name = CoordinatingReplicationAgentFactory.SERVICE_PID,
+        configurationFactory = true,
+        specVersion = "1.1",
+        policy = ConfigurationPolicy.REQUIRE
+)
+public class CoordinatingReplicationAgentFactory implements ReplicationComponentListener {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    static final String SERVICE_PID = "org.apache.sling.replication.agent.impl.CoordinatingReplicationAgentFactory";
+
+    @Property(boolValue = true, label = "Enabled")
+    private static final String ENABLED = "enabled";
+
+    @Property(label = "Name")
+    public static final String NAME = "name";
+
+    @Property(boolValue = true, label = "Replicate using aggregated paths")
+    public static final String USE_AGGREGATE_PATHS = "useAggregatePaths";
+
+    @Property(label = "Package Exporter", cardinality = 100)
+    public static final String PACKAGE_EXPORTER = "packageExporter";
+
+    @Property(label = "Package Importer", cardinality = 100)
+    public static final String PACKAGE_IMPORTER = "packageImporter";
+
+    @Property(label = "Queue Provider", cardinality = 100)
+    public static final String QUEUE_PROVIDER = "queueProvider";
+
+    @Property(label = "Queue Distribution Strategy", cardinality = 100)
+    public static final String QUEUE_DISTRIBUTION_STRATEGY = "queueDistributionStrategy";
+
+    @Reference
+    private SlingSettingsService settingsService;
+
+    @Reference
+    private ReplicationComponentProvider componentProvider;
+
+    private ServiceRegistration agentReg;
+    private ServiceRegistration listenerReg;
+
+    private BundleContext savedContext;
+    private Map<String, Object> savedConfig;
+
+    /**
+     * Activates the factory: saves context and configuration for later refreshes and, when the configuration is
+     * enabled, registers the listener and (if it can be created) the agent service.
+     *
+     * @param context the bundle context used to register services
+     * @param config  the OSGi configuration of this factory instance
+     * @throws Exception if the agent cannot be created or registered
+     */
+    @Activate
+    public void activate(BundleContext context, Map<String, Object> config) throws Exception {
+        log.debug("activating agent with config {}", config);
+
+        savedContext = context;
+        savedConfig = config;
+
+        // inject configuration
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+
+        boolean enabled = PropertiesUtil.toBoolean(config.get(ENABLED), true);
+        String name = PropertiesUtil.toString(config.get(NAME), null);
+
+        if (enabled) {
+            props.put(ENABLED, true);
+            if (name != null) {
+                // Hashtable rejects null values: a configuration without a name must not break activation
+                props.put(NAME, name);
+            }
+
+            if (listenerReg == null) {
+                listenerReg = context.registerService(ReplicationComponentListener.class.getName(), this, props);
+            }
+
+            if (agentReg == null) {
+                Map<String, Object> properties = new HashMap<String, Object>();
+                properties.putAll(config);
+
+                properties.put("type", "coordinating");
+                CoordinatingReplicationAgent agent = (CoordinatingReplicationAgent) componentProvider.createComponent(ReplicationAgent.class, properties);
+
+                log.debug("activated agent {}", agent != null ? agent.getName() : null);
+
+                if (agent != null) {
+                    props.put(NAME, agent.getName());
+
+                    // register agent service
+                    agentReg = context.registerService(ReplicationAgent.class.getName(), agent, props);
+                    agent.enable();
+                }
+            }
+        }
+    }
+
+    /**
+     * Deactivates the factory: disables and unregisters the agent service (if registered) and unregisters the
+     * listener service (if registered).
+     *
+     * @param context the bundle context used to look up the registered agent
+     */
+    @Deactivate
+    private void deactivate(BundleContext context) {
+        log.debug("deactivating agent");
+        if (agentReg != null) {
+            ServiceReference reference = agentReg.getReference();
+            CoordinatingReplicationAgent replicationAgent = (CoordinatingReplicationAgent) context.getService(reference);
+            try {
+                if (replicationAgent != null) {
+                    replicationAgent.disable();
+                }
+            } finally {
+                if (replicationAgent != null) {
+                    // release the usage acquired by getService above, while the service is still registered
+                    context.ungetService(reference);
+                }
+                // always drop the registration, even if disabling the agent failed
+                agentReg.unregister();
+                agentReg = null;
+            }
+        }
+        if (listenerReg != null) {
+            listenerReg.unregister();
+            listenerReg = null;
+        }
+    }
+
+    /**
+     * Re-runs activation (on bind, when no agent is registered) or deactivation (on unbind, when an agent is
+     * registered) using the context and configuration saved by the last activation. Failures are logged.
+     *
+     * @param isBinding <code>true</code> for a bind notification, <code>false</code> for an unbind notification
+     */
+    private void refresh(boolean isBinding) {
+        try {
+            if (savedContext != null && savedConfig != null) {
+                if (isBinding && agentReg == null) {
+                    activate(savedContext, savedConfig);
+                } else if (!isBinding && agentReg != null) {
+                    deactivate(savedContext);
+                }
+            }
+
+        } catch (Exception e) {
+            log.error("Cannot refresh agent", e);
+        }
+    }
+
+    public <ComponentType> void componentBind(ComponentType component, String componentName) {
+        refresh(true);
+    }
+
+    public <ComponentType> void componentUnbind(ComponentType component, String componentName) {
+        refresh(false);
+    }
+}
Propchange: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/CoordinatingReplicationAgentFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationComponentProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationComponentProvider.java?rev=1628917&r1=1628916&r2=1628917&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationComponentProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationComponentProvider.java Thu Oct 2 09:47:24 2014
@@ -2,6 +2,7 @@ package org.apache.sling.replication.age
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -25,8 +26,10 @@ import org.apache.sling.replication.even
import org.apache.sling.replication.packaging.ReplicationPackageExporter;
import org.apache.sling.replication.packaging.ReplicationPackageImporter;
import org.apache.sling.replication.packaging.impl.exporter.LocalReplicationPackageExporterFactory;
+import org.apache.sling.replication.packaging.impl.exporter.RemoteReplicationPackageExporter;
import org.apache.sling.replication.packaging.impl.exporter.RemoteReplicationPackageExporterFactory;
import org.apache.sling.replication.packaging.impl.importer.LocalReplicationPackageImporterFactory;
+import org.apache.sling.replication.packaging.impl.importer.RemoteReplicationPackageImporter;
import org.apache.sling.replication.packaging.impl.importer.RemoteReplicationPackageImporterFactory;
import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
import org.apache.sling.replication.queue.ReplicationQueueProvider;
@@ -162,6 +165,51 @@ public class DefaultReplicationComponent
return new SimpleReplicationAgent(name, useAggregatePaths, isPassive,
packageImporter, packageExporter, queueProvider, queueDistributionStrategy, replicationEventFactory, triggers);
+ } else if ("coordinating".equals(factory)) {
+ if (log.isDebugEnabled()) {
+ log.debug("creating coordinating agent");
+ for (Map.Entry<String, Object> e : properties.entrySet()) {
+ Object value = e.getValue();
+ log.info(e.getKey() + " -> " + (value != null && value.getClass().isArray() ? Arrays.toString((Object[]) value) : value));
+ }
+ }
+
+ // build exporter
+ Map<String, Object> exporterProperties = extractMap("packageExporter", properties);
+ exporterProperties.put(COMPONENT_TYPE, "remote");
+ RemoteReplicationPackageExporter packageExporter = (RemoteReplicationPackageExporter) createExporter(exporterProperties, componentProvider);
+
+ // build importer
+ Map<String, Object> importerProperties = extractMap("packageImporter", properties);
+ importerProperties.put(COMPONENT_TYPE, "remote");
+ RemoteReplicationPackageImporter packageImporter = (RemoteReplicationPackageImporter) createImporter(importerProperties, componentProvider);
+
+ // build triggers
+ List<ReplicationTrigger> triggers = new ArrayList<ReplicationTrigger>(1);
+ triggers.add(new ScheduledReplicationTrigger(Collections.<String, Object>emptyMap(), scheduler));
+
+ // TODO : eventually enable remote event triggers automatically
+// String[] exporterEndpoints = (String[]) exporterProperties.get("endpoints");
+// for (String exporterEndpoint : exporterEndpoints) {
+// }
+
+ Map<String, Object> queueDistributionStrategyProperties = extractMap("queueDistributionStrategy", properties);
+ ReplicationQueueDistributionStrategy queueDistributionStrategy = createDistributionStrategy(queueDistributionStrategyProperties, componentProvider);
+
+ Map<String, Object> queueProviderProperties = extractMap("queueProvider", properties);
+ ReplicationQueueProvider queueProvider = createQueueProvider(queueProviderProperties, componentProvider);
+
+ String name = PropertiesUtil.toString(properties.get(CoordinatingReplicationAgentFactory.NAME), String.valueOf(new Random().nextInt(1000)));
+
+ boolean useAggregatePaths = PropertiesUtil.toBoolean(properties.get(CompactSimpleReplicationAgentFactory.USE_AGGREGATE_PATHS), true);
+
+ // check configuration is valid
+ if (name == null || packageExporter == null || packageImporter == null || queueProvider == null || queueDistributionStrategy == null) {
+ log.error("could not create the coordinate agent with following bindings {}", Arrays.toString(new Object[]{name, packageExporter, packageImporter, queueProvider, queueDistributionStrategy}));
+ } else {
+ return new CoordinatingReplicationAgent(name, useAggregatePaths,
+ packageImporter, packageExporter, queueProvider, queueDistributionStrategy, replicationEventFactory, triggers);
+ }
}
return null;
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1628917&r1=1628916&r2=1628917&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Thu Oct 2 09:47:24 2014
@@ -93,8 +93,8 @@ public class SimpleReplicationAgent impl
throws AgentReplicationException {
try {
return schedule(buildPackages(replicationRequest));
- } catch (ReplicationPackageBuildingException e) {
- log.error("Error building packages", e);
+ } catch (Exception e) {
+ log.error("Error executing replication request {}", replicationRequest, e);
throw new AgentReplicationException(e);
}
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/impl/exporter/RemoteReplicationPackageExporterFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/impl/exporter/RemoteReplicationPackageExporterFactory.java?rev=1628917&r1=1628916&r2=1628917&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/impl/exporter/RemoteReplicationPackageExporterFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/packaging/impl/exporter/RemoteReplicationPackageExporterFactory.java Thu Oct 2 09:47:24 2014
@@ -98,7 +98,7 @@ public class RemoteReplicationPackageExp
String[] endpoints = PropertiesUtil.toStringArray(config.get(ReplicationTransportConstants.ENDPOINTS), new String[0]);
- int pollItems = PropertiesUtil.toInteger(config.get(POLL_ITEMS), 1);
+ int pollItems = PropertiesUtil.toInteger(config.get(POLL_ITEMS), Integer.MAX_VALUE);
String endpointStrategyName = PropertiesUtil.toString(config.get(ReplicationTransportConstants.ENDPOINT_STRATEGY), "One");
TransportEndpointStrategyType transportEndpointStrategyType = TransportEndpointStrategyType.valueOf(endpointStrategyName);
Modified: sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/trigger/impl/ScheduledReplicationTrigger.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/trigger/impl/ScheduledReplicationTrigger.java?rev=1628917&r1=1628916&r2=1628917&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/trigger/impl/ScheduledReplicationTrigger.java (original)
+++ sling/trunk/contrib/extensions/replication/core/src/main/java/org/apache/sling/replication/trigger/impl/ScheduledReplicationTrigger.java Thu Oct 2 09:47:24 2014
@@ -51,14 +51,13 @@ public class ScheduledReplicationTrigger
private final Scheduler scheduler;
public ScheduledReplicationTrigger(Map<String, Object> config, Scheduler scheduler) {
- this(ReplicationActionType.fromName(PropertiesUtil.toString(config.get(ACTION), null)),
- PropertiesUtil.toString(config.get(PATH), null),
+ this(ReplicationActionType.fromName(PropertiesUtil.toString(config.get(ACTION), ReplicationActionType.POLL.name())),
+ PropertiesUtil.toString(config.get(PATH), "/"),
PropertiesUtil.toInteger(config.get(SECONDS), 30),
scheduler);
}
public ScheduledReplicationTrigger(ReplicationActionType replicationAction, String path, int secondsInterval, Scheduler scheduler) {
-
this.replicationAction = replicationAction;
this.path = path;
this.secondsInterval = secondsInterval;
Added: sling/trunk/contrib/extensions/replication/sample/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.CoordinatingReplicationAgentFactory-pubsync.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/sample/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.CoordinatingReplicationAgentFactory-pubsync.json?rev=1628917&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/sample/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.CoordinatingReplicationAgentFactory-pubsync.json (added)
+++ sling/trunk/contrib/extensions/replication/sample/src/main/resources/SLING-CONTENT/libs/sling/replication/install.author/org.apache.sling.replication.agent.impl.CoordinatingReplicationAgentFactory-pubsync.json Thu Oct 2 09:47:24 2014
@@ -0,0 +1,42 @@
+{
+ "jcr:primaryType": "sling:OsgiConfig",
+ "name": "pubsync",
+ "type": "coordinating",
+ "enabled" : false,
+
+ "packageExporter": [
+ "authenticationProvider/type=service",
+ "authenticationProvider/name=publishAdmin",
+
+ "packageBuilder/type=vlt",
+ "packageBuilder/servicename=replicationService",
+
+ "endpoints[0]=http://localhost:4503/libs/sling/replication/services/exporters/reverse",
+ "endpoints[1]=http://localhost:4504/libs/sling/replication/services/exporters/reverse",
+ "endpoints[2]=http://localhost:4505/libs/sling/replication/services/exporters/reverse",
+ "endpoints.strategy=All"
+ ],
+
+ "packageImporter": [
+ "authenticationProvider/type=service",
+ "authenticationProvider/name=publishAdmin",
+
+ "packageBuilder/type=vlt",
+ "packageBuilder/servicename=replicationService",
+
+ "endpoints[0]=http://localhost:4503/libs/sling/replication/services/importers/default",
+ "endpoints[1]=http://localhost:4504/libs/sling/replication/services/importers/default",
+ "endpoints[2]=http://localhost:4505/libs/sling/replication/services/importers/default",
+ "endpoints.strategy=All"
+ ],
+
+ "queueProvider": [
+ "type=service",
+ "name=sjh"
+ ],
+
+ "queueDistributionStrategy": [
+ "type=service",
+ "name=error"
+ ]
+}
\ No newline at end of file