You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by to...@apache.org on 2014/01/20 18:23:52 UTC
svn commit: r1559782 [1/2] - in
/sling/trunk/contrib/extensions/replication/src:
main/java/org/apache/sling/replication/agent/
main/java/org/apache/sling/replication/agent/impl/
main/java/org/apache/sling/replication/monitor/ main/java/org/apache/sling...
Author: tommaso
Date: Mon Jan 20 17:23:50 2014
New Revision: 1559782
URL: http://svn.apache.org/r1559782
Log:
SLING-3327 - applied Marius Petria's patch
Modified:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageBuilderProvider.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentRootServlet.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationQueueServlet.java
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java
sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json
sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentJobConsumerTest.java
sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java
sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java
sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java
sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java
sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java
sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java
sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueTest.java
sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporterTest.java
sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java Mon Jan 20 17:23:50 2014
@@ -72,18 +72,19 @@ public interface ReplicationAgent {
void send(ReplicationRequest replicationRequest) throws AgentReplicationException;
/**
- * synchronously process the replication of a certain item skipping the underlying queue(s)
- *
- * @param item a {@link ReplicationPackage} to process
- * @return <code>true</code> if process was successful, <code>false</code> otherwise
- * @throws AgentReplicationException
- */
- boolean process(ReplicationPackage item) throws AgentReplicationException;
-
- /**
* get the agent configured endpoint
*
* @return an <code>URI</code> specifying its endpoint
*/
URI getEndpoint();
+
+
+ /**
+ * removes a package from the top of the queue
+ * @param queueName
+ * the name of a {@link ReplicationQueue} bound to this agent
+ * @return
+ * @throws ReplicationQueueException
+ */
+ ReplicationPackage removeHead(String queueName) throws ReplicationQueueException;
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java Mon Jan 20 17:23:50 2014
@@ -18,8 +18,10 @@
*/
package org.apache.sling.replication.agent.impl;
-import java.util.*;
-
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Random;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.ConfigurationPolicy;
@@ -28,14 +30,12 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.apache.sling.replication.agent.AgentConfigurationException;
import org.apache.sling.replication.agent.ReplicationAgent;
import org.apache.sling.replication.agent.ReplicationAgentConfiguration;
import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
import org.apache.sling.replication.queue.ReplicationQueueProvider;
import org.apache.sling.replication.queue.impl.SingleQueueDistributionStrategy;
-import org.apache.sling.replication.queue.impl.jobhandling.JobHandlingReplicationQueue;
import org.apache.sling.replication.queue.impl.jobhandling.JobHandlingReplicationQueueProvider;
import org.apache.sling.replication.rule.ReplicationRuleEngine;
import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
@@ -197,7 +197,7 @@ public class ReplicationAgentServiceFact
transportHandler, transportAuthenticationProvider, endpoint, packageBuilder, queueProvider, queueDistributionStrategy});
}
- ReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, rules, useAggregatePaths,
+ SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, rules, useAggregatePaths,
transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, queueDistributionStrategy);
// register agent service
@@ -208,14 +208,7 @@ public class ReplicationAgentServiceFact
replicationRuleEngine.applyRules(agent, rules);
}
- // eventually register job consumer for sling job handling based queues
- if (DEFAULT_QUEUEPROVIDER.equals(queue) && (transportHandler != null && endpoint != null && endpoint.length() > 0)) {
- Dictionary<String, Object> jobProps = new Hashtable<String, Object>();
- String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + name;
- String childTopic = topic + "/*";
- jobProps.put(JobConsumer.PROPERTY_TOPICS, new String[]{topic, childTopic});
- jobReg = context.registerService(JobConsumer.class.getName(), new ReplicationAgentJobConsumer(agent, packageBuilder), jobProps);
- }
+ queueProvider.enableQueueProcessing(agent, agent);
}
}
@@ -230,9 +223,7 @@ public class ReplicationAgentServiceFact
replicationRuleEngine.unapplyRules(replicationAgent, rules);
}
- if (jobReg != null) {
- jobReg.unregister();
- }
+ queueProvider.disableQueueProcessing(replicationAgent);
if (agentReg != null) {
agentReg.unregister();
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Mon Jan 20 17:23:50 2014
@@ -24,14 +24,11 @@ import org.apache.sling.replication.agen
import org.apache.sling.replication.communication.ReplicationEndpoint;
import org.apache.sling.replication.communication.ReplicationRequest;
import org.apache.sling.replication.communication.ReplicationResponse;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
-import org.apache.sling.replication.queue.ReplicationQueueException;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
-import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.queue.*;
import org.apache.sling.replication.serialization.ReplicationPackage;
import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
import org.apache.sling.replication.serialization.ReplicationPackageBuildingException;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
import org.apache.sling.replication.transport.ReplicationTransportException;
import org.apache.sling.replication.transport.TransportHandler;
import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
@@ -44,7 +41,7 @@ import java.util.List;
/**
* Basic implementation of a {@link ReplicationAgent}
*/
-public class SimpleReplicationAgent implements ReplicationAgent {
+public class SimpleReplicationAgent implements ReplicationAgent, ReplicationQueueProcessor {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -152,10 +149,15 @@ public class SimpleReplicationAgent impl
private ReplicationResponse schedule(ReplicationPackage replicationPackage, boolean offer) throws AgentReplicationException {
ReplicationResponse replicationResponse = new ReplicationResponse();
+ ReplicationQueueItem replicationQueueItem = new ReplicationQueueItem(replicationPackage.getId(),
+ replicationPackage.getPaths(),
+ replicationPackage.getAction(),
+ replicationPackage.getType());
if(offer){
try {
- queueDistributionStrategy.offer(replicationPackage, this, queueProvider);
+
+ queueDistributionStrategy.offer(replicationQueueItem, this, queueProvider);
} catch (ReplicationQueueException e) {
replicationResponse.setSuccessful(false);
throw new AgentReplicationException(e);
@@ -164,7 +166,7 @@ public class SimpleReplicationAgent impl
else {
// send the replication package to the queue distribution handler
try {
- ReplicationQueueItemState state = queueDistributionStrategy.add(replicationPackage,
+ ReplicationQueueItemState state = queueDistributionStrategy.add(replicationQueueItem,
this, queueProvider);
if (state != null) {
replicationResponse.setStatus(state.getItemState().toString());
@@ -183,24 +185,45 @@ public class SimpleReplicationAgent impl
return replicationResponse;
}
-
- public boolean process(ReplicationPackage item) throws AgentReplicationException {
+ public boolean process(ReplicationQueueItem itemInfo) {
try {
- if (transportHandler != null || (endpoint != null && endpoint.length() > 0)) {
- transportHandler.transport(item, new ReplicationEndpoint(endpoint),
- transportAuthenticationProvider);
- return true;
- } else {
- if (log.isInfoEnabled()) {
+ ReplicationPackage replicationPackage = packageBuilder.getPackage(itemInfo.getId());
+ if(replicationPackage == null){
+ return false;
+ }
+
+ try {
+ if (transportHandler != null || (endpoint != null && endpoint.length() > 0)) {
+ transportHandler.transport(replicationPackage,
+ new ReplicationEndpoint(endpoint),
+ transportAuthenticationProvider);
+
+ return true;
+ } else {
log.info("agent {} processing skipped", name);
+ return false;
}
- return false;
+ }
+ finally {
+ replicationPackage.delete();
}
} catch (ReplicationTransportException e) {
- throw new AgentReplicationException(e);
+ log.error("transport error", e);
+ return false;
}
}
+ public ReplicationPackage removeHead(String queueName) throws ReplicationQueueException {
+ ReplicationQueue queue = getQueue(queueName);
+ ReplicationQueueItem info = queue.getHead();
+ if(info == null) return null;
+
+ queue.removeHead();
+
+ ReplicationPackage replicationPackage = packageBuilder.getPackage(info.getId());
+ return replicationPackage;
+ }
+
public URI getEndpoint() {
return new ReplicationEndpoint(endpoint).getUri();
}
@@ -222,4 +245,5 @@ public class SimpleReplicationAgent impl
public String[] getRules() {
return rules;
}
+
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java Mon Jan 20 17:23:50 2014
@@ -38,9 +38,9 @@ import org.apache.sling.hc.api.HealthChe
import org.apache.sling.hc.api.Result;
import org.apache.sling.hc.util.FormattingResultLog;
import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
import org.apache.sling.replication.queue.ReplicationQueueItemState;
import org.apache.sling.replication.queue.ReplicationQueueProvider;
-import org.apache.sling.replication.serialization.ReplicationPackage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,7 +112,7 @@ public class ReplicationQueueHealthCheck
for (ReplicationQueueProvider replicationQueueProvider : replicationQueueProviders) {
for (ReplicationQueue q : replicationQueueProvider.getAllQueues())
try {
- ReplicationPackage item = q.getHead();
+ ReplicationQueueItem item = q.getHead();
if (item != null) {
ReplicationQueueItemState status = q.getStatus(item);
if (status.getAttempts() <= numberOfRetriesAllowed) {
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java Mon Jan 20 17:23:50 2014
@@ -19,7 +19,6 @@
package org.apache.sling.replication.queue;
import java.util.Collection;
-import org.apache.sling.replication.serialization.ReplicationPackage;
/**
* a queue for handling {@link org.apache.sling.replication.agent.ReplicationAgent}s' requests
@@ -36,21 +35,23 @@ public interface ReplicationQueue {
/**
* add a replication package to this queue
*
+ *
* @param replicationPackage a replication package to replicate
* @return <code>true</code> if the replication package was added correctly to the queue,
* <code>false</code> otherwise
* @throws ReplicationQueueException
*/
- boolean add(ReplicationPackage replicationPackage) throws ReplicationQueueException;
+ boolean add(ReplicationQueueItem replicationPackage) throws ReplicationQueueException;
/**
* get the status of a certain package in the queue
*
+ *
* @param replicationPackage the replication package to get the status for
* @return the item status in the queue
* @throws ReplicationQueueException
*/
- ReplicationQueueItemState getStatus(ReplicationPackage replicationPackage)
+ ReplicationQueueItemState getStatus(ReplicationQueueItem replicationPackage)
throws ReplicationQueueException;
/**
@@ -58,7 +59,7 @@ public interface ReplicationQueue {
*
* @return the first replication package into the queue
*/
- ReplicationPackage getHead();
+ ReplicationQueueItem getHead();
/**
* remove the first package into the queue from it
@@ -79,7 +80,7 @@ public interface ReplicationQueue {
*
* @return a <code>Collection</code> of {@link org.apache.sling.replication.serialization.ReplicationPackage}s
*/
- Collection<ReplicationPackage> getItems();
+ Collection<ReplicationQueueItem> getItems();
/**
* remove an item from the queue by specifying its id
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java Mon Jan 20 17:23:50 2014
@@ -31,26 +31,28 @@ public interface ReplicationQueueDistrib
* synchronously distribute a {@link ReplicationPackage} to a {@link ReplicationAgent} to a {@link ReplicationQueue}
* provided by the given {@link ReplicationQueueProvider}
*
- * @param replicationPackage a {@link ReplicationPackage} to distribute
- * @param agent the {@link ReplicationAgent} to be used for replicating the package
- * @param queueProvider the {@link ReplicationQueueProvider} used to provide the queue to be used for the given package
+ *
+ * @param replicationPackage a {@link org.apache.sling.replication.serialization.ReplicationPackage} to distribute
+ * @param agent the {@link org.apache.sling.replication.agent.ReplicationAgent} to be used for replicating the package
+ * @param queueProvider the {@link org.apache.sling.replication.queue.ReplicationQueueProvider} used to provide the queue to be used for the given package
* @return a {@link ReplicationQueueItemState} representing the state of the package in the queue after its distribution
* @throws ReplicationQueueException
*/
- ReplicationQueueItemState add(ReplicationPackage replicationPackage, ReplicationAgent agent,
+ ReplicationQueueItemState add(ReplicationQueueItem replicationPackage, ReplicationAgent agent,
ReplicationQueueProvider queueProvider) throws ReplicationQueueException;
/**
* asynchronously distribute a {@link ReplicationPackage} to a {@link ReplicationAgent} to a {@link ReplicationQueue}
* provided by the given {@link ReplicationQueueProvider}
*
- * @param replicationPackage a {@link ReplicationPackage} to distribute
- * @param agent the {@link ReplicationAgent} to be used for replicating the package
- * @param queueProvider the {@link ReplicationQueueProvider} used to provide the queue to be used for the given package
+ *
+ * @param replicationPackage a {@link org.apache.sling.replication.serialization.ReplicationPackage} to distribute
+ * @param agent the {@link org.apache.sling.replication.agent.ReplicationAgent} to be used for replicating the package
+ * @param queueProvider the {@link org.apache.sling.replication.queue.ReplicationQueueProvider} used to provide the queue to be used for the given package
* @return <code>true</code> if the package could be distributed to a {@link ReplicationQueue}, <code>false</code> otherwise
* @throws ReplicationQueueException
*/
- boolean offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+ boolean offer(ReplicationQueueItem replicationPackage, ReplicationAgent agent,
ReplicationQueueProvider queueProvider) throws ReplicationQueueException;
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java Mon Jan 20 17:23:50 2014
@@ -21,7 +21,6 @@ package org.apache.sling.replication.que
import java.util.Collection;
import org.apache.sling.replication.agent.ReplicationAgent;
-import org.apache.sling.replication.serialization.ReplicationPackage;
/**
* A provider for {@link ReplicationQueue}s
@@ -42,19 +41,6 @@ public interface ReplicationQueueProvide
ReplicationQueue getQueue(ReplicationAgent agent, String name)
throws ReplicationQueueException;
- /**
- * provide the queue to be used for a certain agent and package or creates it if it doesn't
- * exist
- *
- * @param agent
- * the replication agent needing the queue
- * @param replicationPackage
- * the package for which the queue should be used
- * @return a replication queue to be used for the given parameters
- * @throws ReplicationQueueException
- */
- ReplicationQueue getQueue(ReplicationAgent agent, ReplicationPackage replicationPackage)
- throws ReplicationQueueException;
/**
* get the default queue to be used for a certain agent
@@ -83,4 +69,21 @@ public interface ReplicationQueueProvide
* @throws ReplicationQueueException
*/
void removeQueue(ReplicationQueue queue) throws ReplicationQueueException;
+
+ /**
+ * enables queue driven processing for an agent.
+ * @param agent
+ * a replication agent
+ * @param queueProcessor
+ * the callback that is called when an item needs processing
+ */
+ void enableQueueProcessing(ReplicationAgent agent, ReplicationQueueProcessor queueProcessor);
+
+
+ /**
+ * disables queue driven processing for an agent
+ * @param agent
+ * a replication agent
+ */
+ void disableQueueProcessing(ReplicationAgent agent);
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java Mon Jan 20 17:23:50 2014
@@ -40,10 +40,6 @@ public abstract class AbstractReplicatio
private final Map<String, ReplicationQueue> queueMap = new HashMap<String, ReplicationQueue>();
- public ReplicationQueue getQueue(ReplicationAgent agent,
- ReplicationPackage replicationPackage) throws ReplicationQueueException {
- return getQueue(agent, replicationPackage.getAction());
- }
public ReplicationQueue getQueue(ReplicationAgent agent, String queueName)
throws ReplicationQueueException {
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java Mon Jan 20 17:23:50 2014
@@ -25,13 +25,9 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.replication.agent.ReplicationAgent;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
-import org.apache.sling.replication.queue.ReplicationQueueException;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.queue.*;
import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
-import org.apache.sling.replication.queue.ReplicationQueueProvider;
-import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +72,7 @@ public class ErrorAwareQueueDistribution
timeThreshold = PropertiesUtil.toInteger(ctx.getProperties().get(TIME_THRESHOLD), 10800000);
}
- public ReplicationQueueItemState add(ReplicationPackage replicationPackage,
+ public ReplicationQueueItemState add(ReplicationQueueItem replicationPackage,
ReplicationAgent agent, ReplicationQueueProvider queueProvider)
throws ReplicationQueueException {
try {
@@ -111,7 +107,7 @@ public class ErrorAwareQueueDistribution
}
}
- public boolean offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+ public boolean offer(ReplicationQueueItem replicationPackage, ReplicationAgent agent,
ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
boolean added;
ReplicationQueue queue = queueProvider.getDefaultQueue(agent);
@@ -129,7 +125,7 @@ public class ErrorAwareQueueDistribution
ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
ReplicationQueue defaultQueue = queueProvider.getDefaultQueue(agent);
// get first item in the queue with its status
- ReplicationPackage firstItem = defaultQueue.getHead();
+ ReplicationQueueItem firstItem = defaultQueue.getHead();
if (firstItem != null) {
ReplicationQueueItemState status = defaultQueue.getStatus(firstItem);
// if item is still in the queue after a max no. of attempts, move it to the error queue
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java Mon Jan 20 17:23:50 2014
@@ -25,18 +25,14 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.replication.queue.*;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.sling.replication.agent.ReplicationAgent;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
-import org.apache.sling.replication.queue.ReplicationQueueException;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
-import org.apache.sling.replication.queue.ReplicationQueueProvider;
-import org.apache.sling.replication.serialization.ReplicationPackage;
/**
* Distribution algorithm which keeps one specific queue to handle specific paths and another queue
@@ -61,7 +57,7 @@ public class PriorityPathDistributionStr
priorityPaths = PropertiesUtil.toStringArray(context.getProperties().get(PRIORITYPATHS));
}
- public ReplicationQueueItemState add(ReplicationPackage replicationPackage,
+ public ReplicationQueueItemState add(ReplicationQueueItem replicationPackage,
ReplicationAgent agent, ReplicationQueueProvider queueProvider)
throws ReplicationQueueException {
if (log.isInfoEnabled()) {
@@ -95,7 +91,7 @@ public class PriorityPathDistributionStr
}
- private ReplicationQueue getQueue(ReplicationPackage replicationPackage,
+ private ReplicationQueue getQueue(ReplicationQueueItem replicationPackage,
ReplicationAgent agent, ReplicationQueueProvider queueProvider)
throws ReplicationQueueException {
String[] paths = replicationPackage.getPaths();
@@ -131,7 +127,7 @@ public class PriorityPathDistributionStr
return queue;
}
- public boolean offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+ public boolean offer(ReplicationQueueItem replicationPackage, ReplicationAgent agent,
ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
ReplicationQueue queue = getQueue(replicationPackage, agent, queueProvider);
if (queue != null) {
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java Mon Jan 20 17:23:50 2014
@@ -21,17 +21,13 @@ package org.apache.sling.replication.que
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.replication.queue.*;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.sling.replication.agent.ReplicationAgent;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
-import org.apache.sling.replication.queue.ReplicationQueueException;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
-import org.apache.sling.replication.queue.ReplicationQueueProvider;
-import org.apache.sling.replication.serialization.ReplicationPackage;
/**
* The default strategy for delivering packages to queues. Each agent just manages a single queue,
@@ -46,7 +42,7 @@ public class SingleQueueDistributionStra
private final Logger log = LoggerFactory.getLogger(getClass());
- public ReplicationQueueItemState add(ReplicationPackage replicationPackage,
+ public ReplicationQueueItemState add(ReplicationQueueItem replicationPackage,
ReplicationAgent agent, ReplicationQueueProvider queueProvider)
throws ReplicationQueueException {
if (log.isInfoEnabled()) {
@@ -78,7 +74,7 @@ public class SingleQueueDistributionStra
}
- public boolean offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+ public boolean offer(ReplicationQueueItem replicationPackage, ReplicationAgent agent,
ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
ReplicationQueue queue = queueProvider.getDefaultQueue(agent);
if (queue != null) {
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java Mon Jan 20 17:23:50 2014
@@ -30,9 +30,9 @@ import org.apache.sling.event.jobs.JobMa
import org.apache.sling.event.jobs.JobManager.QueryType;
import org.apache.sling.replication.queue.ReplicationQueue;
import org.apache.sling.replication.queue.ReplicationQueueException;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
import org.apache.sling.replication.queue.ReplicationQueueItemState;
import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
-import org.apache.sling.replication.serialization.ReplicationPackage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +61,7 @@ public class JobHandlingReplicationQueue
return name;
}
- public boolean add(ReplicationPackage replicationPackage) {
+ public boolean add(ReplicationQueueItem replicationPackage) {
boolean result = true;
try {
Map<String, Object> properties = JobHandlingUtils
@@ -80,7 +80,7 @@ public class JobHandlingReplicationQueue
return result;
}
- public ReplicationQueueItemState getStatus(ReplicationPackage replicationPackage)
+ public ReplicationQueueItemState getStatus(ReplicationQueueItem replicationPackage)
throws ReplicationQueueException {
ReplicationQueueItemState itemStatus = new ReplicationQueueItemState();
try {
@@ -103,7 +103,7 @@ public class JobHandlingReplicationQueue
return itemStatus;
}
- public ReplicationPackage getHead() {
+ public ReplicationQueueItem getHead() {
Job firstItem = getFirstItem();
if (firstItem != null) {
return JobHandlingUtils.getPackage(firstItem);
@@ -149,8 +149,8 @@ public class JobHandlingReplicationQueue
return getItems().isEmpty();
}
- public Collection<ReplicationPackage> getItems() {
- Collection<ReplicationPackage> items = new LinkedList<ReplicationPackage>();
+ public Collection<ReplicationQueueItem> getItems() {
+ Collection<ReplicationQueueItem> items = new LinkedList<ReplicationQueueItem>();
Collection<Job> jobs = jobManager.findJobs(QueryType.ALL, topic, -1);
for (Job job : jobs) {
items.add(JobHandlingUtils.getPackage(job));
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java Mon Jan 20 17:23:50 2014
@@ -21,15 +21,18 @@ package org.apache.sling.replication.que
import java.io.IOException;
import java.util.Dictionary;
import java.util.Hashtable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.Service;
+import org.apache.felix.scr.annotations.*;
import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.apache.sling.replication.queue.ReplicationQueueProcessor;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
@@ -53,6 +56,10 @@ public class JobHandlingReplicationQueue
@Reference
private ConfigurationAdmin configAdmin;
+ private Map<String, ServiceRegistration> jobs = new ConcurrentHashMap<String, ServiceRegistration>();
+ private BundleContext context;
+
+
@Override
protected ReplicationQueue getOrCreateQueue(ReplicationAgent agent, String queueName)
throws ReplicationQueueException {
@@ -84,4 +91,37 @@ public class JobHandlingReplicationQueue
q.removeAll();
}
+ public void enableQueueProcessing(ReplicationAgent agent, ReplicationQueueProcessor queueProcessor) {
+ // TODO: make this configurable (whether to create self processing queues or not)
+ if (agent.getEndpoint() == null || agent.getEndpoint().toString().length() == 0) return;
+
+ // eventually register job consumer for sling job handling based queues
+ Dictionary<String, Object> jobProps = new Hashtable<String, Object>();
+ String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + agent.getName();
+ String childTopic = topic + "/*";
+ jobProps.put(JobConsumer.PROPERTY_TOPICS, new String[]{topic, childTopic});
+ ServiceRegistration jobReg = context.registerService(JobConsumer.class.getName(),
+ new ReplicationAgentJobConsumer(agent, queueProcessor), jobProps);
+ jobs.put(agent.getName(), jobReg);
+ }
+
+
+ public void disableQueueProcessing(ReplicationAgent agent) {
+ ServiceRegistration jobReg = jobs.remove(agent.getName());
+ if (jobReg != null) {
+ jobReg.unregister();
+ }
+ }
+
+ @Activate
+ private void activate(BundleContext context){
+ this.context = context;
+ }
+
+ @Deactivate
+ private void deactivate(BundleContext context){
+ for (ServiceRegistration jobReg : jobs.values()){
+ jobReg.unregister();
+ }
+ }
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java Mon Jan 20 17:23:50 2014
@@ -19,15 +19,13 @@
package org.apache.sling.replication.queue.impl.jobhandling;
import java.io.IOException;
-import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.io.IOUtils;
+
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.ScheduledJobInfo;
-import org.apache.sling.replication.serialization.ReplicationPackage;
-import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,115 +35,32 @@ public class JobHandlingUtils {
private static final String PATHS = "replication.package.paths";
- private static final String ID = "replication.package.id";
-
- private static final String LENGTH = "replication.package.length";
-
- private static final String BIN = "replication.package.stream";
+ public static final String ID = "replication.package.id";
private static final String TYPE = "replication.package.type";
protected static final String ACTION = "replication.package.action";
- public static ReplicationPackage getPackage(ScheduledJobInfo info, JobManager jobManager,
- String topic) {
- Job job = getJob(info, jobManager, topic);
- return JobHandlingUtils.getPackage(job);
- }
-
- public static Job getJob(ScheduledJobInfo info, JobManager jobManager, String topic) {
- String id = String.valueOf(info.getJobProperties().get(ID));
- Map<String, Object> jobProps = JobHandlingUtils.createIdPropertiesFromId(id);
- return jobManager.getJob(topic, jobProps);
- }
-
- public static ReplicationPackage getPackage(ReplicationPackageBuilder packageBuilder,
- final Job job) {
- ReplicationPackage pkg = null;
- String id = String.valueOf(job.getProperty(ID));
- try {
- pkg = packageBuilder.getPackage(id);
- if (pkg != null) {
- if (log.isInfoEnabled()) {
- log.info("successfully retrieved a package with id {}", id);
- }
- }
- } catch (Exception e) {
- if (log.isWarnEnabled()) {
- log.warn("failed retrieving a package with id {}", id);
- }
- }
- if (pkg == null) {
- try {
- pkg = getPackage(job);
- if (pkg != null) {
- if (log.isInfoEnabled()) {
- log.info("successfully deserialized a package from job {}", job);
- }
- }
- } catch (Exception e) {
- if (log.isWarnEnabled()) {
- log.warn("failed deserializing package from job {}", job, e);
- }
- }
- }
- if (pkg == null) {
- if (log.isErrorEnabled()) {
- log.error("could not find a package from job {}", job);
- }
- }
- return pkg;
- }
-
- public static ReplicationPackage getPackage(final Job job) {
- return new ReplicationPackage() {
-
- private static final long serialVersionUID = 1L;
-
- public String[] getPaths() {
- return (String[]) job.getProperty(PATHS);
- }
-
- public long getLength() {
- return (Long) job.getProperty(LENGTH);
- }
-
- public InputStream createInputStream() throws IOException {
- return IOUtils.toInputStream(String.valueOf(job.getProperty(BIN)));
-
- // workaround to make void package work while we get SLING-3140 to be released
-// return IOUtils.toInputStream(String.valueOf(job.getProperty(ID)));
- }
-
- public String getId() {
- return String.valueOf(job.getProperty(ID));
- }
-
- public String getType() {
- return String.valueOf(job.getProperty(TYPE));
- }
-
- public String getAction() {
- return String.valueOf(job.getProperty(ACTION));
- }
- };
+ public static ReplicationQueueItem getPackage(final Job job) {
+ return new ReplicationQueueItem(String.valueOf(job.getProperty(ID)),
+ (String[]) job.getProperty(PATHS),
+ String.valueOf(job.getProperty(ACTION)),
+ String.valueOf(job.getProperty(TYPE)));
}
public static Map<String, Object> createFullPropertiesFromPackage(
- ReplicationPackage replicationPackage) throws IOException {
+ ReplicationQueueItem replicationPackage) throws IOException {
Map<String, Object> properties = new HashMap<String, Object>();
properties.put(ID, replicationPackage.getId());
properties.put(PATHS, replicationPackage.getPaths());
- properties.put(LENGTH, replicationPackage.getLength());
properties.put(ACTION, replicationPackage.getAction());
- properties.put(BIN, IOUtils.toString(replicationPackage.createInputStream()));
properties.put(TYPE, replicationPackage.getType());
return properties;
}
public static Map<String, Object> createIdPropertiesFromPackage(
- ReplicationPackage replicationPackage) {
+ ReplicationQueueItem replicationPackage) {
Map<String, Object> properties = new HashMap<String, Object>();
properties.put(ID, replicationPackage.getId());
return properties;
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java Mon Jan 20 17:23:50 2014
@@ -19,68 +19,51 @@
package org.apache.sling.replication.queue.impl.simple;
import java.util.Arrays;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Properties;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferencePolicy;
-import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.replication.agent.AgentReplicationException;
+
import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
+import org.apache.sling.replication.queue.ReplicationQueueProcessor;
import org.apache.sling.replication.queue.ReplicationQueueProvider;
-import org.apache.sling.replication.serialization.ReplicationPackage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* a simple scheduled {@link SimpleReplicationQueue}s processor
*/
-@Component(
- label = "In memory Replication Queues processor",
- description = "Service that trigger processing of elements in memory resident replication queues.",
- metatype = true)
-@Service(value = Runnable.class)
-@Properties({
- @Property(name = "scheduler.period", longValue = 10, label = "Frequency", description = "Processing frequency in seconds"),
- @Property(name = "scheduler.concurrent", boolValue = false, propertyPrivate = true) })
public class ScheduledReplicationQueueProcessor implements Runnable {
private final Logger log = LoggerFactory.getLogger(getClass());
+ private final ReplicationQueueProvider queueProvider;
+ private final ReplicationQueueProcessor queueProcessor;
+
+ public ScheduledReplicationQueueProcessor(ReplicationQueueProvider queueProvider,
+ ReplicationQueueProcessor queueProcessor){
+
+ this.queueProvider = queueProvider;
+ this.queueProcessor = queueProcessor;
+ }
- @Reference(name = "ReplicationQueueProvider", target = "(name="
- + SimpleReplicationQueueProvider.NAME + ")", policy = ReferencePolicy.DYNAMIC)
- private ReplicationQueueProvider replicationQueueProvider;
public void run() {
try {
- for (ReplicationQueue queue : replicationQueueProvider.getAllQueues()) {
+ for (ReplicationQueue queue : queueProvider.getAllQueues()) {
while (!queue.isEmpty()) {
// synchronized (queue) {
- ReplicationPackage item = queue.getHead();
+ ReplicationQueueItem item = queue.getHead();
if (item != null) {
- try {
- if (((SimpleReplicationQueue)queue).getAgent().process(item)) {
- queue.removeHead();
- } else {
- if (log.isWarnEnabled()) {
- log.warn("processing of item {} failed",
- Arrays.toString(item.getPaths()));
- }
- }
- } catch (AgentReplicationException e) {
- if (log.isErrorEnabled()) {
- log.error("an error happened while processing an item {}",
- Arrays.toString(item.getPaths()));
- }
+ if (queueProcessor.process(item)) {
+ queue.removeHead();
+ } else {
+ log.warn("processing of item {} failed",
+ Arrays.toString(item.getPaths()));
+
}
}
// }
}
}
} catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("error while processing queue {}", e);
- }
+ log.error("error while processing queue {}", e);
}
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java Mon Jan 20 17:23:50 2014
@@ -27,9 +27,9 @@ import java.util.concurrent.LinkedBlocki
import java.util.concurrent.TimeUnit;
import org.apache.sling.replication.agent.ReplicationAgent;
import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
import org.apache.sling.replication.queue.ReplicationQueueItemState;
import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
-import org.apache.sling.replication.serialization.ReplicationPackage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,9 +47,9 @@ public class SimpleReplicationQueue impl
private final String name;
- private final BlockingQueue<ReplicationPackage> queue;
+ private final BlockingQueue<ReplicationQueueItem> queue;
- private final Map<ReplicationPackage, ReplicationQueueItemState> statusMap;
+ private final Map<ReplicationQueueItem, ReplicationQueueItemState> statusMap;
public SimpleReplicationQueue(ReplicationAgent agent, String name) {
if (log.isInfoEnabled()) {
@@ -57,15 +57,15 @@ public class SimpleReplicationQueue impl
}
this.agent = agent;
this.name = name;
- this.queue = new LinkedBlockingQueue<ReplicationPackage>();
- this.statusMap = new WeakHashMap<ReplicationPackage, ReplicationQueueItemState>(10);
+ this.queue = new LinkedBlockingQueue<ReplicationQueueItem>();
+ this.statusMap = new WeakHashMap<ReplicationQueueItem, ReplicationQueueItemState>(10);
}
public String getName() {
return name;
}
- public boolean add(ReplicationPackage replicationPackage) {
+ public boolean add(ReplicationQueueItem replicationPackage) {
ReplicationQueueItemState status = new ReplicationQueueItemState();
boolean result = false;
try {
@@ -80,7 +80,7 @@ public class SimpleReplicationQueue impl
return result;
}
- public ReplicationQueueItemState getStatus(ReplicationPackage replicationPackage) {
+ public ReplicationQueueItemState getStatus(ReplicationQueueItem replicationPackage) {
ReplicationQueueItemState status = statusMap.get(replicationPackage);
if (queue.contains(replicationPackage)) {
status.setItemState(ItemState.QUEUED);
@@ -94,8 +94,8 @@ public class SimpleReplicationQueue impl
return agent;
}
- public ReplicationPackage getHead() {
- ReplicationPackage element = queue.peek();
+ public ReplicationQueueItem getHead() {
+ ReplicationQueueItem element = queue.peek();
if (element != null) {
ReplicationQueueItemState replicationQueueItemStatus = statusMap.get(element);
replicationQueueItemStatus.setAttempts(replicationQueueItemStatus.getAttempts() + 1);
@@ -104,7 +104,7 @@ public class SimpleReplicationQueue impl
}
public void removeHead() {
- ReplicationPackage element = queue.remove();
+ ReplicationQueueItem element = queue.remove();
statusMap.get(element).setSuccessful(true);
}
@@ -112,13 +112,13 @@ public class SimpleReplicationQueue impl
return queue.isEmpty();
}
- public Collection<ReplicationPackage> getItems() {
+ public Collection<ReplicationQueueItem> getItems() {
return queue;
}
public void remove(String id) {
- ReplicationPackage toRemove = null;
- for (ReplicationPackage item : queue) {
+ ReplicationQueueItem toRemove = null;
+ for (ReplicationQueueItem item : queue) {
if (id.equals(item.getId())) {
toRemove = item;
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java Mon Jan 20 17:23:50 2014
@@ -20,9 +20,13 @@ package org.apache.sling.replication.que
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.scheduler.ScheduleOptions;
+import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.queue.ReplicationQueueProcessor;
import org.apache.sling.replication.queue.ReplicationQueue;
import org.apache.sling.replication.queue.ReplicationQueueException;
import org.apache.sling.replication.queue.ReplicationQueueProvider;
@@ -38,6 +42,9 @@ import org.apache.sling.replication.queu
public class SimpleReplicationQueueProvider extends AbstractReplicationQueueProvider implements
ReplicationQueueProvider {
+ @Reference
+ Scheduler scheduler;
+
public static final String NAME = "simple";
protected ReplicationQueue getOrCreateQueue(ReplicationAgent agent, String selector)
@@ -49,4 +56,18 @@ public class SimpleReplicationQueueProvi
// do nothing as queues just exist in the cache
}
+ public void enableQueueProcessing(ReplicationAgent agent, ReplicationQueueProcessor queueProcessor) {
+ ScheduleOptions options = scheduler.NOW(-1, 10)
+ .canRunConcurrently(false)
+ .name(getJobName(agent));
+ scheduler.schedule(new ScheduledReplicationQueueProcessor(this, queueProcessor), options);
+ }
+
+ public void disableQueueProcessing(ReplicationAgent agent) {
+ scheduler.unschedule(getJobName(agent));
+ }
+
+ private String getJobName(ReplicationAgent agent){
+ return SimpleReplicationQueueProvider.NAME+"-queueProcessor-"+agent.getName();
+ }
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java Mon Jan 20 17:23:50 2014
@@ -18,6 +18,8 @@
*/
package org.apache.sling.replication.serialization;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
@@ -65,4 +67,15 @@ public interface ReplicationPackage exte
*/
long getLength();
+
+ /**
+ * releases resources associated with this object
+ */
+ void close();
+
+ /**
+ * releases all resources associated with the package id
+ */
+ void delete();
+
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java Mon Jan 20 17:23:50 2014
@@ -55,4 +55,5 @@ public interface ReplicationPackageBuild
* @return a {@link ReplicationPackage} if one with such an id exists, <code>null</code> otherwise
*/
ReplicationPackage getPackage(String id);
+
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java Mon Jan 20 17:23:50 2014
@@ -34,11 +34,4 @@ public interface ReplicationPackageImpor
*/
boolean importStream(InputStream stream, String type);
- /**
- * Asynchronously schedules the import of the {@link org.apache.sling.replication.serialization.ReplicationPackage}
- *
- * @param stream the <code>InputStream</code> of the given <code>ReplicationPackage</code>
- * @param type the <code>String</code> representing the ({@link ReplicationPackage#getType() type} of the given package
- */
- void scheduleImport(InputStream stream, String type) throws Exception;
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java Mon Jan 20 17:23:50 2014
@@ -19,6 +19,8 @@
package org.apache.sling.replication.serialization.impl;
import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
@@ -76,9 +78,7 @@ public abstract class AbstractReplicatio
replicationPackage = readPackageForDelete(stream);
}
} catch (Exception e) {
- if (log.isWarnEnabled()) {
- log.warn("{}", e);
- }
+ log.warn("cannot parse stream", e);
}
stream.mark(-1);
if (replicationPackage == null) {
@@ -91,22 +91,17 @@ public abstract class AbstractReplicatio
ReplicationPackage replicationPackage = null;
Session session = null;
try {
- VoidReplicationPackage voidReplicationPackage = VoidReplicationPackage.fromStream(stream);
- if (voidReplicationPackage != null) {
+ replicationPackage = VoidReplicationPackage.fromStream(stream);
+
+ if(replicationPackage != null){
session = getSession();
- if (session != null) {
- for (String path : voidReplicationPackage.getPaths()) {
- if (session.itemExists(path)) {
- session.removeItem(path);
- }
+ for (String path : replicationPackage.getPaths()) {
+ if (session.itemExists(path)) {
+ session.removeItem(path);
}
- session.save();
- ReplicationRequest request = new ReplicationRequest(System.currentTimeMillis(),
- ReplicationActionType.DELETE, voidReplicationPackage.getPaths());
- replicationPackage = new VoidReplicationPackage(request, getName());
}
+ session.save();
}
- return replicationPackage;
} catch (Exception e) {
throw new ReplicationPackageReadingException(e);
} finally {
@@ -115,6 +110,20 @@ public abstract class AbstractReplicatio
}
}
+ return replicationPackage;
+ }
+
+ public ReplicationPackage getPackage(String id) {
+ ReplicationPackage replicationPackage = null;
+ try {
+ replicationPackage = VoidReplicationPackage.fromStream(new ByteArrayInputStream(id.getBytes()));
+ }
+ catch (IOException ex){
+ }
+
+ if(replicationPackage != null) return replicationPackage;
+
+ return getPackageInternal(id);
}
protected abstract String getName();
@@ -124,4 +133,7 @@ public abstract class AbstractReplicatio
protected abstract ReplicationPackage readPackageForAdd(InputStream stream, boolean install)
throws ReplicationPackageReadingException;
+
+ protected abstract ReplicationPackage getPackageInternal(String id);
+
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageBuilderProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageBuilderProvider.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageBuilderProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageBuilderProvider.java Mon Jan 20 17:23:50 2014
@@ -29,6 +29,7 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.References;
import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.PropertiesUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,8 +64,8 @@ public class DefaultReplicationPackageBu
final ReplicationPackageBuilder replicationPackageBuilder,
Map<String, Object> properties) {
synchronized (replicationPackageBuilders) {
- replicationPackageBuilders.put(String.valueOf(properties.get("name")),
- replicationPackageBuilder);
+ String name = PropertiesUtil.toString(properties.get("name"), "");
+ replicationPackageBuilders.put(name, replicationPackageBuilder);
}
log.debug("Registering Replication Package Builder {} ", replicationPackageBuilder);
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java Mon Jan 20 17:23:50 2014
@@ -19,34 +19,21 @@
package org.apache.sling.replication.serialization.impl;
import java.io.BufferedInputStream;
-import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Dictionary;
-import java.util.HashMap;
import java.util.Hashtable;
-import java.util.Map;
-import org.apache.commons.io.IOUtils;
-import org.apache.felix.scr.annotations.Activate;
+
import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
-import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
-import org.apache.sling.event.jobs.QueueConfiguration;
-import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.apache.sling.replication.event.ReplicationEventFactory;
import org.apache.sling.replication.event.ReplicationEventType;
-import org.apache.sling.replication.queue.ReplicationQueueException;
import org.apache.sling.replication.serialization.ReplicationPackage;
import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
import org.apache.sling.replication.serialization.ReplicationPackageBuilderProvider;
import org.apache.sling.replication.serialization.ReplicationPackageImporter;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,41 +62,6 @@ public class DefaultReplicationPackageIm
@Reference
private ConfigurationAdmin configAdmin;
- private ServiceRegistration jobReg;
-
-
- @Activate
- protected void activate(BundleContext context) throws Exception {
- try {
- if (jobManager.getQueue(QUEUE_NAME) == null) {
- Configuration config = configAdmin.createFactoryConfiguration(
- QueueConfiguration.class.getName(), null);
- Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
- props.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name());
- props.put(ConfigurationConstants.PROP_TOPICS, new String[]{QUEUE_TOPIC, QUEUE_TOPIC + "/*"});
- props.put(ConfigurationConstants.PROP_RETRIES, -1);
- props.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
- props.put(ConfigurationConstants.PROP_KEEP_JOBS, true);
- props.put(ConfigurationConstants.PROP_PRIORITY, "MAX");
- config.update(props);
- }
- } catch (IOException e) {
- throw new ReplicationQueueException("could not create an import queue", e);
- }
-
- Dictionary<String, Object> jobProps = new Hashtable<String, Object>();
- jobProps.put(JobConsumer.PROPERTY_TOPICS, new String[]{QUEUE_TOPIC});
- jobReg = context.registerService(JobConsumer.class.getName(), new ReplicationPackageImporterJobConsumer(), jobProps);
- }
-
- @Deactivate
- public void deactivate() {
- if (jobReg != null) {
- jobReg.unregister();
- }
- }
-
public boolean importStream(InputStream stream, String type) {
boolean success = false;
try {
@@ -119,9 +71,7 @@ public class DefaultReplicationPackageIm
if (replicationPackageBuilder != null) {
replicationPackage = replicationPackageBuilder.readPackage(stream, true);
} else {
- if (log.isWarnEnabled()) {
- log.warn("cannot read streams of type {}", type);
- }
+ log.warn("cannot read streams of type {}", type);
}
} else {
BufferedInputStream bufferedInputStream = new BufferedInputStream(stream); // needed to allow for multiple reads
@@ -129,69 +79,29 @@ public class DefaultReplicationPackageIm
try {
replicationPackage = replicationPackageBuilder.readPackage(bufferedInputStream, true);
} catch (Exception e) {
- if (log.isWarnEnabled()) {
- log.warn("received stream cannot be read with {}", replicationPackageBuilder);
- }
+ log.warn("received stream cannot be read with {}", replicationPackageBuilder);
}
}
}
if (replicationPackage != null) {
- if (log.isInfoEnabled()) {
- log.info("replication package read and installed for path(s) {}",
- Arrays.toString(replicationPackage.getPaths()));
- }
+ log.info("replication package read and installed for path(s) {}",
+ Arrays.toString(replicationPackage.getPaths()));
Dictionary<String, Object> dictionary = new Hashtable<String, Object>();
dictionary.put("replication.action", replicationPackage.getAction());
dictionary.put("replication.path", replicationPackage.getPaths());
replicationEventFactory.generateEvent(ReplicationEventType.PACKAGE_INSTALLED, dictionary);
success = true;
+
+ replicationPackage.delete();
} else {
- if (log.isWarnEnabled()) {
- log.warn("could not read a replication package");
- }
+ log.warn("could not read a replication package");
}
} catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("cannot import a package from the given stream of type {}", type);
- }
+ log.error("cannot import a package from the given stream of type {}", type);
}
return success;
}
- public void scheduleImport(InputStream stream, String type) throws Exception {
- try {
- Map<String, Object> properties = new HashMap<String, Object>();
- properties.put("bin", IOUtils.toString(stream)); // TODO : compress / encode the stream
- properties.put("type", type);
- Job job = jobManager.createJob(QUEUE_TOPIC).properties(properties).add();
- if (log.isInfoEnabled()) {
- log.info("job added {}", job);
- }
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("could not add an item to the queue");
- }
- throw e;
- }
- }
-
- private class ReplicationPackageImporterJobConsumer implements JobConsumer {
-
- public JobResult process(Job job) {
- try {
- InputStream stream = IOUtils.toInputStream(String.valueOf(job.getProperty("bin"))); // TODO : decompress / decode the stream string
- boolean result = importStream(stream, String.valueOf(job.getProperty("type")));
- return result ? JobResult.OK : JobResult.FAILED;
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("could not process import job correctly", e);
- }
- return JobResult.FAILED;
- }
- }
- }
-
-
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java Mon Jan 20 17:23:50 2014
@@ -26,6 +26,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.vault.util.Text;
import org.apache.sling.replication.communication.ReplicationActionType;
import org.apache.sling.replication.communication.ReplicationRequest;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
import org.apache.sling.replication.serialization.ReplicationPackage;
/**
@@ -45,31 +46,36 @@ public class VoidReplicationPackage impl
this.type = type;
this.paths = request.getPaths();
this.action = request.getAction().toString();
- this.id = request.getAction().toString() + ':' + Arrays.toString(request.getPaths()) + ':' + request.getTime();
+ this.id = request.getAction().toString()
+ + ':' + Arrays.toString(request.getPaths())
+ + ':' + request.getTime()
+ + ':' + type;
}
public static VoidReplicationPackage fromStream(InputStream stream) throws IOException {
- VoidReplicationPackage replicationPackage = null;
String streamString = IOUtils.toString(stream);
- int beginIndex = streamString.indexOf(':');
- int endIndex = streamString.lastIndexOf(':');
- if (beginIndex >= 0 && endIndex > beginIndex){
- String actionString = streamString.substring(0, beginIndex);
- String pathsString = streamString.substring(beginIndex+1, endIndex);
- String timeString = streamString.substring(endIndex + 1);
-
- ReplicationActionType replicationActionType = ReplicationActionType.fromName(actionString);
-
- if(replicationActionType != null){
- pathsString = Text.unescape(pathsString);
- String[] paths = pathsString.replaceAll("\\[", "").replaceAll("\\]", "").split(", ");
-
- ReplicationRequest request = new ReplicationRequest(Long.valueOf(timeString),
- replicationActionType, paths);
- replicationPackage = new VoidReplicationPackage(request, "VOID");
- }
+ String[] parts = streamString.split(":");
+
+ if(parts == null || parts.length < 4) return null;
+
+ String actionString = parts[0];
+ String pathsString = parts[1];
+ String timeString = parts[2];
+ String typeString = parts[3];
+
+ ReplicationActionType replicationActionType = ReplicationActionType.fromName(actionString);
+
+ VoidReplicationPackage replicationPackage = null;
+ if(replicationActionType != null){
+ pathsString = Text.unescape(pathsString);
+ String[] paths = pathsString.replaceAll("\\[", "").replaceAll("\\]", "").split(", ");
+
+ ReplicationRequest request = new ReplicationRequest(Long.valueOf(timeString),
+ replicationActionType, paths);
+ replicationPackage = new VoidReplicationPackage(request, typeString);
}
+
return replicationPackage;
}
@@ -88,6 +94,7 @@ public class VoidReplicationPackage impl
return id.getBytes().length;
}
+
public InputStream createInputStream() throws IOException {
return new ByteArrayInputStream(id.getBytes());
}
@@ -99,4 +106,11 @@ public class VoidReplicationPackage impl
public String getAction() {
return action;
}
+
+ public void close() {
+ }
+
+ public void delete() {
+
+ }
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java Mon Jan 20 17:23:50 2014
@@ -18,6 +18,7 @@
*/
package org.apache.sling.replication.serialization.impl.vlt;
+import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -80,4 +81,19 @@ public class FileVaultReplicationPackage
return action;
}
+ public void close() {
+ pkg.close();
+ }
+
+ public void delete() {
+ close();
+ try {
+ File file = new File(id);
+ if (file.exists()) {
+ file.delete();
+ }
+ } catch (Exception e) {
+ }
+ }
+
}
Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java Mon Jan 20 17:23:50 2014
@@ -146,9 +146,7 @@ public class FileVaultReplicationPackage
Session session = null;
ReplicationPackage pkg = null;
try {
- if (log.isInfoEnabled()) {
- log.info("reading package for addition");
- }
+ log.info("reading package for addition");
session = getSession();
if (session != null) {
@@ -160,9 +158,7 @@ public class FileVaultReplicationPackage
pkg = new FileVaultReplicationPackage(jcrPackage.getPackage());
}
} catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("could not read / install the package", e);
- }
+ log.error("could not read / install the package", e);
throw new ReplicationPackageReadingException(e);
} finally {
if (session != null) {
@@ -172,7 +168,8 @@ public class FileVaultReplicationPackage
return pkg;
}
- public ReplicationPackage getPackage(String id) {
+ @Override
+ protected ReplicationPackage getPackageInternal(String id) {
ReplicationPackage replicationPackage = null;
try {
File file = new File(id);
@@ -181,15 +178,11 @@ public class FileVaultReplicationPackage
replicationPackage = new FileVaultReplicationPackage(pkg);
}
} catch (Exception e) {
- if (log.isWarnEnabled()) {
- log.info("could not find a package with id : {}", id);
- }
+ log.info("could not find a package with id : {}", id);
}
return replicationPackage;
}
-
-
@Activate
@Modified
protected void activate(ComponentContext ctx) {