You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by to...@apache.org on 2014/01/20 18:23:52 UTC

svn commit: r1559782 [1/2] - in /sling/trunk/contrib/extensions/replication/src: main/java/org/apache/sling/replication/agent/ main/java/org/apache/sling/replication/agent/impl/ main/java/org/apache/sling/replication/monitor/ main/java/org/apache/sling...

Author: tommaso
Date: Mon Jan 20 17:23:50 2014
New Revision: 1559782

URL: http://svn.apache.org/r1559782
Log:
SLING-3327 - applied Marius Petria's patch

Modified:
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageBuilderProvider.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentRootServlet.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationQueueServlet.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java
    sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentJobConsumerTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategyTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/PriorityPathQueueDistributionStrategyTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategyTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporterTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java Mon Jan 20 17:23:50 2014
@@ -72,18 +72,19 @@ public interface ReplicationAgent {
     void send(ReplicationRequest replicationRequest) throws AgentReplicationException;
 
     /**
-     * synchronously process the replication of a certain item skipping the underlying queue(s)
-     *
-     * @param item a {@link ReplicationPackage} to process
-     * @return <code>true</code> if process was successful, <code>false</code> otherwise
-     * @throws AgentReplicationException
-     */
-    boolean process(ReplicationPackage item) throws AgentReplicationException;
-
-    /**
      * get the agent configured endpoint
      *
      * @return an <code>URI</code> specifying its endpoint
      */
     URI getEndpoint();
+
+
+    /**
+     * removes a package from the top of the queue
+     * @param queueName
+     *          the name of a {@link ReplicationQueue} bound tothis agent
+     * @return
+     * @throws ReplicationQueueException
+     */
+    ReplicationPackage removeHead(String queueName) throws ReplicationQueueException;
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java Mon Jan 20 17:23:50 2014
@@ -18,8 +18,10 @@
  */
 package org.apache.sling.replication.agent.impl;
 
-import java.util.*;
-
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Random;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.ConfigurationPolicy;
@@ -28,14 +30,12 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.event.jobs.consumer.JobConsumer;
 import org.apache.sling.replication.agent.AgentConfigurationException;
 import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.agent.ReplicationAgentConfiguration;
 import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
 import org.apache.sling.replication.queue.ReplicationQueueProvider;
 import org.apache.sling.replication.queue.impl.SingleQueueDistributionStrategy;
-import org.apache.sling.replication.queue.impl.jobhandling.JobHandlingReplicationQueue;
 import org.apache.sling.replication.queue.impl.jobhandling.JobHandlingReplicationQueueProvider;
 import org.apache.sling.replication.rule.ReplicationRuleEngine;
 import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
@@ -197,7 +197,7 @@ public class ReplicationAgentServiceFact
                         transportHandler, transportAuthenticationProvider, endpoint, packageBuilder, queueProvider, queueDistributionStrategy});
             }
 
-            ReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, rules, useAggregatePaths,
+            SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, rules, useAggregatePaths,
                     transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, queueDistributionStrategy);
 
             // register agent service
@@ -208,14 +208,7 @@ public class ReplicationAgentServiceFact
                 replicationRuleEngine.applyRules(agent, rules);
             }
 
-            // eventually register job consumer for sling job handling based queues
-            if (DEFAULT_QUEUEPROVIDER.equals(queue) && (transportHandler != null && endpoint != null && endpoint.length() > 0)) {
-                Dictionary<String, Object> jobProps = new Hashtable<String, Object>();
-                String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + name;
-                String childTopic = topic + "/*";
-                jobProps.put(JobConsumer.PROPERTY_TOPICS, new String[]{topic, childTopic});
-                jobReg = context.registerService(JobConsumer.class.getName(), new ReplicationAgentJobConsumer(agent, packageBuilder), jobProps);
-            }
+            queueProvider.enableQueueProcessing(agent, agent);
         }
     }
 
@@ -230,9 +223,7 @@ public class ReplicationAgentServiceFact
                 replicationRuleEngine.unapplyRules(replicationAgent, rules);
             }
 
-            if (jobReg != null) {
-                jobReg.unregister();
-            }
+           queueProvider.disableQueueProcessing(replicationAgent);
 
             if (agentReg != null) {
                 agentReg.unregister();

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Mon Jan 20 17:23:50 2014
@@ -24,14 +24,11 @@ import org.apache.sling.replication.agen
 import org.apache.sling.replication.communication.ReplicationEndpoint;
 import org.apache.sling.replication.communication.ReplicationRequest;
 import org.apache.sling.replication.communication.ReplicationResponse;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
-import org.apache.sling.replication.queue.ReplicationQueueException;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
-import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.queue.*;
 import org.apache.sling.replication.serialization.ReplicationPackage;
 import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
 import org.apache.sling.replication.serialization.ReplicationPackageBuildingException;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
 import org.apache.sling.replication.transport.ReplicationTransportException;
 import org.apache.sling.replication.transport.TransportHandler;
 import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
@@ -44,7 +41,7 @@ import java.util.List;
 /**
  * Basic implementation of a {@link ReplicationAgent}
  */
-public class SimpleReplicationAgent implements ReplicationAgent {
+public class SimpleReplicationAgent implements ReplicationAgent, ReplicationQueueProcessor {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -152,10 +149,15 @@ public class SimpleReplicationAgent impl
 
     private ReplicationResponse schedule(ReplicationPackage replicationPackage, boolean offer) throws AgentReplicationException {
         ReplicationResponse replicationResponse = new ReplicationResponse();
+        ReplicationQueueItem replicationQueueItem = new ReplicationQueueItem(replicationPackage.getId(),
+                replicationPackage.getPaths(),
+                replicationPackage.getAction(),
+                replicationPackage.getType());
 
         if(offer){
             try {
-                queueDistributionStrategy.offer(replicationPackage, this, queueProvider);
+
+                queueDistributionStrategy.offer(replicationQueueItem, this, queueProvider);
             } catch (ReplicationQueueException e) {
                 replicationResponse.setSuccessful(false);
                 throw new AgentReplicationException(e);
@@ -164,7 +166,7 @@ public class SimpleReplicationAgent impl
         else {
             // send the replication package to the queue distribution handler
             try {
-                ReplicationQueueItemState state = queueDistributionStrategy.add(replicationPackage,
+                ReplicationQueueItemState state = queueDistributionStrategy.add(replicationQueueItem,
                         this, queueProvider);
                 if (state != null) {
                     replicationResponse.setStatus(state.getItemState().toString());
@@ -183,24 +185,45 @@ public class SimpleReplicationAgent impl
         return replicationResponse;
     }
 
-
-    public boolean process(ReplicationPackage item) throws AgentReplicationException {
+    public boolean process(ReplicationQueueItem itemInfo)  {
         try {
-            if (transportHandler != null || (endpoint != null && endpoint.length() > 0)) {
-                transportHandler.transport(item, new ReplicationEndpoint(endpoint),
-                        transportAuthenticationProvider);
-                return true;
-            } else {
-                if (log.isInfoEnabled()) {
+            ReplicationPackage replicationPackage = packageBuilder.getPackage(itemInfo.getId());
+            if(replicationPackage == null){
+                return false;
+            }
+
+            try {
+                if (transportHandler != null || (endpoint != null && endpoint.length() > 0)) {
+                    transportHandler.transport(replicationPackage,
+                            new ReplicationEndpoint(endpoint),
+                            transportAuthenticationProvider);
+
+                    return true;
+                } else {
                     log.info("agent {} processing skipped", name);
+                    return false;
                 }
-                return false;
+            }
+            finally {
+                replicationPackage.delete();
             }
         } catch (ReplicationTransportException e) {
-            throw new AgentReplicationException(e);
+            log.error("transport error", e);
+            return false;
         }
     }
 
+    public ReplicationPackage removeHead(String queueName) throws ReplicationQueueException {
+        ReplicationQueue queue = getQueue(queueName);
+        ReplicationQueueItem info = queue.getHead();
+        if(info == null) return null;
+
+        queue.removeHead();
+
+        ReplicationPackage replicationPackage = packageBuilder.getPackage(info.getId());
+        return replicationPackage;
+    }
+
     public URI getEndpoint() {
         return new ReplicationEndpoint(endpoint).getUri();
     }
@@ -222,4 +245,5 @@ public class SimpleReplicationAgent impl
     public String[] getRules() {
         return rules;
     }
+
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java Mon Jan 20 17:23:50 2014
@@ -38,9 +38,9 @@ import org.apache.sling.hc.api.HealthChe
 import org.apache.sling.hc.api.Result;
 import org.apache.sling.hc.util.FormattingResultLog;
 import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
 import org.apache.sling.replication.queue.ReplicationQueueItemState;
 import org.apache.sling.replication.queue.ReplicationQueueProvider;
-import org.apache.sling.replication.serialization.ReplicationPackage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,7 +112,7 @@ public class ReplicationQueueHealthCheck
             for (ReplicationQueueProvider replicationQueueProvider : replicationQueueProviders) {
                 for (ReplicationQueue q : replicationQueueProvider.getAllQueues())
                     try {
-                        ReplicationPackage item = q.getHead();
+                        ReplicationQueueItem item = q.getHead();
                         if (item != null) {
                             ReplicationQueueItemState status = q.getStatus(item);
                             if (status.getAttempts() <= numberOfRetriesAllowed) {

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java Mon Jan 20 17:23:50 2014
@@ -19,7 +19,6 @@
 package org.apache.sling.replication.queue;
 
 import java.util.Collection;
-import org.apache.sling.replication.serialization.ReplicationPackage;
 
 /**
  * a queue for handling {@link org.apache.sling.replication.agent.ReplicationAgent}s' requests
@@ -36,21 +35,23 @@ public interface ReplicationQueue {
     /**
      * add a replication package to this queue
      *
+     *
      * @param replicationPackage a replication package to replicate
      * @return <code>true</code> if the replication package was added correctly to the queue,
      * <code>false</code otherwise
      * @throws ReplicationQueueException
      */
-    boolean add(ReplicationPackage replicationPackage) throws ReplicationQueueException;
+    boolean add(ReplicationQueueItem replicationPackage) throws ReplicationQueueException;
 
     /**
      * get the status of a certain package in the queue
      *
+     *
      * @param replicationPackage the replication package to get the status for
      * @return the item status in the queue
      * @throws ReplicationQueueException
      */
-    ReplicationQueueItemState getStatus(ReplicationPackage replicationPackage)
+    ReplicationQueueItemState getStatus(ReplicationQueueItem replicationPackage)
             throws ReplicationQueueException;
 
     /**
@@ -58,7 +59,7 @@ public interface ReplicationQueue {
      *
      * @return the first replication package into the queue
      */
-    ReplicationPackage getHead();
+    ReplicationQueueItem getHead();
 
     /**
      * remove the first package into the queue from it
@@ -79,7 +80,7 @@ public interface ReplicationQueue {
      *
      * @return a <code>Collection</code> of {@link org.apache.sling.replication.serialization.ReplicationPackage}s
      */
-    Collection<ReplicationPackage> getItems();
+    Collection<ReplicationQueueItem> getItems();
 
     /**
      * remove an item from the queue by specifying its id

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java Mon Jan 20 17:23:50 2014
@@ -31,26 +31,28 @@ public interface ReplicationQueueDistrib
      * synchronously distribute a {@link ReplicationPackage} to a {@link ReplicationAgent} to a {@link ReplicationQueue}
      * provided by the given {@link ReplicationQueueProvider}
      *
-     * @param replicationPackage a {@link ReplicationPackage} to distribute
-     * @param agent              the {@link ReplicationAgent} to be used for replicating the package
-     * @param queueProvider      the {@link ReplicationQueueProvider} used to provide the queue to be used for the given package
+     *
+     * @param replicationPackage a {@link org.apache.sling.replication.serialization.ReplicationPackage} to distribute
+     * @param agent              the {@link org.apache.sling.replication.agent.ReplicationAgent} to be used for replicating the package
+     * @param queueProvider      the {@link org.apache.sling.replication.queue.ReplicationQueueProvider} used to provide the queue to be used for the given package
      * @return a {@link ReplicationQueueItemState} representing the state of the package in the queue after its distribution
      * @throws ReplicationQueueException
      */
-    ReplicationQueueItemState add(ReplicationPackage replicationPackage, ReplicationAgent agent,
+    ReplicationQueueItemState add(ReplicationQueueItem replicationPackage, ReplicationAgent agent,
                                   ReplicationQueueProvider queueProvider) throws ReplicationQueueException;
 
     /**
      * asynchronously distribute a {@link ReplicationPackage} to a {@link ReplicationAgent} to a {@link ReplicationQueue}
      * provided by the given {@link ReplicationQueueProvider}
      *
-     * @param replicationPackage a {@link ReplicationPackage} to distribute
-     * @param agent              the {@link ReplicationAgent} to be used for replicating the package
-     * @param queueProvider      the {@link ReplicationQueueProvider} used to provide the queue to be used for the given package
+     *
+     * @param replicationPackage a {@link org.apache.sling.replication.serialization.ReplicationPackage} to distribute
+     * @param agent              the {@link org.apache.sling.replication.agent.ReplicationAgent} to be used for replicating the package
+     * @param queueProvider      the {@link org.apache.sling.replication.queue.ReplicationQueueProvider} used to provide the queue to be used for the given package
      * @return <code>true</code> if the package could be distributed to a {@link ReplicationQueue}, <code>false</code> otherwise
      * @throws ReplicationQueueException
      */
-    boolean offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+    boolean offer(ReplicationQueueItem replicationPackage, ReplicationAgent agent,
                   ReplicationQueueProvider queueProvider) throws ReplicationQueueException;
 
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java Mon Jan 20 17:23:50 2014
@@ -21,7 +21,6 @@ package org.apache.sling.replication.que
 import java.util.Collection;
 
 import org.apache.sling.replication.agent.ReplicationAgent;
-import org.apache.sling.replication.serialization.ReplicationPackage;
 
 /**
  * A provider for {@link ReplicationQueue}s
@@ -42,19 +41,6 @@ public interface ReplicationQueueProvide
     ReplicationQueue getQueue(ReplicationAgent agent, String name)
                     throws ReplicationQueueException;
 
-    /**
-     * provide the queue to be used for a certain agent and package or creates it if it doesn't
-     * exist
-     * 
-     * @param agent
-     *            the replication agent needing the queue
-     * @param replicationPackage
-     *            the package for which the queue should be used
-     * @return a replication queue to be used for the given parameters
-     * @throws ReplicationQueueException
-     */
-    ReplicationQueue getQueue(ReplicationAgent agent, ReplicationPackage replicationPackage)
-                    throws ReplicationQueueException;
 
     /**
      * get the default queue to be used for a certain agent
@@ -83,4 +69,21 @@ public interface ReplicationQueueProvide
      * @throws ReplicationQueueException
      */
     void removeQueue(ReplicationQueue queue) throws ReplicationQueueException;
+
+    /**
+     * enables queue driven processing for an agent.
+     * @param agent
+     *          a replication agent
+     * @param queueProcessor
+     *          the callback that is called when an item needs processing
+     */
+    void enableQueueProcessing(ReplicationAgent agent, ReplicationQueueProcessor queueProcessor);
+
+
+    /**
+     * disables queue driven processing for an agent
+     * @param agent
+     *          a replication agent
+     */
+    void disableQueueProcessing(ReplicationAgent agent);
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java Mon Jan 20 17:23:50 2014
@@ -40,10 +40,6 @@ public abstract class AbstractReplicatio
 
     private final Map<String, ReplicationQueue> queueMap = new HashMap<String, ReplicationQueue>();
 
-    public ReplicationQueue getQueue(ReplicationAgent agent,
-                                     ReplicationPackage replicationPackage) throws ReplicationQueueException {
-        return getQueue(agent, replicationPackage.getAction());
-    }
 
     public ReplicationQueue getQueue(ReplicationAgent agent, String queueName)
                     throws ReplicationQueueException {

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java Mon Jan 20 17:23:50 2014
@@ -25,13 +25,9 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.replication.agent.ReplicationAgent;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
-import org.apache.sling.replication.queue.ReplicationQueueException;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.queue.*;
 import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
-import org.apache.sling.replication.queue.ReplicationQueueProvider;
-import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,7 +72,7 @@ public class ErrorAwareQueueDistribution
         timeThreshold = PropertiesUtil.toInteger(ctx.getProperties().get(TIME_THRESHOLD), 10800000);
     }
 
-    public ReplicationQueueItemState add(ReplicationPackage replicationPackage,
+    public ReplicationQueueItemState add(ReplicationQueueItem replicationPackage,
                                          ReplicationAgent agent, ReplicationQueueProvider queueProvider)
             throws ReplicationQueueException {
         try {
@@ -111,7 +107,7 @@ public class ErrorAwareQueueDistribution
         }
     }
 
-    public boolean offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+    public boolean offer(ReplicationQueueItem replicationPackage, ReplicationAgent agent,
                          ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
         boolean added;
         ReplicationQueue queue = queueProvider.getDefaultQueue(agent);
@@ -129,7 +125,7 @@ public class ErrorAwareQueueDistribution
                                           ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
         ReplicationQueue defaultQueue = queueProvider.getDefaultQueue(agent);
         // get first item in the queue with its status
-        ReplicationPackage firstItem = defaultQueue.getHead();
+        ReplicationQueueItem firstItem = defaultQueue.getHead();
         if (firstItem != null) {
             ReplicationQueueItemState status = defaultQueue.getStatus(firstItem);
             // if item is still in the queue after a max no. of attempts, move it to the error queue

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java Mon Jan 20 17:23:50 2014
@@ -25,18 +25,14 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.replication.queue.*;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.sling.replication.agent.ReplicationAgent;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
-import org.apache.sling.replication.queue.ReplicationQueueException;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
 import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
-import org.apache.sling.replication.queue.ReplicationQueueProvider;
-import org.apache.sling.replication.serialization.ReplicationPackage;
 
 /**
  * Distribution algorithm which keeps one specific queue to handle specific paths and another queue
@@ -61,7 +57,7 @@ public class PriorityPathDistributionStr
         priorityPaths = PropertiesUtil.toStringArray(context.getProperties().get(PRIORITYPATHS));
     }
 
-    public ReplicationQueueItemState add(ReplicationPackage replicationPackage,
+    public ReplicationQueueItemState add(ReplicationQueueItem replicationPackage,
                     ReplicationAgent agent, ReplicationQueueProvider queueProvider)
                     throws ReplicationQueueException {
         if (log.isInfoEnabled()) {
@@ -95,7 +91,7 @@ public class PriorityPathDistributionStr
 
     }
 
-    private ReplicationQueue getQueue(ReplicationPackage replicationPackage,
+    private ReplicationQueue getQueue(ReplicationQueueItem replicationPackage,
                     ReplicationAgent agent, ReplicationQueueProvider queueProvider)
                     throws ReplicationQueueException {
         String[] paths = replicationPackage.getPaths();
@@ -131,7 +127,7 @@ public class PriorityPathDistributionStr
         return queue;
     }
 
-    public boolean offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+    public boolean offer(ReplicationQueueItem replicationPackage, ReplicationAgent agent,
                          ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
         ReplicationQueue queue = getQueue(replicationPackage, agent, queueProvider);
         if (queue != null) {

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java Mon Jan 20 17:23:50 2014
@@ -21,17 +21,13 @@ package org.apache.sling.replication.que
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.replication.queue.*;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.sling.replication.agent.ReplicationAgent;
-import org.apache.sling.replication.queue.ReplicationQueue;
-import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
-import org.apache.sling.replication.queue.ReplicationQueueException;
-import org.apache.sling.replication.queue.ReplicationQueueItemState;
 import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
-import org.apache.sling.replication.queue.ReplicationQueueProvider;
-import org.apache.sling.replication.serialization.ReplicationPackage;
 
 /**
  * The default strategy for delivering packages to queues. Each agent just manages a single queue,
@@ -46,7 +42,7 @@ public class SingleQueueDistributionStra
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    public ReplicationQueueItemState add(ReplicationPackage replicationPackage,
+    public ReplicationQueueItemState add(ReplicationQueueItem replicationPackage,
                     ReplicationAgent agent, ReplicationQueueProvider queueProvider)
                     throws ReplicationQueueException {
         if (log.isInfoEnabled()) {
@@ -78,7 +74,7 @@ public class SingleQueueDistributionStra
 
     }
 
-    public boolean offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+    public boolean offer(ReplicationQueueItem replicationPackage, ReplicationAgent agent,
                          ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
         ReplicationQueue queue = queueProvider.getDefaultQueue(agent);
         if (queue != null) {

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java Mon Jan 20 17:23:50 2014
@@ -30,9 +30,9 @@ import org.apache.sling.event.jobs.JobMa
 import org.apache.sling.event.jobs.JobManager.QueryType;
 import org.apache.sling.replication.queue.ReplicationQueue;
 import org.apache.sling.replication.queue.ReplicationQueueException;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
 import org.apache.sling.replication.queue.ReplicationQueueItemState;
 import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
-import org.apache.sling.replication.serialization.ReplicationPackage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +61,7 @@ public class JobHandlingReplicationQueue
         return name;
     }
 
-    public boolean add(ReplicationPackage replicationPackage) {
+    public boolean add(ReplicationQueueItem replicationPackage) {
         boolean result = true;
         try {
             Map<String, Object> properties = JobHandlingUtils
@@ -80,7 +80,7 @@ public class JobHandlingReplicationQueue
         return result;
     }
 
-    public ReplicationQueueItemState getStatus(ReplicationPackage replicationPackage)
+    public ReplicationQueueItemState getStatus(ReplicationQueueItem replicationPackage)
             throws ReplicationQueueException {
         ReplicationQueueItemState itemStatus = new ReplicationQueueItemState();
         try {
@@ -103,7 +103,7 @@ public class JobHandlingReplicationQueue
         return itemStatus;
     }
 
-    public ReplicationPackage getHead() {
+    public ReplicationQueueItem getHead() {
         Job firstItem = getFirstItem();
         if (firstItem != null) {
             return JobHandlingUtils.getPackage(firstItem);
@@ -149,8 +149,8 @@ public class JobHandlingReplicationQueue
         return getItems().isEmpty();
     }
 
-    public Collection<ReplicationPackage> getItems() {
-        Collection<ReplicationPackage> items = new LinkedList<ReplicationPackage>();
+    public Collection<ReplicationQueueItem> getItems() {
+        Collection<ReplicationQueueItem> items = new LinkedList<ReplicationQueueItem>();
         Collection<Job> jobs = jobManager.findJobs(QueryType.ALL, topic, -1);
         for (Job job : jobs) {
             items.add(JobHandlingUtils.getPackage(job));

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java Mon Jan 20 17:23:50 2014
@@ -21,15 +21,18 @@ package org.apache.sling.replication.que
 import java.io.IOException;
 import java.util.Dictionary;
 import java.util.Hashtable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.Service;
+import org.apache.felix.scr.annotations.*;
 import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.Queue;
 import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.apache.sling.replication.queue.ReplicationQueueProcessor;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.cm.Configuration;
 import org.osgi.service.cm.ConfigurationAdmin;
 
@@ -53,6 +56,10 @@ public class JobHandlingReplicationQueue
     @Reference
     private ConfigurationAdmin configAdmin;
 
+    private Map<String, ServiceRegistration> jobs = new ConcurrentHashMap<String, ServiceRegistration>();
+    private BundleContext context;
+
+
     @Override
     protected ReplicationQueue getOrCreateQueue(ReplicationAgent agent, String queueName)
                     throws ReplicationQueueException {
@@ -84,4 +91,37 @@ public class JobHandlingReplicationQueue
         q.removeAll();
     }
 
+    public void enableQueueProcessing(ReplicationAgent agent, ReplicationQueueProcessor queueProcessor) {
+        // TODO: make this configurable (whether to create self processing queues or not)
+        if (agent.getEndpoint() == null || agent.getEndpoint().toString().length() == 0) return;
+
+        // eventually register job consumer for sling job handling based queues
+        Dictionary<String, Object> jobProps = new Hashtable<String, Object>();
+        String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + agent.getName();
+        String childTopic = topic + "/*";
+        jobProps.put(JobConsumer.PROPERTY_TOPICS, new String[]{topic, childTopic});
+        ServiceRegistration jobReg = context.registerService(JobConsumer.class.getName(),
+                new ReplicationAgentJobConsumer(agent, queueProcessor), jobProps);
+        jobs.put(agent.getName(), jobReg);
+    }
+
+
+    public void disableQueueProcessing(ReplicationAgent agent) {
+        ServiceRegistration jobReg = jobs.remove(agent.getName());
+        if (jobReg != null) {
+            jobReg.unregister();
+        }
+    }
+
+    @Activate
+    private void activate(BundleContext context){
+        this.context = context;
+    }
+
+    @Deactivate
+    private void deactivate(BundleContext context){
+        for (ServiceRegistration jobReg : jobs.values()){
+            jobReg.unregister();
+        }
+    }
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java Mon Jan 20 17:23:50 2014
@@ -19,15 +19,13 @@
 package org.apache.sling.replication.queue.impl.jobhandling;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.commons.io.IOUtils;
+
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.ScheduledJobInfo;
-import org.apache.sling.replication.serialization.ReplicationPackage;
-import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,115 +35,32 @@ public class JobHandlingUtils {
 
     private static final String PATHS = "replication.package.paths";
 
-    private static final String ID = "replication.package.id";
-
-    private static final String LENGTH = "replication.package.length";
-
-    private static final String BIN = "replication.package.stream";
+    public static final String ID = "replication.package.id";
 
     private static final String TYPE = "replication.package.type";
 
     protected static final String ACTION = "replication.package.action";
 
-    public static ReplicationPackage getPackage(ScheduledJobInfo info, JobManager jobManager,
-                    String topic) {
-        Job job = getJob(info, jobManager, topic);
-        return JobHandlingUtils.getPackage(job);
-    }
-
-    public static Job getJob(ScheduledJobInfo info, JobManager jobManager, String topic) {
-        String id = String.valueOf(info.getJobProperties().get(ID));
-        Map<String, Object> jobProps = JobHandlingUtils.createIdPropertiesFromId(id);
-        return jobManager.getJob(topic, jobProps);
-    }
-
-    public static ReplicationPackage getPackage(ReplicationPackageBuilder packageBuilder,
-                    final Job job) {
-        ReplicationPackage pkg = null;
-        String id = String.valueOf(job.getProperty(ID));
-        try {
-            pkg = packageBuilder.getPackage(id);
-            if (pkg != null) {
-                if (log.isInfoEnabled()) {
-                    log.info("successfully retrieved a package with id {}", id);
-                }
-            }
-        } catch (Exception e) {
-            if (log.isWarnEnabled()) {
-                log.warn("failed retrieving a package with id {}", id);
-            }
-        }
-        if (pkg == null) {
-            try {
-                pkg = getPackage(job);
-                if (pkg != null) {
-                    if (log.isInfoEnabled()) {
-                        log.info("successfully deserialized a package from job {}", job);
-                    }
-                }
-            } catch (Exception e) {
-                if (log.isWarnEnabled()) {
-                    log.warn("failed deserializing package from job {}", job, e);
-                }
-            }
-        }
-        if (pkg == null) {
-            if (log.isErrorEnabled()) {
-                log.error("could not find a package from job {}", job);
-            }
-        }
-        return pkg;
-    }
-
-    public static ReplicationPackage getPackage(final Job job) {
-        return new ReplicationPackage() {
-
-            private static final long serialVersionUID = 1L;
-
-            public String[] getPaths() {
-                return (String[]) job.getProperty(PATHS);
-            }
-
-            public long getLength() {
-                return (Long) job.getProperty(LENGTH);
-            }
-
-            public InputStream createInputStream() throws IOException {
-                return IOUtils.toInputStream(String.valueOf(job.getProperty(BIN)));
-
-                // workaround to make void package work while we get SLING-3140 to be released
-//                return IOUtils.toInputStream(String.valueOf(job.getProperty(ID)));
-            }
-
-            public String getId() {
-                return String.valueOf(job.getProperty(ID));
-            }
-
-            public String getType() {
-                return String.valueOf(job.getProperty(TYPE));
-            }
-
-            public String getAction() {
-                return String.valueOf(job.getProperty(ACTION));
-            }
-        };
 
+    public static ReplicationQueueItem getPackage(final Job job) {
+        return new ReplicationQueueItem(String.valueOf(job.getProperty(ID)),
+                (String[]) job.getProperty(PATHS),
+                String.valueOf(job.getProperty(ACTION)),
+                String.valueOf(job.getProperty(TYPE)));
     }
 
     public static Map<String, Object> createFullPropertiesFromPackage(
-                    ReplicationPackage replicationPackage) throws IOException {
+                    ReplicationQueueItem replicationPackage) throws IOException {
         Map<String, Object> properties = new HashMap<String, Object>();
         properties.put(ID, replicationPackage.getId());
         properties.put(PATHS, replicationPackage.getPaths());
-        properties.put(LENGTH, replicationPackage.getLength());
         properties.put(ACTION, replicationPackage.getAction());
-        properties.put(BIN, IOUtils.toString(replicationPackage.createInputStream()));
         properties.put(TYPE, replicationPackage.getType());
         return properties;
     }
 
     public static Map<String, Object> createIdPropertiesFromPackage(
-                    ReplicationPackage replicationPackage) {
+                    ReplicationQueueItem replicationPackage) {
         Map<String, Object> properties = new HashMap<String, Object>();
         properties.put(ID, replicationPackage.getId());
         return properties;

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java Mon Jan 20 17:23:50 2014
@@ -19,68 +19,51 @@
 package org.apache.sling.replication.queue.impl.simple;
 
 import java.util.Arrays;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Properties;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferencePolicy;
-import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.replication.agent.AgentReplicationException;
+
 import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
+import org.apache.sling.replication.queue.ReplicationQueueProcessor;
 import org.apache.sling.replication.queue.ReplicationQueueProvider;
-import org.apache.sling.replication.serialization.ReplicationPackage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * a simple scheduled {@link SimpleReplicationQueue}s processor
  */
-@Component(
-    label = "In memory Replication Queues processor", 
-    description = "Service that trigger processing of elements in memory resident replication queues.",
-    metatype = true)
-@Service(value = Runnable.class)
-@Properties({
-        @Property(name = "scheduler.period", longValue = 10, label = "Frequency", description = "Processing frequency in seconds"),
-        @Property(name = "scheduler.concurrent", boolValue = false, propertyPrivate = true) })
 public class ScheduledReplicationQueueProcessor implements Runnable {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
+    private final ReplicationQueueProvider queueProvider;
+    private final ReplicationQueueProcessor queueProcessor;
+
+    public ScheduledReplicationQueueProcessor(ReplicationQueueProvider queueProvider,
+                                              ReplicationQueueProcessor queueProcessor){
+
+        this.queueProvider = queueProvider;
+        this.queueProcessor = queueProcessor;
+    }
 
-    @Reference(name = "ReplicationQueueProvider", target = "(name="
-                    + SimpleReplicationQueueProvider.NAME + ")", policy = ReferencePolicy.DYNAMIC)
-    private ReplicationQueueProvider replicationQueueProvider;
 
     public void run() {
         try {
-            for (ReplicationQueue queue : replicationQueueProvider.getAllQueues()) {
+            for (ReplicationQueue queue : queueProvider.getAllQueues()) {
                 while (!queue.isEmpty()) {
                     // synchronized (queue) {
-                    ReplicationPackage item = queue.getHead();
+                    ReplicationQueueItem item = queue.getHead();
                     if (item != null) {
-                        try {
-                            if (((SimpleReplicationQueue)queue).getAgent().process(item)) {
-                                queue.removeHead();
-                            } else {
-                                if (log.isWarnEnabled()) {
-                                    log.warn("processing of item {} failed",
-                                                    Arrays.toString(item.getPaths()));
-                                }
-                            }
-                        } catch (AgentReplicationException e) {
-                            if (log.isErrorEnabled()) {
-                                log.error("an error happened while processing an item {}",
-                                                Arrays.toString(item.getPaths()));
-                            }
+                        if (queueProcessor.process(item)) {
+                            queue.removeHead();
+                        } else {
+                            log.warn("processing of item {} failed",
+                                    Arrays.toString(item.getPaths()));
+
                         }
                     }
                     // }
                 }
             }
         } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("error while processing queue {}", e);
-            }
+            log.error("error while processing queue {}", e);
         }
 
     }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java Mon Jan 20 17:23:50 2014
@@ -27,9 +27,9 @@ import java.util.concurrent.LinkedBlocki
 import java.util.concurrent.TimeUnit;
 import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
 import org.apache.sling.replication.queue.ReplicationQueueItemState;
 import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
-import org.apache.sling.replication.serialization.ReplicationPackage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,9 +47,9 @@ public class SimpleReplicationQueue impl
 
     private final String name;
 
-    private final BlockingQueue<ReplicationPackage> queue;
+    private final BlockingQueue<ReplicationQueueItem> queue;
 
-    private final Map<ReplicationPackage, ReplicationQueueItemState> statusMap;
+    private final Map<ReplicationQueueItem, ReplicationQueueItemState> statusMap;
 
     public SimpleReplicationQueue(ReplicationAgent agent, String name) {
         if (log.isInfoEnabled()) {
@@ -57,15 +57,15 @@ public class SimpleReplicationQueue impl
         }
         this.agent = agent;
         this.name = name;
-        this.queue = new LinkedBlockingQueue<ReplicationPackage>();
-        this.statusMap = new WeakHashMap<ReplicationPackage, ReplicationQueueItemState>(10);
+        this.queue = new LinkedBlockingQueue<ReplicationQueueItem>();
+        this.statusMap = new WeakHashMap<ReplicationQueueItem, ReplicationQueueItemState>(10);
     }
 
     public String getName() {
         return name;
     }
 
-    public boolean add(ReplicationPackage replicationPackage) {
+    public boolean add(ReplicationQueueItem replicationPackage) {
         ReplicationQueueItemState status = new ReplicationQueueItemState();
         boolean result = false;
         try {
@@ -80,7 +80,7 @@ public class SimpleReplicationQueue impl
         return result;
     }
 
-    public ReplicationQueueItemState getStatus(ReplicationPackage replicationPackage) {
+    public ReplicationQueueItemState getStatus(ReplicationQueueItem replicationPackage) {
         ReplicationQueueItemState status = statusMap.get(replicationPackage);
         if (queue.contains(replicationPackage)) {
             status.setItemState(ItemState.QUEUED);
@@ -94,8 +94,8 @@ public class SimpleReplicationQueue impl
         return agent;
     }
 
-    public ReplicationPackage getHead() {
-        ReplicationPackage element = queue.peek();
+    public ReplicationQueueItem getHead() {
+        ReplicationQueueItem element = queue.peek();
         if (element != null) {
             ReplicationQueueItemState replicationQueueItemStatus = statusMap.get(element);
             replicationQueueItemStatus.setAttempts(replicationQueueItemStatus.getAttempts() + 1);
@@ -104,7 +104,7 @@ public class SimpleReplicationQueue impl
     }
 
     public void removeHead() {
-        ReplicationPackage element = queue.remove();
+        ReplicationQueueItem element = queue.remove();
         statusMap.get(element).setSuccessful(true);
     }
 
@@ -112,13 +112,13 @@ public class SimpleReplicationQueue impl
         return queue.isEmpty();
     }
 
-    public Collection<ReplicationPackage> getItems() {
+    public Collection<ReplicationQueueItem> getItems() {
         return queue;
     }
 
     public void remove(String id) {
-        ReplicationPackage toRemove = null;
-        for (ReplicationPackage item : queue) {
+        ReplicationQueueItem toRemove = null;
+        for (ReplicationQueueItem item : queue) {
             if (id.equals(item.getId())) {
                 toRemove = item;
             }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java Mon Jan 20 17:23:50 2014
@@ -20,9 +20,13 @@ package org.apache.sling.replication.que
 
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
 
+import org.apache.sling.commons.scheduler.ScheduleOptions;
+import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.queue.ReplicationQueueProcessor;
 import org.apache.sling.replication.queue.ReplicationQueue;
 import org.apache.sling.replication.queue.ReplicationQueueException;
 import org.apache.sling.replication.queue.ReplicationQueueProvider;
@@ -38,6 +42,9 @@ import org.apache.sling.replication.queu
 public class SimpleReplicationQueueProvider extends AbstractReplicationQueueProvider implements
                 ReplicationQueueProvider {
 
+    @Reference
+    Scheduler scheduler;
+
     public static final String NAME = "simple";
 
     protected ReplicationQueue getOrCreateQueue(ReplicationAgent agent, String selector)
@@ -49,4 +56,18 @@ public class SimpleReplicationQueueProvi
         // do nothing as queues just exist in the cache
     }
 
+    public void enableQueueProcessing(ReplicationAgent agent, ReplicationQueueProcessor queueProcessor) {
+        ScheduleOptions options = scheduler.NOW(-1, 10)
+                .canRunConcurrently(false)
+                .name(getJobName(agent));
+        scheduler.schedule(new ScheduledReplicationQueueProcessor(this, queueProcessor), options);
+    }
+
+    public void disableQueueProcessing(ReplicationAgent agent) {
+        scheduler.unschedule(getJobName(agent));
+    }
+
+    private String getJobName(ReplicationAgent agent){
+        return SimpleReplicationQueueProvider.NAME+"-queueProcessor-"+agent.getName();
+    }
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java Mon Jan 20 17:23:50 2014
@@ -18,6 +18,8 @@
  */
 package org.apache.sling.replication.serialization;
 
+import org.apache.sling.replication.queue.ReplicationQueueItem;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
@@ -65,4 +67,15 @@ public interface ReplicationPackage exte
      */
     long getLength();
 
+
+    /**
+     * releases resources associated with this object
+     */
+    void close();
+
+    /**
+     * releases all resources associated with the package id
+     */
+    void delete();
+
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageBuilder.java Mon Jan 20 17:23:50 2014
@@ -55,4 +55,5 @@ public interface ReplicationPackageBuild
      * @return a {@link ReplicationPackage} if one with such an id exists, <code>null</code> otherwise
      */
     ReplicationPackage getPackage(String id);
+
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java Mon Jan 20 17:23:50 2014
@@ -34,11 +34,4 @@ public interface ReplicationPackageImpor
      */
     boolean importStream(InputStream stream, String type);
 
-    /**
-     * Asynchronously schedules the import of the {@link org.apache.sling.replication.serialization.ReplicationPackage}
-     *
-     * @param stream the <code>InputStream</code> of the given <code>ReplicationPackage</code>
-     * @param type   the <code>String</code> representing the ({@link ReplicationPackage#getType() type} of the given package
-     */
-    void scheduleImport(InputStream stream, String type) throws Exception;
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java Mon Jan 20 17:23:50 2014
@@ -19,6 +19,8 @@
 package org.apache.sling.replication.serialization.impl;
 
 import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
@@ -76,9 +78,7 @@ public abstract class AbstractReplicatio
                 replicationPackage = readPackageForDelete(stream);
             }
         } catch (Exception e) {
-            if (log.isWarnEnabled()) {
-                log.warn("{}", e);
-            }
+            log.warn("cannot parse stream", e);
         }
         stream.mark(-1);
         if (replicationPackage == null) {
@@ -91,22 +91,17 @@ public abstract class AbstractReplicatio
         ReplicationPackage replicationPackage = null;
         Session session = null;
         try {
-            VoidReplicationPackage voidReplicationPackage = VoidReplicationPackage.fromStream(stream);
-            if (voidReplicationPackage != null) {
+            replicationPackage = VoidReplicationPackage.fromStream(stream);
+
+            if(replicationPackage != null){
                 session = getSession();
-                if (session != null) {
-                    for (String path : voidReplicationPackage.getPaths()) {
-                        if (session.itemExists(path)) {
-                            session.removeItem(path);
-                        }
+                for (String path : replicationPackage.getPaths()) {
+                    if (session.itemExists(path)) {
+                        session.removeItem(path);
                     }
-                    session.save();
-                    ReplicationRequest request = new ReplicationRequest(System.currentTimeMillis(),
-                            ReplicationActionType.DELETE, voidReplicationPackage.getPaths());
-                    replicationPackage = new VoidReplicationPackage(request, getName());
                 }
+                session.save();
             }
-            return replicationPackage;
         } catch (Exception e) {
             throw new ReplicationPackageReadingException(e);
         } finally {
@@ -115,6 +110,20 @@ public abstract class AbstractReplicatio
             }
         }
 
+        return replicationPackage;
+    }
+
+    public ReplicationPackage getPackage(String id) {
+        ReplicationPackage replicationPackage = null;
+        try {
+            replicationPackage = VoidReplicationPackage.fromStream(new ByteArrayInputStream(id.getBytes()));
+        }
+        catch (IOException ex){
+        }
+
+        if(replicationPackage != null) return replicationPackage;
+
+        return getPackageInternal(id);
     }
 
     protected abstract String getName();
@@ -124,4 +133,7 @@ public abstract class AbstractReplicatio
     protected abstract ReplicationPackage readPackageForAdd(InputStream stream, boolean install)
             throws ReplicationPackageReadingException;
 
+
+    protected abstract ReplicationPackage getPackageInternal(String id);
+
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageBuilderProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageBuilderProvider.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageBuilderProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageBuilderProvider.java Mon Jan 20 17:23:50 2014
@@ -29,6 +29,7 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.apache.felix.scr.annotations.References;
 import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,8 +64,8 @@ public class DefaultReplicationPackageBu
                     final ReplicationPackageBuilder replicationPackageBuilder,
                     Map<String, Object> properties) {
         synchronized (replicationPackageBuilders) {
-            replicationPackageBuilders.put(String.valueOf(properties.get("name")),
-                            replicationPackageBuilder);
+            String name =  PropertiesUtil.toString(properties.get("name"), "");
+            replicationPackageBuilders.put(name, replicationPackageBuilder);
         }
         log.debug("Registering Replication Package Builder {} ", replicationPackageBuilder);
     }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java Mon Jan 20 17:23:50 2014
@@ -19,34 +19,21 @@
 package org.apache.sling.replication.serialization.impl;
 
 import java.io.BufferedInputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Dictionary;
-import java.util.HashMap;
 import java.util.Hashtable;
-import java.util.Map;
-import org.apache.commons.io.IOUtils;
-import org.apache.felix.scr.annotations.Activate;
+
 import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
-import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobManager;
-import org.apache.sling.event.jobs.QueueConfiguration;
-import org.apache.sling.event.jobs.consumer.JobConsumer;
 import org.apache.sling.replication.event.ReplicationEventFactory;
 import org.apache.sling.replication.event.ReplicationEventType;
-import org.apache.sling.replication.queue.ReplicationQueueException;
 import org.apache.sling.replication.serialization.ReplicationPackage;
 import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
 import org.apache.sling.replication.serialization.ReplicationPackageBuilderProvider;
 import org.apache.sling.replication.serialization.ReplicationPackageImporter;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.cm.Configuration;
 import org.osgi.service.cm.ConfigurationAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,41 +62,6 @@ public class DefaultReplicationPackageIm
     @Reference
     private ConfigurationAdmin configAdmin;
 
-    private ServiceRegistration jobReg;
-
-
-    @Activate
-    protected void activate(BundleContext context) throws Exception {
-        try {
-            if (jobManager.getQueue(QUEUE_NAME) == null) {
-                Configuration config = configAdmin.createFactoryConfiguration(
-                        QueueConfiguration.class.getName(), null);
-                Dictionary<String, Object> props = new Hashtable<String, Object>();
-                props.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
-                props.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name());
-                props.put(ConfigurationConstants.PROP_TOPICS, new String[]{QUEUE_TOPIC, QUEUE_TOPIC + "/*"});
-                props.put(ConfigurationConstants.PROP_RETRIES, -1);
-                props.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
-                props.put(ConfigurationConstants.PROP_KEEP_JOBS, true);
-                props.put(ConfigurationConstants.PROP_PRIORITY, "MAX");
-                config.update(props);
-            }
-        } catch (IOException e) {
-            throw new ReplicationQueueException("could not create an import queue", e);
-        }
-
-        Dictionary<String, Object> jobProps = new Hashtable<String, Object>();
-        jobProps.put(JobConsumer.PROPERTY_TOPICS, new String[]{QUEUE_TOPIC});
-        jobReg = context.registerService(JobConsumer.class.getName(), new ReplicationPackageImporterJobConsumer(), jobProps);
-    }
-
-    @Deactivate
-    public void deactivate() {
-        if (jobReg != null) {
-            jobReg.unregister();
-        }
-    }
-
     public boolean importStream(InputStream stream, String type) {
         boolean success = false;
         try {
@@ -119,9 +71,7 @@ public class DefaultReplicationPackageIm
                 if (replicationPackageBuilder != null) {
                     replicationPackage = replicationPackageBuilder.readPackage(stream, true);
                 } else {
-                    if (log.isWarnEnabled()) {
-                        log.warn("cannot read streams of type {}", type);
-                    }
+                    log.warn("cannot read streams of type {}", type);
                 }
             } else {
                 BufferedInputStream bufferedInputStream = new BufferedInputStream(stream); // needed to allow for multiple reads
@@ -129,69 +79,29 @@ public class DefaultReplicationPackageIm
                     try {
                         replicationPackage = replicationPackageBuilder.readPackage(bufferedInputStream, true);
                     } catch (Exception e) {
-                        if (log.isWarnEnabled()) {
-                            log.warn("received stream cannot be read with {}", replicationPackageBuilder);
-                        }
+                        log.warn("received stream cannot be read with {}", replicationPackageBuilder);
                     }
                 }
             }
 
             if (replicationPackage != null) {
-                if (log.isInfoEnabled()) {
-                    log.info("replication package read and installed for path(s) {}",
-                            Arrays.toString(replicationPackage.getPaths()));
-                }
+                log.info("replication package read and installed for path(s) {}",
+                        Arrays.toString(replicationPackage.getPaths()));
 
                 Dictionary<String, Object> dictionary = new Hashtable<String, Object>();
                 dictionary.put("replication.action", replicationPackage.getAction());
                 dictionary.put("replication.path", replicationPackage.getPaths());
                 replicationEventFactory.generateEvent(ReplicationEventType.PACKAGE_INSTALLED, dictionary);
                 success = true;
+
+                replicationPackage.delete();
             } else {
-                if (log.isWarnEnabled()) {
-                    log.warn("could not read a replication package");
-                }
+                log.warn("could not read a replication package");
             }
         } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("cannot import a package from the given stream of type {}", type);
-            }
+            log.error("cannot import a package from the given stream of type {}", type);
         }
         return success;
     }
 
-    public void scheduleImport(InputStream stream, String type) throws Exception {
-        try {
-            Map<String, Object> properties = new HashMap<String, Object>();
-            properties.put("bin", IOUtils.toString(stream)); // TODO : compress / encode the stream
-            properties.put("type", type);
-            Job job = jobManager.createJob(QUEUE_TOPIC).properties(properties).add();
-            if (log.isInfoEnabled()) {
-                log.info("job added {}", job);
-            }
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("could not add an item to the queue");
-            }
-            throw e;
-        }
-    }
-
-    private class ReplicationPackageImporterJobConsumer implements JobConsumer {
-
-        public JobResult process(Job job) {
-            try {
-                InputStream stream = IOUtils.toInputStream(String.valueOf(job.getProperty("bin"))); // TODO : decompress / decode the stream string
-                boolean result = importStream(stream, String.valueOf(job.getProperty("type")));
-                return result ? JobResult.OK : JobResult.FAILED;
-            } catch (Exception e) {
-                if (log.isErrorEnabled()) {
-                    log.error("could not process import job correctly", e);
-                }
-                return JobResult.FAILED;
-            }
-        }
-    }
-
-
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java Mon Jan 20 17:23:50 2014
@@ -26,6 +26,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.jackrabbit.vault.util.Text;
 import org.apache.sling.replication.communication.ReplicationActionType;
 import org.apache.sling.replication.communication.ReplicationRequest;
+import org.apache.sling.replication.queue.ReplicationQueueItem;
 import org.apache.sling.replication.serialization.ReplicationPackage;
 
 /**
@@ -45,31 +46,36 @@ public class VoidReplicationPackage impl
         this.type = type;
         this.paths = request.getPaths();
         this.action = request.getAction().toString();
-        this.id = request.getAction().toString() + ':' + Arrays.toString(request.getPaths()) + ':' + request.getTime();
+        this.id = request.getAction().toString()
+                + ':' + Arrays.toString(request.getPaths())
+                + ':' + request.getTime()
+                + ':' + type;
     }
 
     public static VoidReplicationPackage fromStream(InputStream stream) throws IOException {
-        VoidReplicationPackage replicationPackage = null;
         String streamString = IOUtils.toString(stream);
 
-        int beginIndex = streamString.indexOf(':');
-        int endIndex = streamString.lastIndexOf(':');
-        if (beginIndex >= 0 && endIndex > beginIndex){
-            String actionString = streamString.substring(0, beginIndex);
-            String pathsString = streamString.substring(beginIndex+1, endIndex);
-            String timeString =  streamString.substring(endIndex + 1);
-
-            ReplicationActionType replicationActionType = ReplicationActionType.fromName(actionString);
-
-            if(replicationActionType != null){
-                pathsString = Text.unescape(pathsString);
-                String[] paths = pathsString.replaceAll("\\[", "").replaceAll("\\]", "").split(", ");
-
-                ReplicationRequest request = new ReplicationRequest(Long.valueOf(timeString),
-                        replicationActionType, paths);
-                replicationPackage = new VoidReplicationPackage(request, "VOID");
-            }
+        String[] parts = streamString.split(":");
+
+        if(parts == null || parts.length < 4) return null;
+
+        String actionString = parts[0];
+        String pathsString = parts[1];
+        String timeString = parts[2];
+        String typeString = parts[3];
+
+        ReplicationActionType replicationActionType = ReplicationActionType.fromName(actionString);
+
+        VoidReplicationPackage replicationPackage = null;
+        if(replicationActionType != null){
+            pathsString = Text.unescape(pathsString);
+            String[] paths = pathsString.replaceAll("\\[", "").replaceAll("\\]", "").split(", ");
+
+            ReplicationRequest request = new ReplicationRequest(Long.valueOf(timeString),
+                    replicationActionType, paths);
+            replicationPackage = new VoidReplicationPackage(request, typeString);
         }
+
         return replicationPackage;
     }
 
@@ -88,6 +94,7 @@ public class VoidReplicationPackage impl
         return id.getBytes().length;
     }
 
+
     public InputStream createInputStream() throws IOException {
         return new ByteArrayInputStream(id.getBytes());
     }
@@ -99,4 +106,11 @@ public class VoidReplicationPackage impl
     public String getAction() {
         return action;
     }
+
+    public void close() {
+    }
+
+    public void delete() {
+
+    }
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java Mon Jan 20 17:23:50 2014
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.replication.serialization.impl.vlt;
 
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -80,4 +81,19 @@ public class FileVaultReplicationPackage
         return action;
     }
 
+    public void close() {
+        pkg.close();
+    }
+
+    public void delete() {
+        close();
+        try {
+            File file = new File(id);
+            if (file.exists()) {
+                file.delete();
+            }
+        } catch (Exception e) {
+        }
+    }
+
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java?rev=1559782&r1=1559781&r2=1559782&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackageBuilder.java Mon Jan 20 17:23:50 2014
@@ -146,9 +146,7 @@ public class FileVaultReplicationPackage
         Session session = null;
         ReplicationPackage pkg = null;
         try {
-            if (log.isInfoEnabled()) {
-                log.info("reading package for addition");
-            }
+            log.info("reading package for addition");
 
             session = getSession();
             if (session != null) {
@@ -160,9 +158,7 @@ public class FileVaultReplicationPackage
                 pkg = new FileVaultReplicationPackage(jcrPackage.getPackage());
             }
         } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("could not read / install the package", e);
-            }
+            log.error("could not read / install the package", e);
             throw new ReplicationPackageReadingException(e);
         } finally {
             if (session != null) {
@@ -172,7 +168,8 @@ public class FileVaultReplicationPackage
         return pkg;
     }
 
-    public ReplicationPackage getPackage(String id) {
+    @Override
+    protected ReplicationPackage getPackageInternal(String id) {
         ReplicationPackage replicationPackage = null;
         try {
             File file = new File(id);
@@ -181,15 +178,11 @@ public class FileVaultReplicationPackage
                 replicationPackage = new FileVaultReplicationPackage(pkg);
             }
         } catch (Exception e) {
-            if (log.isWarnEnabled()) {
-                log.info("could not find a package with id : {}", id);
-            }
+            log.info("could not find a package with id : {}", id);
         }
         return replicationPackage;
     }
 
-
-
     @Activate
     @Modified
     protected void activate(ComponentContext ctx) {