You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ro...@apache.org on 2017/11/07 09:57:25 UTC
[sling-org-apache-sling-pipes] 09/19: SLING-6623 allow async
execution of pipes
This is an automated email from the ASF dual-hosted git repository.
rombert pushed a commit to annotated tag org.apache.sling.pipes-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-pipes.git
commit 4c2f290b95b8b316b36c0fcc571d8e2dc772ff21
Author: Robert Munteanu <ro...@apache.org>
AuthorDate: Wed May 24 07:22:22 2017 +0000
SLING-6623 allow async execution of pipes
- moved execution code to a central place, under plumber implementation,
- added status tracking and buffering of commits (async execution means potentially long-running executions),
- added job registration & processing capabilities for plumber, based on a configurable service user (will update documentation accordingly)
This closes #229
Submitted-By: Nicolas Peltier
git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/contrib/extensions/sling-pipes@1796004 13f79535-47bb-0310-9956-ffa450edef68
---
pom.xml | 7 +-
src/main/java/org/apache/sling/pipes/BasePipe.java | 10 +
.../java/org/apache/sling/pipes/OutputWriter.java | 60 +++++-
src/main/java/org/apache/sling/pipes/Pipe.java | 7 +
src/main/java/org/apache/sling/pipes/Plumber.java | 38 ++--
.../sling/pipes/internal/DefaultOutputWriter.java | 9 +-
.../{DefaultOutputWriter.java => NopWriter.java} | 36 +---
.../apache/sling/pipes/internal/PlumberImpl.java | 208 ++++++++++++++++-----
.../sling/pipes/internal/PlumberServlet.java | 112 +++++------
.../java/org/apache/sling/pipes/package-info.java | 2 +-
.../org/apache/sling/pipes/AbstractPipeTest.java | 8 +-
.../sling/pipes/internal/PlumberServletTest.java | 2 +-
12 files changed, 338 insertions(+), 161 deletions(-)
diff --git a/pom.xml b/pom.xml
index 1ba63ae..77fb030 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
<artifactId>org.apache.sling.pipes</artifactId>
<packaging>bundle</packaging>
- <version>0.0.11-SNAPSHOT</version>
+ <version>1.0.0-SNAPSHOT</version>
<name>Apache Sling Pipes</name>
<description>bulk content changes tool</description>
@@ -137,6 +137,11 @@
<version>2.0.6</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.event.api</artifactId>
+ <version>1.0.0</version>
+ </dependency>
<!-- testing -->
<dependency>
<groupId>junit</groupId>
diff --git a/src/main/java/org/apache/sling/pipes/BasePipe.java b/src/main/java/org/apache/sling/pipes/BasePipe.java
index afaad61..b644b13 100644
--- a/src/main/java/org/apache/sling/pipes/BasePipe.java
+++ b/src/main/java/org/apache/sling/pipes/BasePipe.java
@@ -37,6 +37,11 @@ public class BasePipe implements Pipe {
public static final String RESOURCE_TYPE = "slingPipes/base";
public static final String DRYRUN_KEY = "dryRun";
+ public static final String READ_ONLY = "readOnly";
+ public static final String PN_STATUS = "status";
+ public static final String PN_STATUS_MODIFIED = "statusModified";
+ public static final String STATUS_STARTED = "started";
+ public static final String STATUS_FINISHED = "finished";
protected static final String DRYRUN_EXPR = "${" + DRYRUN_KEY + "}";
protected ResourceResolver resolver;
@@ -63,6 +68,11 @@ public class BasePipe implements Pipe {
this.parent = parent;
}
+ @Override
+ public Resource getResource() {
+ return resource;
+ }
+
protected Plumber plumber;
private String name;
diff --git a/src/main/java/org/apache/sling/pipes/OutputWriter.java b/src/main/java/org/apache/sling/pipes/OutputWriter.java
index 3825b2a..13e1bc7 100644
--- a/src/main/java/org/apache/sling/pipes/OutputWriter.java
+++ b/src/main/java/org/apache/sling/pipes/OutputWriter.java
@@ -26,41 +26,83 @@ import java.io.IOException;
/**
* defines how pipe's output get written to a servlet response
*/
-public interface OutputWriter {
+public abstract class OutputWriter {
- String KEY_SIZE = "size";
+ public static final String KEY_SIZE = "size";
- String KEY_ITEMS = "items";
+ public static final String KEY_ITEMS = "items";
+
+ public static final String PARAM_SIZE = KEY_SIZE;
+
+ public static final int NB_MAX = 10;
+
+ protected int size;
+
+ protected int max = NB_MAX;
+
+ protected Pipe pipe;
/**
*
* @param request current request
* @return true if this writer handles that request
*/
- boolean handleRequest(SlingHttpServletRequest request);
+ public abstract boolean handleRequest(SlingHttpServletRequest request);
/**
* Init the writer, writes beginning of the output
* @param request request from which writer will output
* @param response response on which writer will output
- * @param pipe pipe whose output will be written
* @throws IOException error handling streams
* @throws JSONException in case invalid json is written
*/
- void init(SlingHttpServletRequest request, SlingHttpServletResponse response, Pipe pipe) throws IOException, JSONException;
+ public void init(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException, JSONException{
+ max = request.getParameter(PARAM_SIZE) != null ? Integer.parseInt(request.getParameter(PARAM_SIZE)) : NB_MAX;
+ if (max < 0) {
+ max = Integer.MAX_VALUE;
+ }
+ initInternal(request, response);
+ }
+
+ /**
+ * Init the writer, writes beginning of the output
+ * @param request request from which writer will output
+ * @param response response on which writer will output
+ * @throws IOException error handling streams
+ * @throws JSONException in case invalid json is written
+ */
+ protected abstract void initInternal(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException, JSONException;
/**
* Write a given resource
* @param resource resource that will be written
* @throws JSONException in case write fails
*/
- void writeItem(Resource resource) throws JSONException;
+ public void write(Resource resource) throws JSONException{
+ if (size++ < max) {
+ writeItem(resource);
+ }
+ }
+
+ /**
+ * Write a given resource
+ * @param resource resource that will be written
+ * @throws JSONException in case write fails
+ */
+ protected abstract void writeItem(Resource resource) throws JSONException;
/**
* writes the end of the output
- * @param size size of the overall result
* @throws JSONException in case invalid json is written
*/
- void ends(int size) throws JSONException;
+ public abstract void ends() throws JSONException;
+
+ /**
+ *
+ * @param pipe
+ */
+ public void setPipe(Pipe pipe) {
+ this.pipe = pipe;
+ }
}
diff --git a/src/main/java/org/apache/sling/pipes/Pipe.java b/src/main/java/org/apache/sling/pipes/Pipe.java
index 30ceace..d650dbb 100644
--- a/src/main/java/org/apache/sling/pipes/Pipe.java
+++ b/src/main/java/org/apache/sling/pipes/Pipe.java
@@ -89,6 +89,13 @@ public interface Pipe {
*/
Resource getInput();
+
+ /**
+ * get the pipe configuration resource
+ * @return
+ */
+ Resource getResource();
+
/**
* returns the binding output used in container pipe's expression
* @return object, either value map or something else, that will be used in nashorn for computing expressions
diff --git a/src/main/java/org/apache/sling/pipes/Plumber.java b/src/main/java/org/apache/sling/pipes/Plumber.java
index 12ec4eb..d831167 100644
--- a/src/main/java/org/apache/sling/pipes/Plumber.java
+++ b/src/main/java/org/apache/sling/pipes/Plumber.java
@@ -19,6 +19,7 @@ package org.apache.sling.pipes;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.event.jobs.Job;
import java.util.Map;
import java.util.Set;
@@ -38,36 +39,37 @@ public interface Plumber {
Pipe getPipe(Resource resource);
/**
+ * executes in a background thread
+ * @param resolver
+ * @param path
+ * @param bindings
+ * @return Job if registered, null otherwise
+ */
+ Job executeAsync(ResourceResolver resolver, String path, Map bindings);
+
+ /**
* Executes a pipe at a certain path
* @param resolver resource resolver with which pipe will be executed
* @param path path of a valid pipe configuration
* @param bindings bindings to add to the execution of the pipe, can be null
+ * @param writer output of the pipe
* @param save in case that pipe writes anything, wether the plumber should save changes or not
* @throws Exception in case execution fails
* @return set of paths of output resources
*/
- Set<String> execute(ResourceResolver resolver, String path, Map bindings, boolean save) throws Exception;
+ Set<String> execute(ResourceResolver resolver, String path, Map bindings, OutputWriter writer, boolean save) throws Exception;
/**
* Executes a given pipe
* @param resolver resource resolver with which pipe will be executed
* @param pipe pipe to execute
* @param bindings bindings to add to the execution of the pipe, can be null
+ * @param writer output of the pipe
* @param save in case that pipe writes anything, wether the plumber should save changes or not
* @throws Exception in case execution fails
* @return set of paths of output resources
*/
- Set<String> execute(ResourceResolver resolver, Pipe pipe, Map bindings, boolean save) throws Exception;
-
- /**
- * Persist some pipe changes, and eventually distribute changes
- * @param resolver resolver with which changes will be persisted
- * @param pipe pipe from which the change occurred
- * @param paths set of changed paths
- * @throws PersistenceException in case persisting fails
- */
-
- void persist(ResourceResolver resolver, Pipe pipe, Set<String> paths) throws PersistenceException;
+ Set<String> execute(ResourceResolver resolver, Pipe pipe, Map bindings, OutputWriter writer, boolean save) throws Exception;
/**
* Registers
@@ -76,5 +78,17 @@ public interface Plumber {
*/
void registerPipe(String type, Class<? extends BasePipe> pipeClass);
+ /**
+ * status of the pipe
+ * @param pipeResource resource corresponding to the pipe
+ * @return
+ */
+ String getStatus(Resource pipeResource);
+ /**
+ * returns true if the pipe is considered to be running
+ * @param pipeResource resource corresponding to the pipe
+ * @return
+ */
+ boolean isRunning(Resource pipeResource);
}
diff --git a/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java b/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java
index a86e48a..7005598 100644
--- a/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java
+++ b/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java
@@ -29,23 +29,20 @@ import org.apache.sling.pipes.Pipe;
/**
* default output writer with size and output resources' path
*/
-public class DefaultOutputWriter implements OutputWriter {
+public class DefaultOutputWriter extends OutputWriter {
protected JSONWriter writer;
- protected Pipe pipe;
-
@Override
public boolean handleRequest(SlingHttpServletRequest request) {
return true;
}
@Override
- public void init(SlingHttpServletRequest request, SlingHttpServletResponse response, Pipe pipe) throws IOException, JSONException {
+ protected void initInternal(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException, JSONException {
response.setCharacterEncoding("utf-8");
response.setContentType("application/json");
writer = new JSONWriter(response.getWriter());
- this.pipe = pipe;
writer.object();
writer.key(KEY_ITEMS);
writer.array();
@@ -57,7 +54,7 @@ public class DefaultOutputWriter implements OutputWriter {
}
@Override
- public void ends(int size) throws JSONException {
+ public void ends() throws JSONException {
writer.endArray();
writer.key(KEY_SIZE).value(size);
writer.endObject();
diff --git a/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java b/src/main/java/org/apache/sling/pipes/internal/NopWriter.java
similarity index 57%
copy from src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java
copy to src/main/java/org/apache/sling/pipes/internal/NopWriter.java
index a86e48a..cd26e70 100644
--- a/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java
+++ b/src/main/java/org/apache/sling/pipes/internal/NopWriter.java
@@ -16,50 +16,32 @@
*/
package org.apache.sling.pipes.internal;
-import java.io.IOException;
-
import org.apache.sling.api.SlingHttpServletRequest;
import org.apache.sling.api.SlingHttpServletResponse;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.io.JSONWriter;
import org.apache.sling.pipes.OutputWriter;
-import org.apache.sling.pipes.Pipe;
-
-/**
- * default output writer with size and output resources' path
- */
-public class DefaultOutputWriter implements OutputWriter {
-
- protected JSONWriter writer;
- protected Pipe pipe;
+import java.io.IOException;
+public class NopWriter extends OutputWriter {
@Override
public boolean handleRequest(SlingHttpServletRequest request) {
- return true;
+ return false;
}
@Override
- public void init(SlingHttpServletRequest request, SlingHttpServletResponse response, Pipe pipe) throws IOException, JSONException {
- response.setCharacterEncoding("utf-8");
- response.setContentType("application/json");
- writer = new JSONWriter(response.getWriter());
- this.pipe = pipe;
- writer.object();
- writer.key(KEY_ITEMS);
- writer.array();
+ protected void initInternal(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException, JSONException {
+ //nop
}
@Override
- public void writeItem(Resource resource) throws JSONException {
- writer.value(resource.getPath());
+ protected void writeItem(Resource resource) throws JSONException {
+ //nop
}
@Override
- public void ends(int size) throws JSONException {
- writer.endArray();
- writer.key(KEY_SIZE).value(size);
- writer.endObject();
+ public void ends() throws JSONException {
+ //nop
}
}
diff --git a/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java b/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java
index 87645b0..a10673c 100644
--- a/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java
+++ b/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java
@@ -16,49 +16,75 @@
*/
package org.apache.sling.pipes.internal;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import org.apache.commons.lang.StringUtils;
-import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.Resource;
-import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.api.resource.*;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.Distributor;
import org.apache.sling.distribution.SimpleDistributionRequest;
-import org.apache.sling.pipes.BasePipe;
-import org.apache.sling.pipes.ContainerPipe;
-import org.apache.sling.pipes.Pipe;
-import org.apache.sling.pipes.Plumber;
-import org.apache.sling.pipes.ReferencePipe;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.apache.sling.pipes.*;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jcr.RepositoryException;
+
+import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+import static org.apache.sling.pipes.BasePipe.*;
+
/**
- * implements plumber interface, and registers default pipes
+ * implements plumber interface, registers default pipes, and provides execution facilities
*/
-@Component(service = {Plumber.class})
-public class PlumberImpl implements Plumber {
+@Component(service = {Plumber.class, JobConsumer.class}, property = {
+ JobConsumer.PROPERTY_TOPICS +"="+PlumberImpl.SLING_EVENT_TOPIC
+})
+@Designate(ocd = PlumberImpl.Configuration.class)
+public class PlumberImpl implements Plumber, JobConsumer {
private final Logger log = LoggerFactory.getLogger(this.getClass());
+ public static final int DEFAULT_BUFFER_SIZE = 1000;
+
+ @ObjectClassDefinition(name="Apache Sling Pipes : Plumber configuration")
+ public @interface Configuration {
+ @AttributeDefinition(description="Number of iterations after which plumber should saves a pipe execution")
+ int bufferSize() default PlumberImpl.DEFAULT_BUFFER_SIZE;
+
+ @AttributeDefinition(description="Name of service user, with appropriate rights, that will be used for async execution")
+ String serviceUser();
+
+ @AttributeDefinition(description="Users allowed to register async pipes")
+ String[] authorizedUsers() default {"admin"};
+ }
Map<String, Class<? extends BasePipe>> registry;
- @Reference(policy= ReferencePolicy.DYNAMIC, cardinality= ReferenceCardinality.OPTIONAL)
- protected volatile Distributor distributor = null;
+ public static final String SLING_EVENT_TOPIC = "org/apache/sling/pipes/topic";
+
+ private int bufferSize;
+
+ private Map serviceUser;
+
+ private List<String> allowedUsers;
@Activate
- public void activate(){
+ public void activate(Configuration configuration){
+ bufferSize = configuration.bufferSize();
+ serviceUser = Collections.singletonMap(SUBSERVICE, configuration.serviceUser());
+ allowedUsers = Arrays.asList(configuration.authorizedUsers());
registry = new HashMap<>();
registerPipe(BasePipe.RESOURCE_TYPE, BasePipe.class);
registerPipe(ContainerPipe.RESOURCE_TYPE, ContainerPipe.class);
@@ -75,8 +101,18 @@ public class PlumberImpl implements Plumber {
registerPipe(PathPipe.RESOURCE_TYPE, PathPipe.class);
registerPipe(FilterPipe.RESOURCE_TYPE, FilterPipe.class);
registerPipe(NotPipe.RESOURCE_TYPE, NotPipe.class);
+
}
+ @Reference(policy= ReferencePolicy.DYNAMIC, cardinality= ReferenceCardinality.OPTIONAL)
+ protected volatile Distributor distributor = null;
+
+ @Reference
+ JobManager jobManager;
+
+ @Reference
+ ResourceResolverFactory factory;
+
@Override
public Pipe getPipe(Resource resource) {
if ((resource == null) || !registry.containsKey(resource.getResourceType())) {
@@ -93,43 +129,84 @@ public class PlumberImpl implements Plumber {
}
@Override
- public Set<String> execute(ResourceResolver resolver, String path, Map additionalBindings, boolean save) throws Exception {
+ public Job executeAsync(ResourceResolver resolver, String path, Map bindings) {
+ if (allowedUsers.contains(resolver.getUserID())) {
+ if (StringUtils.isBlank((String)serviceUser.get(SUBSERVICE))) {
+ log.error("please configure plumber service user");
+ }
+ final Map props = new HashMap();
+ props.put(SlingConstants.PROPERTY_PATH, path);
+ props.put(PipeBindings.NN_ADDITIONALBINDINGS, bindings);
+ return jobManager.addJob(SLING_EVENT_TOPIC, props);
+ }
+ return null;
+ }
+
+ @Override
+ public Set<String> execute(ResourceResolver resolver, String path, Map additionalBindings, OutputWriter writer, boolean save) throws Exception {
Resource pipeResource = resolver.getResource(path);
Pipe pipe = getPipe(pipeResource);
if (pipe == null) {
throw new Exception("unable to build pipe based on configuration at " + path);
}
- return execute(resolver, pipe, additionalBindings, save);
+ if (additionalBindings != null && (Boolean)additionalBindings.getOrDefault(BasePipe.READ_ONLY, true) && pipe.modifiesContent()) {
+ throw new Exception("This pipe modifies content, you should use a POST request");
+ }
+ return execute(resolver, pipe, additionalBindings, writer, save);
}
@Override
- public Set<String> execute(ResourceResolver resolver, Pipe pipe, Map additionalBindings, boolean save) throws Exception {
- if (additionalBindings != null && pipe instanceof ContainerPipe){
- pipe.getBindings().addBindings(additionalBindings);
- }
+ public Set<String> execute(ResourceResolver resolver, Pipe pipe, Map additionalBindings, OutputWriter writer, boolean save) throws Exception {
+ try {
+ if (additionalBindings != null && pipe instanceof ContainerPipe){
+ pipe.getBindings().addBindings(additionalBindings);
+ }
+ log.info("[{}] execution starts, save ({})", pipe, save);
+ writer.setPipe(pipe);
+ if (isRunning(pipe.getResource())){
+ throw new RuntimeException("Pipe is already running");
+ }
+ writeStatus(pipe, STATUS_STARTED);
+ resolver.commit();
- log.info("[{}] execution starts, save ({})", pipe, save);
- Set<String> set = new HashSet<>();
- for (Iterator<Resource> it = pipe.getOutput(); it.hasNext();){
- Resource resource = it.next();
- if (resource != null) {
- log.debug("[{}] retrieved {}", pipe.getName(), resource.getPath());
- set.add(resource.getPath());
+ Set<String> set = new HashSet<>();
+ for (Iterator<Resource> it = pipe.getOutput(); it.hasNext();){
+ Resource resource = it.next();
+ if (resource != null) {
+ log.debug("[{}] retrieved {}", pipe.getName(), resource.getPath());
+ writer.write(resource);
+ set.add(resource.getPath());
+ persist(resolver, pipe, set, resource);
+ }
}
+ if (save && pipe.modifiesContent()) {
+ persist(resolver, pipe, set, null);
+ }
+ log.info("[{}] done executing.", pipe.getName());
+ writer.ends();
+ return set;
+ } finally {
+ writeStatus(pipe, STATUS_FINISHED);
+ resolver.commit();
}
- if (save) {
- persist(resolver, pipe, set);
- }
- log.info("[{}] done executing.", pipe.getName());
- return set;
}
- @Override
- public void persist(ResourceResolver resolver, Pipe pipe, Set<String> paths) throws PersistenceException {
+ /**
+ * Persists pipe change if big enough, or ended, and eventually distribute changes
+ * @param resolver
+ * @param pipe
+ * @param paths
+ * @param currentResource if running, null if ended
+ * @throws PersistenceException
+ */
+ protected void persist(ResourceResolver resolver, Pipe pipe, Set<String> paths, Resource currentResource) throws Exception {
if (pipe.modifiesContent() && resolver.hasChanges() && !pipe.isDryRun()){
- log.info("[{}] saving changes...", pipe.getName());
- resolver.commit();
- if (distributor != null && StringUtils.isNotBlank(pipe.getDistributionAgent())) {
+ if (currentResource == null || paths.size() % bufferSize == 0){
+ log.info("[{}] saving changes...", pipe.getName());
+ writeStatus(pipe, currentResource == null ? STATUS_FINISHED : currentResource.getPath());
+ resolver.commit();
+ }
+ if (currentResource == null && distributor != null && StringUtils.isNotBlank(pipe.getDistributionAgent())) {
log.info("a distribution agent is configured, will try to distribute the changes");
DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, true, paths.toArray(new String[paths.size()]));
DistributionResponse response = distributor.distribute(pipe.getDistributionAgent(), resolver, request);
@@ -142,4 +219,51 @@ public class PlumberImpl implements Plumber {
public void registerPipe(String type, Class<? extends BasePipe> pipeClass) {
registry.put(type, pipeClass);
}
+
+ /**
+ * writes the status of the pipe
+ * @param pipe
+ * @param status
+ */
+ protected void writeStatus(Pipe pipe, String status) throws RepositoryException {
+ if (StringUtils.isNotBlank(status)){
+ ModifiableValueMap vm = pipe.getResource().adaptTo(ModifiableValueMap.class);
+ vm.put(PN_STATUS, status);
+ Calendar cal = new GregorianCalendar();
+ cal.setTime(new Date());
+ vm.put(PN_STATUS_MODIFIED, cal);
+ }
+ }
+
+ @Override
+ public String getStatus(Resource pipeResource) {
+ Resource statusResource = pipeResource.getChild(PN_STATUS);
+ if (statusResource != null){
+ String status = statusResource.adaptTo(String.class);
+ if (StringUtils.isNotBlank(status)){
+ return status;
+ }
+ }
+ return STATUS_FINISHED;
+ }
+
+ @Override
+ public boolean isRunning(Resource pipeResource) {
+ return !getStatus(pipeResource).equals(STATUS_FINISHED);
+ }
+
+ @Override
+ public JobResult process(Job job) {
+ try(ResourceResolver resolver = factory.getServiceResourceResolver(serviceUser)){
+ String path = (String)job.getProperty(SlingConstants.PROPERTY_PATH);
+ Map bindings = (Map)job.getProperty(PipeBindings.NN_ADDITIONALBINDINGS);
+ execute(resolver, path, bindings, new NopWriter(), true);
+ return JobResult.OK;
+ } catch (LoginException e) {
+ log.error("unable to retrieve resolver for executing scheduled pipe", e);
+ } catch (Exception e) {
+ log.error("failed to execute the pipe", e);
+ }
+ return JobResult.FAILED;
+ }
}
diff --git a/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java b/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java
index 60b7169..1672840 100644
--- a/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java
+++ b/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java
@@ -17,30 +17,21 @@
package org.apache.sling.pipes.internal;
import java.io.IOException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
+import java.util.*;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang.StringUtils;
import org.apache.sling.api.SlingHttpServletRequest;
import org.apache.sling.api.SlingHttpServletResponse;
-import org.apache.sling.api.resource.Resource;
-import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.servlets.SlingAllMethodsServlet;
import org.apache.sling.api.servlets.ServletResolverConstants;
import org.apache.sling.commons.json.JSONException;
import org.apache.sling.commons.json.JSONObject;
-import org.apache.sling.pipes.BasePipe;
-import org.apache.sling.pipes.ContainerPipe;
-import org.apache.sling.pipes.OutputWriter;
-import org.apache.sling.pipes.Pipe;
-import org.apache.sling.pipes.PipeBindings;
-import org.apache.sling.pipes.Plumber;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.pipes.*;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
@@ -50,7 +41,6 @@ import org.slf4j.LoggerFactory;
/**
* Servlet executing plumber for a pipe path given as 'path' parameter,
* it can also be launched against a container pipe resource directly (no need for path parameter)
- *
*/
@Component(service = {Servlet.class},
property= {
@@ -70,16 +60,18 @@ public class PlumberServlet extends SlingAllMethodsServlet {
protected static final String PARAM_BINDINGS = "bindings";
- protected static final String PARAM_SIZE = "size";
-
- public static final int NB_MAX = 10;
+ protected static final String PARAM_ASYNC = "async";
@Reference
Plumber plumber;
@Override
protected void doGet(SlingHttpServletRequest request, SlingHttpServletResponse response) throws ServletException, IOException {
- execute(request, response, false);
+ if (Arrays.asList(request.getRequestPathInfo().getSelectors()).contains(BasePipe.PN_STATUS)){
+ response.getWriter().append(plumber.getStatus(request.getResource()));
+ } else {
+ execute(request, response, false);
+ }
}
@Override
@@ -93,62 +85,60 @@ public class PlumberServlet extends SlingAllMethodsServlet {
if (StringUtils.isBlank(path)) {
throw new Exception("path should be provided");
}
- String dryRun = request.getParameter(BasePipe.DRYRUN_KEY);
- int size = request.getParameter(PARAM_SIZE) != null ? Integer.parseInt(request.getParameter(PARAM_SIZE)) : NB_MAX;
- if (size < 0) {
- size = Integer.MAX_VALUE;
- }
-
- ResourceResolver resolver = request.getResourceResolver();
- Resource pipeResource = resolver.getResource(path);
- Pipe pipe = plumber.getPipe(pipeResource);
- PipeBindings bindings = pipe.getBindings();
-
- if (StringUtils.isNotBlank(dryRun) && dryRun.equals(Boolean.TRUE.toString())) {
- bindings.addBinding(BasePipe.DRYRUN_KEY, true);
- }
-
- String paramBindings = request.getParameter(PARAM_BINDINGS);
- if (StringUtils.isNotBlank(paramBindings)){
- try {
- JSONObject bindingJSON = new JSONObject(paramBindings);
- for (Iterator<String> keys = bindingJSON.keys(); keys.hasNext();){
- String key = keys.next();
- bindings.addBinding(key, bindingJSON.get(key));
- }
- } catch (Exception e){
- log.error("Unable to retrieve bindings information", e);
+ Map bindings = getBindingsFromRequest(request, writeAllowed);
+ String asyncParam = request.getParameter(PARAM_ASYNC);
+ if (StringUtils.isNotBlank(asyncParam) && asyncParam.equals(Boolean.TRUE.toString())){
+ Job job = plumber.executeAsync(request.getResourceResolver(), path, bindings);
+ if (job != null){
+ response.getWriter().append("pipe execution registered as " + job.getId());
+ response.setStatus(HttpServletResponse.SC_CREATED);
+ } else {
+ response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Some issue with your request, or server not being ready for async execution");
}
+ } else {
+ OutputWriter writer = getWriter(request, response);
+ plumber.execute(request.getResourceResolver(), path, bindings, writer, true);
}
- if (!writeAllowed && pipe.modifiesContent()) {
- throw new Exception("This pipe modifies content, you should use a POST request");
- }
- OutputWriter writer = getWriter(request, response, pipe);
- int i = 0;
- Iterator<Resource> resourceIterator = pipe.getOutput();
- Set<String> paths = new HashSet<String>();
- while (resourceIterator.hasNext()){
- Resource resource = resourceIterator.next();
- paths.add(resource.getPath());
- if (i++ < size) {
- writer.writeItem(resource);
- }
- }
- writer.ends(i);
- plumber.persist(resolver, pipe, paths);
} catch (Exception e) {
throw new ServletException(e);
}
}
- OutputWriter getWriter(SlingHttpServletRequest request, SlingHttpServletResponse response, Pipe pipe) throws IOException, JSONException {
+ /**
+ * Converts request into pipe bindings
+ * @param request
+ * @return
+ */
+ protected Map getBindingsFromRequest(SlingHttpServletRequest request, boolean writeAllowed){
+ Map bindings = new HashMap<>();
+ String dryRun = request.getParameter(BasePipe.DRYRUN_KEY);
+ if (StringUtils.isNotBlank(dryRun) && dryRun.equals(Boolean.TRUE.toString())) {
+ bindings.put(BasePipe.DRYRUN_KEY, true);
+ }
+ String paramBindings = request.getParameter(PARAM_BINDINGS);
+ if (StringUtils.isNotBlank(paramBindings)){
+ try {
+ JSONObject bindingJSON = new JSONObject(paramBindings);
+ for (Iterator<String> keys = bindingJSON.keys(); keys.hasNext();){
+ String key = keys.next();
+ bindings.put(key, bindingJSON.get(key));
+ }
+ } catch (Exception e){
+ log.error("Unable to retrieve bindings information", e);
+ }
+ }
+ bindings.put(BasePipe.READ_ONLY, !writeAllowed);
+ return bindings;
+ }
+
+ OutputWriter getWriter(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException, JSONException {
OutputWriter[] candidates = new OutputWriter[]{new CustomJsonWriter(), new CustomWriter(), new DefaultOutputWriter()};
for (OutputWriter candidate : candidates) {
if (candidate.handleRequest(request)) {
- candidate.init(request, response, pipe);
+ candidate.init(request, response);
return candidate;
}
}
return null;
}
-}
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/pipes/package-info.java b/src/main/java/org/apache/sling/pipes/package-info.java
index b5e84e4..44b5df9 100644
--- a/src/main/java/org/apache/sling/pipes/package-info.java
+++ b/src/main/java/org/apache/sling/pipes/package-info.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-@Version("0.0.10")
+@Version("1.0.0")
package org.apache.sling.pipes;
import org.osgi.annotation.versioning.Version;
diff --git a/src/test/java/org/apache/sling/pipes/AbstractPipeTest.java b/src/test/java/org/apache/sling/pipes/AbstractPipeTest.java
index 08e8047..a6a1afb 100644
--- a/src/test/java/org/apache/sling/pipes/AbstractPipeTest.java
+++ b/src/test/java/org/apache/sling/pipes/AbstractPipeTest.java
@@ -30,6 +30,8 @@ import org.junit.Rule;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* this abstract class for pipes implements a plumber with all registered pipes, plus some test ones, and give some paths,
@@ -53,7 +55,11 @@ public class AbstractPipeTest {
@Before
public void setup(){
PlumberImpl plumberImpl = new PlumberImpl();
- plumberImpl.activate();
+ PlumberImpl.Configuration configuration = mock(PlumberImpl.Configuration.class);
+ when(configuration.authorizedUsers()).thenReturn(new String[]{});
+ when(configuration.serviceUser()).thenReturn(null);
+ when(configuration.bufferSize()).thenReturn(PlumberImpl.DEFAULT_BUFFER_SIZE);
+ plumberImpl.activate(configuration);
plumberImpl.registerPipe("slingPipes/dummyNull", DummyNull.class);
plumberImpl.registerPipe("slingPipes/dummySearch", DummySearch.class);
plumber = plumberImpl;
diff --git a/src/test/java/org/apache/sling/pipes/internal/PlumberServletTest.java b/src/test/java/org/apache/sling/pipes/internal/PlumberServletTest.java
index 5c18229..23d6775 100644
--- a/src/test/java/org/apache/sling/pipes/internal/PlumberServletTest.java
+++ b/src/test/java/org/apache/sling/pipes/internal/PlumberServletTest.java
@@ -205,7 +205,7 @@ public class PlumberServletTest extends AbstractPipeTest {
when(request.getParameter(PlumberServlet.PARAM_BINDINGS)).thenReturn(bindings);
when(request.getParameter(CustomWriter.PARAM_WRITER)).thenReturn(writer);
when(request.getParameter(BasePipe.DRYRUN_KEY)).thenReturn(dryRun);
- when(request.getParameter(PlumberServlet.PARAM_SIZE)).thenReturn(size);
+ when(request.getParameter(OutputWriter.PARAM_SIZE)).thenReturn(size);
return request;
}
--
To stop receiving notification emails like this one, please contact
"commits@sling.apache.org" <co...@sling.apache.org>.