You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by np...@apache.org on 2017/09/12 09:31:49 UTC
svn commit: r1808090 - in /sling/trunk/contrib/extensions/sling-pipes/src:
main/java/org/apache/sling/pipes/ main/java/org/apache/sling/pipes/internal/
test/java/org/apache/sling/pipes/internal/ test/resources/
Author: npeltier
Date: Tue Sep 12 09:31:49 2017
New Revision: 1808090
URL: http://svn.apache.org/viewvc?rev=1808090&view=rev
Log:
SLING-7099 split JsonPipe and inputstream capabilities
Added:
sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/AbstractInputStreamPipe.java
Modified:
sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/JsonPipe.java
sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java
sling/trunk/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/internal/JsonPipeTest.java
sling/trunk/contrib/extensions/sling-pipes/src/test/resources/json.json
Added: sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/AbstractInputStreamPipe.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/AbstractInputStreamPipe.java?rev=1808090&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/AbstractInputStreamPipe.java (added)
+++ sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/AbstractInputStreamPipe.java Tue Sep 12 09:31:49 2017
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.pipes;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.HttpState;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.sling.api.resource.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.regex.Pattern;
+
+/**
+ * Input Stream based pipe, coming from web, from request, resource tree, web
+ */
+public abstract class AbstractInputStreamPipe extends BasePipe {
+ private static Logger LOGGER = LoggerFactory.getLogger(AbstractInputStreamPipe.class);
+
+ public final String REMOTE_START = "http";
+
+ protected final Pattern VALID_PATH = Pattern.compile("/([\\w\\d]+/)+[\\w\\d]+");
+
+ public static final Object BINDING_IS = "org.apache.sling.pipes.RequestInputStream";
+
+ HttpClient client;
+
+ protected Object binding;
+
+ GetMethod method = null;
+
+ InputStream is;
+
+ public AbstractInputStreamPipe(Plumber plumber, Resource resource) throws Exception {
+ super(plumber, resource);
+ configureHttpClient();
+ binding = null;
+ }
+
+ /**
+ * Configure http client
+ */
+ private void configureHttpClient(){
+ HttpConnectionManager manager = new MultiThreadedHttpConnectionManager();
+ HttpConnectionManagerParams params = new HttpConnectionManagerParams();
+ manager.setParams(params);
+ client = new HttpClient(manager);
+ client.getParams().setAuthenticationPreemptive(false);
+ }
+
+ InputStream getInputStream() throws IOException {
+ String expr = getExpr();
+ if (expr.startsWith(REMOTE_START)) {
+ //first look at
+ HttpState httpState = new HttpState();
+ String url = getExpr();
+ if (StringUtils.isNotBlank(url)) {
+ method = new GetMethod(url);
+ LOGGER.debug("Executing GET {}", url);
+ int status = client.executeMethod(null, method, httpState);
+ if (status == HttpStatus.SC_OK) {
+ LOGGER.debug("200 received, streaming content");
+ return method.getResponseBodyAsStream();
+ }
+ }
+ } else if (VALID_PATH.matcher(expr).find()
+ && resolver.getResource(expr) != null
+ && resolver.getResource(expr).adaptTo(File.class) != null) {
+ return new FileInputStream(resolver.getResource(expr).adaptTo(File.class));
+ } else if (getBindings().getBindings().get(BINDING_IS) != null){
+ return (InputStream)getBindings().getBindings().get(BINDING_IS);
+ }
+ return new ByteArrayInputStream(expr.getBytes(StandardCharsets.UTF_8));
+ }
+
+
+ @Override
+ public Object getOutputBinding() {
+ return binding;
+ }
+
+ abstract public Iterator<Resource> getOutput(InputStream inputStream);
+
+ @Override
+ public Iterator<Resource> getOutput() {
+ try {
+ is = getInputStream();
+ return getOutput(is);
+ } catch (Exception e){
+ LOGGER.error("unable to fecth input stream", e);
+ } finally {
+ IOUtils.closeQuietly(is);
+ if (method != null){
+ method.releaseConnection();
+ }
+ }
+ return EMPTY_ITERATOR;
+ }
+}
Modified: sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/JsonPipe.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/JsonPipe.java?rev=1808090&r1=1808089&r2=1808090&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/JsonPipe.java (original)
+++ sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/JsonPipe.java Tue Sep 12 09:31:49 2017
@@ -17,7 +17,8 @@
package org.apache.sling.pipes.internal;
import java.io.InputStream;
-import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.Iterator;
import javax.json.JsonArray;
@@ -25,17 +26,10 @@ import javax.json.JsonException;
import javax.json.JsonStructure;
import javax.json.JsonValue.ValueType;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpConnectionManager;
-import org.apache.commons.httpclient.HttpState;
-import org.apache.commons.httpclient.HttpStatus;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.resource.Resource;
-import org.apache.sling.pipes.BasePipe;
+import org.apache.sling.pipes.AbstractInputStreamPipe;
import org.apache.sling.pipes.Plumber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,93 +37,30 @@ import org.slf4j.LoggerFactory;
/**
* Pipe outputting binding related to a json stream: either an object
*/
-public class JsonPipe extends BasePipe {
+public class JsonPipe extends AbstractInputStreamPipe {
private static Logger logger = LoggerFactory.getLogger(JsonPipe.class);
public static final String RESOURCE_TYPE = RT_PREFIX + "json";
- HttpClient client;
-
JsonArray array;
- Object binding;
int index = -1;
- public final String REMOTE_START = "http";
-
public JsonPipe(Plumber plumber, Resource resource) throws Exception {
super(plumber, resource);
- configureHttpClient();
- }
-
- /**
- * Configure http client
- */
- private void configureHttpClient(){
- HttpConnectionManager manager = new MultiThreadedHttpConnectionManager();
- HttpConnectionManagerParams params = new HttpConnectionManagerParams();
- manager.setParams(params);
- client = new HttpClient(manager);
- client.getParams().setAuthenticationPreemptive(false);
- }
-
- @Override
- public Object getOutputBinding() {
- return binding;
- }
-
- /**
- * Retrieve remote / expression JSON String, or null if any problem occurs
- * @return JSON serialization of the result
- */
- private String retrieveJSONString() {
- String json = null;
- String expression = getExpr();
- if (expression.startsWith(REMOTE_START)){
- GetMethod method = null;
- HttpState httpState = new HttpState();
- InputStream responseInputStream = null;
- try {
- String url = getExpr();
- if (StringUtils.isNotBlank(url)) {
- method = new GetMethod(url);
- logger.debug("Executing GET {}", url);
- int status = client.executeMethod(null, method, httpState);
- if (status == HttpStatus.SC_OK) {
- logger.debug("200 received, streaming content");
- responseInputStream = method.getResponseBodyAsStream();
- StringWriter writer = new StringWriter();
- IOUtils.copy(responseInputStream, writer, "utf-8");
- json = writer.toString();
- }
- }
- }
- catch(Exception e){
- logger.error("unable to retrieve the data", e);
- }finally{
- if (method != null) {
- method.releaseConnection();
- }
- IOUtils.closeQuietly(responseInputStream);
- }
- } else {
- //other try: given expression *is* json
- json = expression;
- }
- return json;
}
-
/**
* in case there is no successful retrieval of some JSON data, we cut the pipe here
* @return input resource of the pipe, can be reouputed N times in case output json binding is an array of
* N element (output binding would be here each time the Nth element of the array)
*/
- public Iterator<Resource> getOutput() {
+ public Iterator<Resource> getOutput(InputStream is) {
Iterator<Resource> output = EMPTY_ITERATOR;
- binding = null;
- String jsonString = retrieveJSONString();
- if (StringUtils.isNotBlank(jsonString)){
- try {
- JsonStructure json;
+ Iterator<Resource> inputSingletonIterator = Collections.singleton(getInput()).iterator();
+ String jsonString = null;
+ try {
+ jsonString = IOUtils.toString(is, StandardCharsets.UTF_8);
+ if (StringUtils.isNotBlank(jsonString)) {
+ JsonStructure json;
try {
json = JsonUtil.parse(jsonString);
} catch (JsonException ex) {
@@ -137,13 +68,11 @@ public class JsonPipe extends BasePipe {
}
if (json == null) {
binding = jsonString.trim();
- output = super.getOutput();
- }
- else if (json.getValueType() != ValueType.ARRAY) {
+ output = inputSingletonIterator;
+ } else if (json.getValueType() != ValueType.ARRAY) {
binding = JsonUtil.unbox(json);
- output = super.getOutput();
- }
- else {
+ output = inputSingletonIterator;
+ } else {
binding = array = (JsonArray) json;
index = 0;
output = new Iterator<Resource>() {
@@ -156,17 +85,17 @@ public class JsonPipe extends BasePipe {
public Resource next() {
try {
binding = JsonUtil.unbox(array.get(index));
- } catch(Exception e){
+ } catch (Exception e) {
logger.error("Unable to retrieve {}nth item of jsonarray", index, e);
}
index++;
return getInput();
}
};
- }
- } catch (JsonException e) {
- logger.error("unable to parse JSON {} ", jsonString, e);
+ }
}
+ }catch (Exception e) {
+ logger.error("unable to parse JSON {} ", jsonString, e);
}
return output;
}
Modified: sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java?rev=1808090&r1=1808089&r2=1808090&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java (original)
+++ sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java Tue Sep 12 09:31:49 2017
@@ -32,6 +32,7 @@ import org.apache.sling.api.SlingHttpSer
import org.apache.sling.api.servlets.ServletResolverConstants;
import org.apache.sling.api.servlets.SlingAllMethodsServlet;
import org.apache.sling.event.jobs.Job;
+import org.apache.sling.pipes.AbstractInputStreamPipe;
import org.apache.sling.pipes.BasePipe;
import org.apache.sling.pipes.ContainerPipe;
import org.apache.sling.pipes.OutputWriter;
@@ -65,6 +66,8 @@ public class PlumberServlet extends Slin
protected static final String PARAM_ASYNC = "async";
+ protected static final String PARAM_FILE = "pipes_inputFile";
+
@Reference
Plumber plumber;
@@ -119,8 +122,9 @@ public class PlumberServlet extends Slin
* @param request from where to extract bindings
* @param writeAllowed should we consider this execution is about to modify content
* @return map of bindings
+ * @throws IOException in case something turns wrong with an input stream
*/
- protected Map getBindingsFromRequest(SlingHttpServletRequest request, boolean writeAllowed){
+ protected Map getBindingsFromRequest(SlingHttpServletRequest request, boolean writeAllowed) throws IOException{
Map bindings = new HashMap<>();
String dryRun = request.getParameter(BasePipe.DRYRUN_KEY);
if (StringUtils.isNotBlank(dryRun) && !dryRun.equals(Boolean.FALSE.toString())) {
@@ -134,6 +138,11 @@ public class PlumberServlet extends Slin
log.error("Unable to retrieve bindings information", e);
}
}
+
+ if (request.getParameterMap().containsValue(PARAM_FILE)){
+ bindings.put(AbstractInputStreamPipe.BINDING_IS, request.getRequestParameter(PARAM_FILE).getInputStream());
+ }
+
bindings.put(BasePipe.READ_ONLY, !writeAllowed);
return bindings;
}
Modified: sling/trunk/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/internal/JsonPipeTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/internal/JsonPipeTest.java?rev=1808090&r1=1808089&r2=1808090&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/internal/JsonPipeTest.java (original)
+++ sling/trunk/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/internal/JsonPipeTest.java Tue Sep 12 09:31:49 2017
@@ -22,9 +22,11 @@ import static org.junit.Assert.assertTru
import java.util.Iterator;
+import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.pipes.AbstractPipeTest;
+import org.apache.sling.pipes.Pipe;
import org.junit.Before;
import org.junit.Test;
@@ -32,8 +34,11 @@ import org.junit.Test;
* testing json pipe with anonymous yahoo meteo API
*/
public class JsonPipeTest extends AbstractPipeTest {
- public static final String CONF = "/content/json/conf/weather";
- public static final String ARRAY = "/content/json/conf/array";
+ public static final String CONTENT_JSON = "/content/json";
+ public static final String CONF = CONTENT_JSON + "/conf/weather";
+ public static final String ARRAY = CONTENT_JSON + "/conf/array";
+ public static final String JSON_DUMP = CONTENT_JSON + "/jsonDump";
+ public static final String CONTENT_ARRAY = CONTENT_JSON + "/array";
@Before
public void setup() {
@@ -52,9 +57,7 @@ public class JsonPipeTest extends Abstra
assertTrue("There should be a Bucharest property", properties.containsKey("Bucharest"));
}
- @Test
- public void testPipedArray() throws Exception {
- Iterator<Resource> outputs = getOutput(ARRAY);
+ protected void testArray(Iterator<Resource> outputs){
Resource first = outputs.next();
Resource second = outputs.next();
Resource third = outputs.next();
@@ -63,4 +66,9 @@ public class JsonPipeTest extends Abstra
assertEquals("second resource should be two", "/content/json/array/two", second.getPath());
assertEquals("third resource should be three", "/content/json/array/three", third.getPath());
}
-}
+
+ @Test
+ public void testPipedArray() throws Exception {
+ testArray(getOutput(ARRAY));
+ }
+}
\ No newline at end of file
Modified: sling/trunk/contrib/extensions/sling-pipes/src/test/resources/json.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/sling-pipes/src/test/resources/json.json?rev=1808090&r1=1808089&r2=1808090&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/sling-pipes/src/test/resources/json.json (original)
+++ sling/trunk/contrib/extensions/sling-pipes/src/test/resources/json.json Tue Sep 12 09:31:49 2017
@@ -48,11 +48,9 @@
},
"user": {
"sling:resourceType": "slingPipes/base",
- "path":"/content/json/array/${json.get('path')}"
+ "path":"/content/json/array/${json.path}"
}
}
-
-
}
},
"windSpeed":{
@@ -60,14 +58,8 @@
},
"array": {
"jcr:primaryType": "nt:unstructured",
- "one": {
- "jcr:primaryType": "nt:unstructured"
- },
- "two": {
- "jcr:primaryType": "nt:unstructured"
- },
- "three": {
- "jcr:primaryType": "nt:unstructured"
- }
+ "one": { "jcr:primaryType": "nt:unstructured"},
+ "two": { "jcr:primaryType": "nt:unstructured"},
+ "three": { "jcr:primaryType": "nt:unstructured"}
}
}
\ No newline at end of file