You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ro...@apache.org on 2017/11/07 09:57:25 UTC

[sling-org-apache-sling-pipes] 09/19: SLING-6623 allow async execution of pipes

This is an automated email from the ASF dual-hosted git repository.

rombert pushed a commit to annotated tag org.apache.sling.pipes-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-pipes.git

commit 4c2f290b95b8b316b36c0fcc571d8e2dc772ff21
Author: Robert Munteanu <ro...@apache.org>
AuthorDate: Wed May 24 07:22:22 2017 +0000

    SLING-6623 allow async execution of pipes
    
    - moved execution code to a central place, under plumber implementation,
    - added status and buffering of the commits (async execution means potentially long executions),
    - added job registering & processing capabilities for plumber, based on a configurable service user (will update documentation accordingly)
    
    This closes #229
    
    Submitted-By: Nicolas Peltier
    
    git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/contrib/extensions/sling-pipes@1796004 13f79535-47bb-0310-9956-ffa450edef68
---
 pom.xml                                            |   7 +-
 src/main/java/org/apache/sling/pipes/BasePipe.java |  10 +
 .../java/org/apache/sling/pipes/OutputWriter.java  |  60 +++++-
 src/main/java/org/apache/sling/pipes/Pipe.java     |   7 +
 src/main/java/org/apache/sling/pipes/Plumber.java  |  38 ++--
 .../sling/pipes/internal/DefaultOutputWriter.java  |   9 +-
 .../{DefaultOutputWriter.java => NopWriter.java}   |  36 +---
 .../apache/sling/pipes/internal/PlumberImpl.java   | 208 ++++++++++++++++-----
 .../sling/pipes/internal/PlumberServlet.java       | 112 +++++------
 .../java/org/apache/sling/pipes/package-info.java  |   2 +-
 .../org/apache/sling/pipes/AbstractPipeTest.java   |   8 +-
 .../sling/pipes/internal/PlumberServletTest.java   |   2 +-
 12 files changed, 338 insertions(+), 161 deletions(-)

diff --git a/pom.xml b/pom.xml
index 1ba63ae..77fb030 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
 
   <artifactId>org.apache.sling.pipes</artifactId>
   <packaging>bundle</packaging>
-  <version>0.0.11-SNAPSHOT</version>
+  <version>1.0.0-SNAPSHOT</version>
 
   <name>Apache Sling Pipes</name>
   <description>bulk content changes tool</description>
@@ -137,6 +137,11 @@
       <version>2.0.6</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.sling</groupId>
+      <artifactId>org.apache.sling.event.api</artifactId>
+      <version>1.0.0</version>
+    </dependency>
     <!-- testing -->
     <dependency>
       <groupId>junit</groupId>
diff --git a/src/main/java/org/apache/sling/pipes/BasePipe.java b/src/main/java/org/apache/sling/pipes/BasePipe.java
index afaad61..b644b13 100644
--- a/src/main/java/org/apache/sling/pipes/BasePipe.java
+++ b/src/main/java/org/apache/sling/pipes/BasePipe.java
@@ -37,6 +37,11 @@ public class BasePipe implements Pipe {
 
     public static final String RESOURCE_TYPE = "slingPipes/base";
     public static final String DRYRUN_KEY = "dryRun";
+    public static final String READ_ONLY = "readOnly";
+    public static final String PN_STATUS = "status";
+    public static final String PN_STATUS_MODIFIED = "statusModified";
+    public static final String STATUS_STARTED = "started";
+    public static final String STATUS_FINISHED = "finished";
     protected static final String DRYRUN_EXPR = "${" + DRYRUN_KEY + "}";
 
     protected ResourceResolver resolver;
@@ -63,6 +68,11 @@ public class BasePipe implements Pipe {
         this.parent = parent;
     }
 
+    @Override
+    public Resource getResource() {
+        return resource;
+    }
+
     protected Plumber plumber;
 
     private String name;
diff --git a/src/main/java/org/apache/sling/pipes/OutputWriter.java b/src/main/java/org/apache/sling/pipes/OutputWriter.java
index 3825b2a..13e1bc7 100644
--- a/src/main/java/org/apache/sling/pipes/OutputWriter.java
+++ b/src/main/java/org/apache/sling/pipes/OutputWriter.java
@@ -26,41 +26,83 @@ import java.io.IOException;
 /**
  * defines how pipe's output get written to a servlet response
  */
-public interface OutputWriter {
+public abstract class OutputWriter {
 
-    String KEY_SIZE = "size";
+    public static final String KEY_SIZE = "size";
 
-    String KEY_ITEMS = "items";
+    public static final String KEY_ITEMS = "items";
+
+    public static final String PARAM_SIZE = KEY_SIZE;
+
+    public static final int NB_MAX = 10;
+
+    protected int size;
+
+    protected int max = NB_MAX;
+
+    protected Pipe pipe;
 
     /**
      *
      * @param request current request
      * @return true if this writer handles that request
      */
-    boolean handleRequest(SlingHttpServletRequest request);
+    public abstract boolean handleRequest(SlingHttpServletRequest request);
 
     /**
      * Init the writer, writes beginning of the output
      * @param request request from which writer will output
      * @param response response on which writer will output
-     * @param pipe pipe whose output will be written
      * @throws IOException error handling streams
      * @throws JSONException in case invalid json is written
      */
-    void init(SlingHttpServletRequest request, SlingHttpServletResponse response, Pipe pipe) throws IOException, JSONException;
+    public void init(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException, JSONException{
+        max = request.getParameter(PARAM_SIZE) != null ? Integer.parseInt(request.getParameter(PARAM_SIZE)) : NB_MAX;
+        if (max < 0) {
+            max = Integer.MAX_VALUE;
+        }
+        initInternal(request, response);
+    }
+
+    /**
+     * Init the writer, writes beginning of the output
+     * @param request request from which writer will output
+     * @param response response on which writer will output
+     * @throws IOException error handling streams
+     * @throws JSONException in case invalid json is written
+     */
+    protected abstract void initInternal(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException, JSONException;
 
     /**
      * Write a given resource
      * @param resource resource that will be written
      * @throws JSONException in case write fails
      */
-    void writeItem(Resource resource) throws JSONException;
+    public void write(Resource resource) throws JSONException{
+        if (size++ < max) {
+            writeItem(resource);
+        }
+    }
+
+    /**
+     * Write a given resource
+     * @param resource resource that will be written
+     * @throws JSONException in case write fails
+     */
+    protected abstract void writeItem(Resource resource) throws JSONException;
 
     /**
      * writes the end of the output
-     * @param size size of the overall result
      * @throws JSONException in case invalid json is written
      */
 
-    void ends(int size) throws JSONException;
+    public abstract void ends() throws JSONException;
+
+    /**
+     * Sets the pipe this writer is bound to
+     * @param pipe pipe whose output will be written
+     */
+    public void setPipe(Pipe pipe) {
+        this.pipe = pipe;
+    }
 }
diff --git a/src/main/java/org/apache/sling/pipes/Pipe.java b/src/main/java/org/apache/sling/pipes/Pipe.java
index 30ceace..d650dbb 100644
--- a/src/main/java/org/apache/sling/pipes/Pipe.java
+++ b/src/main/java/org/apache/sling/pipes/Pipe.java
@@ -89,6 +89,13 @@ public interface Pipe {
      */
     Resource getInput();
 
+
+    /**
+     * get the pipe configuration resource
+     * @return resource holding the pipe configuration
+     */
+    Resource getResource();
+
     /**
      * returns the binding output used in container pipe's expression
      * @return object, either value map or something else, that will be used in nashorn for computing expressions
diff --git a/src/main/java/org/apache/sling/pipes/Plumber.java b/src/main/java/org/apache/sling/pipes/Plumber.java
index 12ec4eb..d831167 100644
--- a/src/main/java/org/apache/sling/pipes/Plumber.java
+++ b/src/main/java/org/apache/sling/pipes/Plumber.java
@@ -19,6 +19,7 @@ package org.apache.sling.pipes;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.event.jobs.Job;
 
 import java.util.Map;
 import java.util.Set;
@@ -38,36 +39,37 @@ public interface Plumber {
     Pipe getPipe(Resource resource);
 
     /**
+     * executes in a background thread
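+     * @param resolver resolver of the user requesting the execution
+     * @param path path of the pipe configuration to execute
+     * @param bindings bindings to add to the execution of the pipe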
+     * @return Job if registered, null otherwise
+     */
+    Job executeAsync(ResourceResolver resolver, String path, Map bindings);
+
+    /**
      * Executes a pipe at a certain path
      * @param resolver resource resolver with which pipe will be executed
      * @param path path of a valid pipe configuration
      * @param bindings bindings to add to the execution of the pipe, can be null
+     * @param writer output of the pipe
      * @param save in case that pipe writes anything, wether the plumber should save changes or not
      * @throws Exception in case execution fails
      * @return set of paths of output resources
      */
-    Set<String> execute(ResourceResolver resolver, String path, Map bindings, boolean save) throws Exception;
+    Set<String> execute(ResourceResolver resolver, String path, Map bindings, OutputWriter writer, boolean save) throws Exception;
 
     /**
      * Executes a given pipe
      * @param resolver resource resolver with which pipe will be executed
      * @param pipe pipe to execute
      * @param bindings bindings to add to the execution of the pipe, can be null
+     * @param writer output of the pipe
      * @param save in case that pipe writes anything, wether the plumber should save changes or not
      * @throws Exception in case execution fails
      * @return set of paths of output resources
      */
-    Set<String> execute(ResourceResolver resolver, Pipe pipe, Map bindings, boolean save) throws Exception;
-
-    /**
-     * Persist some pipe changes, and eventually distribute changes
-     * @param resolver resolver with which changes will be persisted
-     * @param pipe pipe from which the change occurred
-     * @param paths set of changed paths
-     * @throws PersistenceException in case persisting fails
-     */
-
-    void persist(ResourceResolver resolver, Pipe pipe, Set<String> paths) throws PersistenceException;
+    Set<String> execute(ResourceResolver resolver, Pipe pipe, Map bindings, OutputWriter writer, boolean save) throws Exception;
 
     /**
      * Registers
@@ -76,5 +78,17 @@ public interface Plumber {
      */
     void registerPipe(String type, Class<? extends BasePipe> pipeClass);
 
+    /**
+     * status of the pipe
+     * @param pipeResource resource corresponding to the pipe
+     * @return current status of the pipe
+     */
+    String getStatus(Resource pipeResource);
 
+    /**
+     * returns true if the pipe is considered to be running
+     * @param pipeResource resource corresponding to the pipe
+     * @return true if the pipe is considered to be running, false otherwise
+     */
+    boolean isRunning(Resource pipeResource);
 }
diff --git a/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java b/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java
index a86e48a..7005598 100644
--- a/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java
+++ b/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java
@@ -29,23 +29,20 @@ import org.apache.sling.pipes.Pipe;
 /**
  * default output writer with size and output resources' path
  */
-public class DefaultOutputWriter implements OutputWriter {
+public class DefaultOutputWriter extends OutputWriter {
 
     protected JSONWriter writer;
 
-    protected Pipe pipe;
-
     @Override
     public boolean handleRequest(SlingHttpServletRequest request) {
         return true;
     }
 
     @Override
-    public void init(SlingHttpServletRequest request, SlingHttpServletResponse response, Pipe pipe) throws IOException, JSONException {
+    protected void initInternal(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException, JSONException {
         response.setCharacterEncoding("utf-8");
         response.setContentType("application/json");
         writer = new JSONWriter(response.getWriter());
-        this.pipe = pipe;
         writer.object();
         writer.key(KEY_ITEMS);
         writer.array();
@@ -57,7 +54,7 @@ public class DefaultOutputWriter implements OutputWriter {
     }
 
     @Override
-    public void ends(int size) throws JSONException {
+    public void ends() throws JSONException {
         writer.endArray();
         writer.key(KEY_SIZE).value(size);
         writer.endObject();
diff --git a/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java b/src/main/java/org/apache/sling/pipes/internal/NopWriter.java
similarity index 57%
copy from src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java
copy to src/main/java/org/apache/sling/pipes/internal/NopWriter.java
index a86e48a..cd26e70 100644
--- a/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java
+++ b/src/main/java/org/apache/sling/pipes/internal/NopWriter.java
@@ -16,50 +16,32 @@
  */
 package org.apache.sling.pipes.internal;
 
-import java.io.IOException;
-
 import org.apache.sling.api.SlingHttpServletRequest;
 import org.apache.sling.api.SlingHttpServletResponse;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.io.JSONWriter;
 import org.apache.sling.pipes.OutputWriter;
-import org.apache.sling.pipes.Pipe;
-
-/**
- * default output writer with size and output resources' path
- */
-public class DefaultOutputWriter implements OutputWriter {
-
-    protected JSONWriter writer;
 
-    protected Pipe pipe;
+import java.io.IOException;
 
+public class NopWriter extends OutputWriter {
     @Override
     public boolean handleRequest(SlingHttpServletRequest request) {
-        return true;
+        return false;
     }
 
     @Override
-    public void init(SlingHttpServletRequest request, SlingHttpServletResponse response, Pipe pipe) throws IOException, JSONException {
-        response.setCharacterEncoding("utf-8");
-        response.setContentType("application/json");
-        writer = new JSONWriter(response.getWriter());
-        this.pipe = pipe;
-        writer.object();
-        writer.key(KEY_ITEMS);
-        writer.array();
+    protected void initInternal(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException, JSONException {
+        //nop
     }
 
     @Override
-    public void writeItem(Resource resource) throws JSONException {
-        writer.value(resource.getPath());
+    protected void writeItem(Resource resource) throws JSONException {
+        //nop
     }
 
     @Override
-    public void ends(int size) throws JSONException {
-        writer.endArray();
-        writer.key(KEY_SIZE).value(size);
-        writer.endObject();
+    public void ends() throws JSONException {
+        //nop
     }
 }
diff --git a/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java b/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java
index 87645b0..a10673c 100644
--- a/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java
+++ b/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java
@@ -16,49 +16,75 @@
  */
 package org.apache.sling.pipes.internal;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.Resource;
-import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.api.resource.*;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.DistributionRequestType;
 import org.apache.sling.distribution.DistributionResponse;
 import org.apache.sling.distribution.Distributor;
 import org.apache.sling.distribution.SimpleDistributionRequest;
-import org.apache.sling.pipes.BasePipe;
-import org.apache.sling.pipes.ContainerPipe;
-import org.apache.sling.pipes.Pipe;
-import org.apache.sling.pipes.Plumber;
-import org.apache.sling.pipes.ReferencePipe;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.apache.sling.pipes.*;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.osgi.service.component.annotations.ReferencePolicy;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+import javax.jcr.RepositoryException;
+
+import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+import static org.apache.sling.pipes.BasePipe.*;
+
 /**
- * implements plumber interface, and registers default pipes
+ * implements plumber interface, registers default pipes, and provides execution facilities
  */
-@Component(service = {Plumber.class})
-public class PlumberImpl implements Plumber {
+@Component(service = {Plumber.class, JobConsumer.class}, property = {
+        JobConsumer.PROPERTY_TOPICS +"="+PlumberImpl.SLING_EVENT_TOPIC
+})
+@Designate(ocd = PlumberImpl.Configuration.class)
+public class PlumberImpl implements Plumber, JobConsumer {
     private final Logger log = LoggerFactory.getLogger(this.getClass());
+    public static final int DEFAULT_BUFFER_SIZE = 1000;
+
+    @ObjectClassDefinition(name="Apache Sling Pipes : Plumber configuration")
+    public @interface Configuration {
+        @AttributeDefinition(description="Number of iterations after which plumber should saves a pipe execution")
+        int bufferSize() default PlumberImpl.DEFAULT_BUFFER_SIZE;
+
+        @AttributeDefinition(description="Name of service user, with appropriate rights, that will be used for async execution")
+        String serviceUser();
+
+        @AttributeDefinition(description="Users allowed to register async pipes")
+        String[] authorizedUsers() default  {"admin"};
+    }
 
     Map<String, Class<? extends BasePipe>> registry;
 
-    @Reference(policy= ReferencePolicy.DYNAMIC, cardinality= ReferenceCardinality.OPTIONAL)
-    protected volatile Distributor distributor = null;
+    public static final String SLING_EVENT_TOPIC = "org/apache/sling/pipes/topic";
+
+    private int bufferSize;
+
+    private Map serviceUser;
+
+    private List<String> allowedUsers;
 
     @Activate
-    public void activate(){
+    public void activate(Configuration configuration){
+        bufferSize = configuration.bufferSize();
+        serviceUser = Collections.singletonMap(SUBSERVICE, configuration.serviceUser());
+        allowedUsers = Arrays.asList(configuration.authorizedUsers());
         registry = new HashMap<>();
         registerPipe(BasePipe.RESOURCE_TYPE, BasePipe.class);
         registerPipe(ContainerPipe.RESOURCE_TYPE, ContainerPipe.class);
@@ -75,8 +101,18 @@ public class PlumberImpl implements Plumber {
         registerPipe(PathPipe.RESOURCE_TYPE, PathPipe.class);
         registerPipe(FilterPipe.RESOURCE_TYPE, FilterPipe.class);
         registerPipe(NotPipe.RESOURCE_TYPE, NotPipe.class);
+
     }
 
+    @Reference(policy= ReferencePolicy.DYNAMIC, cardinality= ReferenceCardinality.OPTIONAL)
+    protected volatile Distributor distributor = null;
+
+    @Reference
+    JobManager jobManager;
+
+    @Reference
+    ResourceResolverFactory factory;
+
     @Override
     public Pipe getPipe(Resource resource) {
         if ((resource == null) || !registry.containsKey(resource.getResourceType())) {
@@ -93,43 +129,84 @@ public class PlumberImpl implements Plumber {
     }
 
     @Override
-    public Set<String> execute(ResourceResolver resolver, String path, Map additionalBindings, boolean save) throws Exception {
+    public Job executeAsync(ResourceResolver resolver, String path, Map bindings) {
+        if (allowedUsers.contains(resolver.getUserID())) {
+            if (StringUtils.isBlank((String)serviceUser.get(SUBSERVICE))) {
+                log.error("please configure plumber service user");
+            }
+            final Map props = new HashMap();
+            props.put(SlingConstants.PROPERTY_PATH, path);
+            props.put(PipeBindings.NN_ADDITIONALBINDINGS, bindings);
+            return jobManager.addJob(SLING_EVENT_TOPIC, props);
+        }
+        return null;
+    }
+
+    @Override
+    public Set<String> execute(ResourceResolver resolver, String path, Map additionalBindings, OutputWriter writer, boolean save) throws Exception {
         Resource pipeResource = resolver.getResource(path);
         Pipe pipe = getPipe(pipeResource);
         if (pipe == null) {
             throw new Exception("unable to build pipe based on configuration at " + path);
         }
-        return execute(resolver, pipe, additionalBindings, save);
+        if (additionalBindings != null && (Boolean)additionalBindings.getOrDefault(BasePipe.READ_ONLY, true) && pipe.modifiesContent()) {
+            throw new Exception("This pipe modifies content, you should use a POST request");
+        }
+        return execute(resolver, pipe, additionalBindings, writer, save);
     }
 
     @Override
-    public Set<String> execute(ResourceResolver resolver, Pipe pipe, Map additionalBindings, boolean save) throws Exception {
-        if (additionalBindings != null && pipe instanceof ContainerPipe){
-            pipe.getBindings().addBindings(additionalBindings);
-        }
+    public Set<String> execute(ResourceResolver resolver, Pipe pipe, Map additionalBindings, OutputWriter writer, boolean save) throws Exception {
+        try {
+            if (additionalBindings != null && pipe instanceof ContainerPipe){
+                pipe.getBindings().addBindings(additionalBindings);
+            }
+            log.info("[{}] execution starts, save ({})", pipe, save);
+            writer.setPipe(pipe);
+            if (isRunning(pipe.getResource())){
+                throw new RuntimeException("Pipe is already running");
+            }
+            writeStatus(pipe, STATUS_STARTED);
+            resolver.commit();
 
-        log.info("[{}] execution starts, save ({})", pipe, save);
-        Set<String> set = new HashSet<>();
-        for (Iterator<Resource> it = pipe.getOutput(); it.hasNext();){
-            Resource resource = it.next();
-            if (resource != null) {
-                log.debug("[{}] retrieved {}", pipe.getName(), resource.getPath());
-                set.add(resource.getPath());
+            Set<String> set = new HashSet<>();
+            for (Iterator<Resource> it = pipe.getOutput(); it.hasNext();){
+                Resource resource = it.next();
+                if (resource != null) {
+                    log.debug("[{}] retrieved {}", pipe.getName(), resource.getPath());
+                    writer.write(resource);
+                    set.add(resource.getPath());
+                    persist(resolver, pipe, set, resource);
+                }
             }
+            if (save && pipe.modifiesContent()) {
+                persist(resolver, pipe, set, null);
+            }
+            log.info("[{}] done executing.", pipe.getName());
+            writer.ends();
+            return set;
+        } finally {
+            writeStatus(pipe, STATUS_FINISHED);
+            resolver.commit();
         }
-        if (save) {
-            persist(resolver, pipe, set);
-        }
-        log.info("[{}] done executing.", pipe.getName());
-        return set;
     }
 
-    @Override
-    public void persist(ResourceResolver resolver, Pipe pipe, Set<String> paths) throws PersistenceException {
+    /**
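+     * Persists pipe changes once the buffer size is reached or the execution has ended, and distributes them at the end if an agent is configured
+     * @param resolver resolver with which changes will be persisted
+     * @param pipe pipe from which the changes occurred
+     * @param paths set of changed paths
+     * @param currentResource resource currently processed if running, null if ended
+     * @throws Exception in case persisting fails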
+     */
+    protected void persist(ResourceResolver resolver, Pipe pipe, Set<String> paths, Resource currentResource) throws Exception {
         if  (pipe.modifiesContent() && resolver.hasChanges() && !pipe.isDryRun()){
-            log.info("[{}] saving changes...", pipe.getName());
-            resolver.commit();
-            if (distributor != null && StringUtils.isNotBlank(pipe.getDistributionAgent())) {
+            if (currentResource == null || paths.size() % bufferSize == 0){
+                log.info("[{}] saving changes...", pipe.getName());
+                writeStatus(pipe, currentResource == null ? STATUS_FINISHED : currentResource.getPath());
+                resolver.commit();
+            }
+            if (currentResource == null && distributor != null && StringUtils.isNotBlank(pipe.getDistributionAgent())) {
                 log.info("a distribution agent is configured, will try to distribute the changes");
                 DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, true, paths.toArray(new String[paths.size()]));
                 DistributionResponse response = distributor.distribute(pipe.getDistributionAgent(), resolver, request);
@@ -142,4 +219,51 @@ public class PlumberImpl implements Plumber {
     public void registerPipe(String type, Class<? extends BasePipe> pipeClass) {
         registry.put(type, pipeClass);
     }
+
+    /**
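+     * writes the status of the pipe, along with its modification date, on the pipe's resource
+     * @param pipe pipe whose status is written
+     * @param status status to write, ignored if blank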
+     */
+    protected void writeStatus(Pipe pipe, String status) throws RepositoryException {
+        if (StringUtils.isNotBlank(status)){
+            ModifiableValueMap vm = pipe.getResource().adaptTo(ModifiableValueMap.class);
+            vm.put(PN_STATUS, status);
+            Calendar cal = new GregorianCalendar();
+            cal.setTime(new Date());
+            vm.put(PN_STATUS_MODIFIED, cal);
+        }
+    }
+
+    @Override
+    public String getStatus(Resource pipeResource) {
+        Resource statusResource = pipeResource.getChild(PN_STATUS);
+        if (statusResource != null){
+            String status = statusResource.adaptTo(String.class);
+            if (StringUtils.isNotBlank(status)){
+                return status;
+            }
+        }
+        return STATUS_FINISHED;
+    }
+
+    @Override
+    public boolean isRunning(Resource pipeResource) {
+        return !getStatus(pipeResource).equals(STATUS_FINISHED);
+    }
+
+    @Override
+    public JobResult process(Job job) {
+        try(ResourceResolver resolver = factory.getServiceResourceResolver(serviceUser)){
+            String path = (String)job.getProperty(SlingConstants.PROPERTY_PATH);
+            Map bindings = (Map)job.getProperty(PipeBindings.NN_ADDITIONALBINDINGS);
+            execute(resolver, path, bindings, new NopWriter(), true);
+            return JobResult.OK;
+        } catch (LoginException e) {
+            log.error("unable to retrieve resolver for executing scheduled pipe", e);
+        } catch (Exception e) {
+            log.error("failed to execute the pipe", e);
+        }
+        return JobResult.FAILED;
+    }
 }
diff --git a/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java b/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java
index 60b7169..1672840 100644
--- a/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java
+++ b/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java
@@ -17,30 +17,21 @@
 package org.apache.sling.pipes.internal;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
+import java.util.*;
 
 import javax.servlet.Servlet;
 import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.sling.api.SlingHttpServletRequest;
 import org.apache.sling.api.SlingHttpServletResponse;
-import org.apache.sling.api.resource.Resource;
-import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.servlets.SlingAllMethodsServlet;
 import org.apache.sling.api.servlets.ServletResolverConstants;
 import org.apache.sling.commons.json.JSONException;
 import org.apache.sling.commons.json.JSONObject;
-import org.apache.sling.pipes.BasePipe;
-import org.apache.sling.pipes.ContainerPipe;
-import org.apache.sling.pipes.OutputWriter;
-import org.apache.sling.pipes.Pipe;
-import org.apache.sling.pipes.PipeBindings;
-import org.apache.sling.pipes.Plumber;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.pipes.*;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 
@@ -50,7 +41,6 @@ import org.slf4j.LoggerFactory;
 /**
  * Servlet executing plumber for a pipe path given as 'path' parameter,
  * it can also be launched against a container pipe resource directly (no need for path parameter)
- *
  */
 @Component(service = {Servlet.class},
         property= {
@@ -70,16 +60,18 @@ public class PlumberServlet extends SlingAllMethodsServlet {
 
     protected static final String PARAM_BINDINGS = "bindings";
 
-    protected static final String PARAM_SIZE = "size";
-
-    public static final int NB_MAX = 10;
+    protected static final String PARAM_ASYNC = "async";
 
     @Reference
     Plumber plumber;
 
     @Override
     protected void doGet(SlingHttpServletRequest request, SlingHttpServletResponse response) throws ServletException, IOException {
-        execute(request, response, false);
+        if (Arrays.asList(request.getRequestPathInfo().getSelectors()).contains(BasePipe.PN_STATUS)){
+            response.getWriter().append(plumber.getStatus(request.getResource()));
+        } else {
+            execute(request, response, false);
+        }
     }
 
     @Override
@@ -93,62 +85,60 @@ public class PlumberServlet extends SlingAllMethodsServlet {
             if (StringUtils.isBlank(path)) {
                 throw new Exception("path should be provided");
             }
-            String dryRun = request.getParameter(BasePipe.DRYRUN_KEY);
-            int size = request.getParameter(PARAM_SIZE) != null ? Integer.parseInt(request.getParameter(PARAM_SIZE)) : NB_MAX;
-            if (size < 0) {
-                size = Integer.MAX_VALUE;
-            }
-
-            ResourceResolver resolver = request.getResourceResolver();
-            Resource pipeResource = resolver.getResource(path);
-            Pipe pipe = plumber.getPipe(pipeResource);
-            PipeBindings bindings = pipe.getBindings();
-
-            if (StringUtils.isNotBlank(dryRun) && dryRun.equals(Boolean.TRUE.toString())) {
-                bindings.addBinding(BasePipe.DRYRUN_KEY, true);
-            }
-
-            String paramBindings = request.getParameter(PARAM_BINDINGS);
-            if (StringUtils.isNotBlank(paramBindings)){
-                try {
-                    JSONObject bindingJSON = new JSONObject(paramBindings);
-                    for (Iterator<String> keys = bindingJSON.keys(); keys.hasNext();){
-                        String key = keys.next();
-                        bindings.addBinding(key, bindingJSON.get(key));
-                    }
-                } catch (Exception e){
-                    log.error("Unable to retrieve bindings information", e);
+            Map bindings = getBindingsFromRequest(request, writeAllowed);
+            String asyncParam = request.getParameter(PARAM_ASYNC);
+            if (StringUtils.isNotBlank(asyncParam) && asyncParam.equals(Boolean.TRUE.toString())){
+                Job job = plumber.executeAsync(request.getResourceResolver(), path, bindings);
+                if (job != null){
+                    response.getWriter().append("pipe execution registered as " + job.getId());
+                    response.setStatus(HttpServletResponse.SC_CREATED);
+                } else {
+                    response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Some issue with your request, or server not being ready for async execution");
                 }
+            } else {
+                OutputWriter writer = getWriter(request, response);
+                plumber.execute(request.getResourceResolver(), path, bindings, writer, true);
             }
-            if (!writeAllowed && pipe.modifiesContent()) {
-                throw new Exception("This pipe modifies content, you should use a POST request");
-            }
-            OutputWriter writer = getWriter(request, response, pipe);
-            int i = 0;
-            Iterator<Resource> resourceIterator = pipe.getOutput();
-            Set<String> paths = new HashSet<String>();
-            while (resourceIterator.hasNext()){
-                Resource resource = resourceIterator.next();
-                paths.add(resource.getPath());
-                if (i++ < size) {
-                    writer.writeItem(resource);
-                }
-            }
-            writer.ends(i);
-            plumber.persist(resolver, pipe, paths);
         } catch (Exception e) {
             throw new ServletException(e);
         }
     }
 
-    OutputWriter getWriter(SlingHttpServletRequest request, SlingHttpServletResponse response, Pipe pipe) throws IOException, JSONException {
+    /** Converts request into pipe bindings: dry run flag, entries of the JSON "bindings" parameter, read only flag.
+     * @param request request whose dry run and bindings parameters are read
+     * @param writeAllowed when false, bindings are flagged as read only (set last, so it wins over a same-named JSON entry)
+     * @return map of collected bindings; a bindings parameter that fails to parse is logged and skipped
+     */
+    protected Map getBindingsFromRequest(SlingHttpServletRequest request, boolean writeAllowed){
+        Map bindings = new HashMap<>();
+        String dryRun = request.getParameter(BasePipe.DRYRUN_KEY);
+        if (StringUtils.isNotBlank(dryRun) && dryRun.equals(Boolean.TRUE.toString())) {
+            bindings.put(BasePipe.DRYRUN_KEY, true);
+        }
+        String paramBindings = request.getParameter(PARAM_BINDINGS);
+        if (StringUtils.isNotBlank(paramBindings)){
+            try {
+                JSONObject bindingJSON = new JSONObject(paramBindings);
+                for (Iterator<String> keys = bindingJSON.keys(); keys.hasNext();){
+                    String key = keys.next();
+                    bindings.put(key, bindingJSON.get(key));
+                }
+            } catch (Exception e){
+                log.error("Unable to retrieve bindings information", e);
+            }
+        }
+        bindings.put(BasePipe.READ_ONLY, !writeAllowed);
+        return bindings;
+    }
+
+    OutputWriter getWriter(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException, JSONException {
         OutputWriter[] candidates = new OutputWriter[]{new CustomJsonWriter(), new CustomWriter(), new DefaultOutputWriter()};
         for (OutputWriter candidate : candidates) {
             if (candidate.handleRequest(request)) {
-                candidate.init(request, response, pipe);
+                candidate.init(request, response);
                 return candidate;
             }
         }
         return null;
     }
-}
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/pipes/package-info.java b/src/main/java/org/apache/sling/pipes/package-info.java
index b5e84e4..44b5df9 100644
--- a/src/main/java/org/apache/sling/pipes/package-info.java
+++ b/src/main/java/org/apache/sling/pipes/package-info.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@Version("0.0.10")
+@Version("1.0.0")
 package org.apache.sling.pipes;
 
 import org.osgi.annotation.versioning.Version;
diff --git a/src/test/java/org/apache/sling/pipes/AbstractPipeTest.java b/src/test/java/org/apache/sling/pipes/AbstractPipeTest.java
index 08e8047..a6a1afb 100644
--- a/src/test/java/org/apache/sling/pipes/AbstractPipeTest.java
+++ b/src/test/java/org/apache/sling/pipes/AbstractPipeTest.java
@@ -30,6 +30,8 @@ import org.junit.Rule;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * this abstract class for pipes implements a plumber with all registered pipes, plus some test ones, and give some paths,
@@ -53,7 +55,11 @@ public class AbstractPipeTest {
     @Before
     public void setup(){
         PlumberImpl plumberImpl = new PlumberImpl();
-        plumberImpl.activate();
+        PlumberImpl.Configuration configuration = mock(PlumberImpl.Configuration.class);
+        when(configuration.authorizedUsers()).thenReturn(new String[]{});
+        when(configuration.serviceUser()).thenReturn(null);
+        when(configuration.bufferSize()).thenReturn(PlumberImpl.DEFAULT_BUFFER_SIZE);
+        plumberImpl.activate(configuration);
         plumberImpl.registerPipe("slingPipes/dummyNull", DummyNull.class);
         plumberImpl.registerPipe("slingPipes/dummySearch", DummySearch.class);
         plumber = plumberImpl;
diff --git a/src/test/java/org/apache/sling/pipes/internal/PlumberServletTest.java b/src/test/java/org/apache/sling/pipes/internal/PlumberServletTest.java
index 5c18229..23d6775 100644
--- a/src/test/java/org/apache/sling/pipes/internal/PlumberServletTest.java
+++ b/src/test/java/org/apache/sling/pipes/internal/PlumberServletTest.java
@@ -205,7 +205,7 @@ public class PlumberServletTest extends AbstractPipeTest {
         when(request.getParameter(PlumberServlet.PARAM_BINDINGS)).thenReturn(bindings);
         when(request.getParameter(CustomWriter.PARAM_WRITER)).thenReturn(writer);
         when(request.getParameter(BasePipe.DRYRUN_KEY)).thenReturn(dryRun);
-        when(request.getParameter(PlumberServlet.PARAM_SIZE)).thenReturn(size);
+        when(request.getParameter(OutputWriter.PARAM_SIZE)).thenReturn(size);
         return request;
     }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@sling.apache.org" <co...@sling.apache.org>.