You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by np...@apache.org on 2017/09/12 09:31:49 UTC

svn commit: r1808090 - in /sling/trunk/contrib/extensions/sling-pipes/src: main/java/org/apache/sling/pipes/ main/java/org/apache/sling/pipes/internal/ test/java/org/apache/sling/pipes/internal/ test/resources/

Author: npeltier
Date: Tue Sep 12 09:31:49 2017
New Revision: 1808090

URL: http://svn.apache.org/viewvc?rev=1808090&view=rev
Log:
SLING-7099 split JsonPipe and inputstream capabilities

Added:
    sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/AbstractInputStreamPipe.java
Modified:
    sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/JsonPipe.java
    sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java
    sling/trunk/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/internal/JsonPipeTest.java
    sling/trunk/contrib/extensions/sling-pipes/src/test/resources/json.json

Added: sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/AbstractInputStreamPipe.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/AbstractInputStreamPipe.java?rev=1808090&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/AbstractInputStreamPipe.java (added)
+++ sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/AbstractInputStreamPipe.java Tue Sep 12 09:31:49 2017
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.pipes;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.HttpState;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.sling.api.resource.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.regex.Pattern;
+
+/**
+ * Pipe reading its content from an input stream. Depending on the configured expression, the stream is
+ * a remote (http) resource, a file the resource at the expression's path adapts to, a stream bound under
+ * {@link #BINDING_IS}, or the expression itself.
+ */
+public abstract class AbstractInputStreamPipe extends BasePipe {
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractInputStreamPipe.class);
+
+    /**
+     * prefix of the expressions that are fetched with an http GET
+     */
+    public final String REMOTE_START = "http";
+
+    /**
+     * pattern an expression has to contain to be looked up as a resource path
+     */
+    protected final Pattern VALID_PATH = Pattern.compile("/([\\w\\d]+/)+[\\w\\d]+");
+
+    /**
+     * key of the binding under which an already opened input stream can be handed over to the pipe
+     */
+    public static final Object BINDING_IS = "org.apache.sling.pipes.RequestInputStream";
+
+    /**
+     * client executing the GET of remote expressions
+     */
+    HttpClient client;
+
+    /**
+     * output binding, returned by {@link #getOutputBinding()}, to be set by implementations
+     */
+    protected Object binding;
+
+    /**
+     * GET method of the current remote retrieval, kept to release its connection once the stream is read
+     */
+    GetMethod method = null;
+
+    /**
+     * stream of the current execution
+     */
+    InputStream is;
+
+    /**
+     * Build the pipe and configure its http client
+     * @param plumber plumber handed over to {@link BasePipe}
+     * @param resource resource handed over to {@link BasePipe}
+     * @throws Exception in case the pipe can not be built
+     */
+    public AbstractInputStreamPipe(Plumber plumber, Resource resource) throws Exception {
+        super(plumber, resource);
+        configureHttpClient();
+        binding = null;
+    }
+
+    /**
+     * Configure http client
+     */
+    private void configureHttpClient(){
+        HttpConnectionManager manager = new MultiThreadedHttpConnectionManager();
+        HttpConnectionManagerParams params = new HttpConnectionManagerParams();
+        manager.setParams(params);
+        client = new HttpClient(manager);
+        client.getParams().setAuthenticationPreemptive(false);
+    }
+
+    /**
+     * Open the stream designated by the expression of the pipe
+     * @return response body of the remote url, content of the file, bound stream, or UTF-8 bytes of the
+     * expression itself when it designates none of those
+     * @throws IOException if the stream can not be opened, or if the remote url is not answered with a 200
+     */
+    InputStream getInputStream() throws IOException {
+        String expr = getExpr();
+        if (expr.startsWith(REMOTE_START)) {
+            method = new GetMethod(expr);
+            LOGGER.debug("Executing GET {}", expr);
+            int status = client.executeMethod(null, method, new HttpState());
+            if (status != HttpStatus.SC_OK) {
+                //do not fall back on the url itself: it would be processed as if it was the content
+                throw new IOException("GET " + expr + " returned status " + status);
+            }
+            LOGGER.debug("200 received, streaming content");
+            return method.getResponseBodyAsStream();
+        }
+        if (VALID_PATH.matcher(expr).find()) {
+            //resolve and adapt once, instead of once per check
+            Resource resource = resolver.getResource(expr);
+            File file = resource != null ? resource.adaptTo(File.class) : null;
+            if (file != null) {
+                return new FileInputStream(file);
+            }
+        }
+        Object boundStream = getBindings().getBindings().get(BINDING_IS);
+        if (boundStream != null) {
+            return (InputStream) boundStream;
+        }
+        return new ByteArrayInputStream(expr.getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public Object getOutputBinding() {
+        return binding;
+    }
+
+    /**
+     * Compute the output of the pipe out of its input stream
+     * @param inputStream stream to read, closed as soon as this method returns: it has to be consumed here,
+     * not lazily by the returned iterator
+     * @return output of the pipe
+     */
+    abstract public Iterator<Resource> getOutput(InputStream inputStream);
+
+    /**
+     * Open the input stream and delegate to {@link #getOutput(InputStream)}
+     * @return output computed from the stream, or an empty iterator if the stream could not be fetched or read
+     */
+    @Override
+    public Iterator<Resource> getOutput() {
+        try {
+            is = getInputStream();
+            return getOutput(is);
+        } catch (Exception e){
+            LOGGER.error("unable to fetch input stream", e);
+        } finally {
+            IOUtils.closeQuietly(is);
+            //forget stream and connection, for a later execution not to clean them up a second time
+            is = null;
+            if (method != null){
+                method.releaseConnection();
+                method = null;
+            }
+        }
+        return EMPTY_ITERATOR;
+    }
+}

Modified: sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/JsonPipe.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/JsonPipe.java?rev=1808090&r1=1808089&r2=1808090&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/JsonPipe.java (original)
+++ sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/JsonPipe.java Tue Sep 12 09:31:49 2017
@@ -17,7 +17,8 @@
 package org.apache.sling.pipes.internal;
 
 import java.io.InputStream;
-import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.Iterator;
 
 import javax.json.JsonArray;
@@ -25,17 +26,10 @@ import javax.json.JsonException;
 import javax.json.JsonStructure;
 import javax.json.JsonValue.ValueType;
 
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpConnectionManager;
-import org.apache.commons.httpclient.HttpState;
-import org.apache.commons.httpclient.HttpStatus;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.api.resource.Resource;
-import org.apache.sling.pipes.BasePipe;
+import org.apache.sling.pipes.AbstractInputStreamPipe;
 import org.apache.sling.pipes.Plumber;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,93 +37,30 @@ import org.slf4j.LoggerFactory;
 /**
  * Pipe outputting binding related to a json stream: either an object
  */
-public class JsonPipe extends BasePipe {
+public class JsonPipe extends AbstractInputStreamPipe {
     private static Logger logger = LoggerFactory.getLogger(JsonPipe.class);
     public static final String RESOURCE_TYPE = RT_PREFIX + "json";
 
-    HttpClient client;
-
     JsonArray array;
-    Object binding;
     int index = -1;
 
-    public final String REMOTE_START = "http";
-
     public JsonPipe(Plumber plumber, Resource resource) throws Exception {
         super(plumber, resource);
-        configureHttpClient();
-    }
-
-    /**
-     * Configure http client
-     */
-    private void configureHttpClient(){
-        HttpConnectionManager manager = new MultiThreadedHttpConnectionManager();
-        HttpConnectionManagerParams params = new HttpConnectionManagerParams();
-        manager.setParams(params);
-        client = new HttpClient(manager);
-        client.getParams().setAuthenticationPreemptive(false);
-    }
-
-    @Override
-    public Object getOutputBinding() {
-        return binding;
-    }
-
-    /**
-     * Retrieve remote / expression JSON String, or null if any problem occurs
-     * @return JSON serialization of the result
-     */
-    private String retrieveJSONString()  {
-        String json = null;
-        String expression = getExpr();
-        if (expression.startsWith(REMOTE_START)){
-            GetMethod method = null;
-            HttpState httpState = new HttpState();
-            InputStream responseInputStream = null;
-            try {
-                String url = getExpr();
-                if (StringUtils.isNotBlank(url)) {
-                    method = new GetMethod(url);
-                    logger.debug("Executing GET {}", url);
-                    int status = client.executeMethod(null, method, httpState);
-                    if (status == HttpStatus.SC_OK) {
-                        logger.debug("200 received, streaming content");
-                        responseInputStream = method.getResponseBodyAsStream();
-                        StringWriter writer = new StringWriter();
-                        IOUtils.copy(responseInputStream, writer, "utf-8");
-                        json = writer.toString();
-                    }
-                }
-            }
-            catch(Exception e){
-                logger.error("unable to retrieve the data", e);
-            }finally{
-                if (method != null) {
-                    method.releaseConnection();
-                }
-                IOUtils.closeQuietly(responseInputStream);
-            }
-        } else {
-            //other try: given expression *is* json
-            json = expression;
-        }
-        return json;
     }
 
-
     /**
      * in case there is no successful retrieval of some JSON data, we cut the pipe here
      * @return input resource of the pipe, can be reouputed N times in case output json binding is an array of
      * N element (output binding would be here each time the Nth element of the array)
      */
-    public Iterator<Resource> getOutput() {
+    public Iterator<Resource> getOutput(InputStream is) {
         Iterator<Resource> output = EMPTY_ITERATOR;
-        binding = null;
-        String jsonString = retrieveJSONString();
-        if (StringUtils.isNotBlank(jsonString)){
-            try {
-                JsonStructure json; 
+        Iterator<Resource> inputSingletonIterator = Collections.singleton(getInput()).iterator();
+        String jsonString = null;
+        try {
+            jsonString = IOUtils.toString(is, StandardCharsets.UTF_8);
+            if (StringUtils.isNotBlank(jsonString)) {
+                JsonStructure json;
                 try {
                     json = JsonUtil.parse(jsonString);
                 } catch (JsonException ex) {
@@ -137,13 +68,11 @@ public class JsonPipe extends BasePipe {
                 }
                 if (json == null) {
                     binding = jsonString.trim();
-                    output = super.getOutput();
-                } 
-                else if (json.getValueType() != ValueType.ARRAY) {
+                    output = inputSingletonIterator;
+                } else if (json.getValueType() != ValueType.ARRAY) {
                     binding = JsonUtil.unbox(json);
-                    output = super.getOutput();
-                }
-                else {
+                    output = inputSingletonIterator;
+                } else {
                     binding = array = (JsonArray) json;
                     index = 0;
                     output = new Iterator<Resource>() {
@@ -156,17 +85,17 @@ public class JsonPipe extends BasePipe {
                         public Resource next() {
                             try {
                                 binding = JsonUtil.unbox(array.get(index));
-                            } catch(Exception e){
+                            } catch (Exception e) {
                                 logger.error("Unable to retrieve {}nth item of jsonarray", index, e);
                             }
                             index++;
                             return getInput();
                         }
                     };
-                } 
-            } catch (JsonException e) {
-                logger.error("unable to parse JSON {} ", jsonString, e);
+                }
             }
+        }catch (Exception e) {
+            logger.error("unable to parse JSON {} ", jsonString, e);
         }
         return output;
     }

Modified: sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java?rev=1808090&r1=1808089&r2=1808090&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java (original)
+++ sling/trunk/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java Tue Sep 12 09:31:49 2017
@@ -32,6 +32,7 @@ import org.apache.sling.api.SlingHttpSer
 import org.apache.sling.api.servlets.ServletResolverConstants;
 import org.apache.sling.api.servlets.SlingAllMethodsServlet;
 import org.apache.sling.event.jobs.Job;
+import org.apache.sling.pipes.AbstractInputStreamPipe;
 import org.apache.sling.pipes.BasePipe;
 import org.apache.sling.pipes.ContainerPipe;
 import org.apache.sling.pipes.OutputWriter;
@@ -65,6 +66,8 @@ public class PlumberServlet extends Slin
 
     protected static final String PARAM_ASYNC = "async";
 
+    protected static final String PARAM_FILE = "pipes_inputFile";
+
     @Reference
     Plumber plumber;
 
@@ -119,8 +122,9 @@ public class PlumberServlet extends Slin
      * @param request from where to extract bindings
      * @param writeAllowed should we consider this execution is about to modify content
      * @return map of bindings
+     * @throws IOException in case something turns wrong with an input stream
      */
-    protected Map getBindingsFromRequest(SlingHttpServletRequest request, boolean writeAllowed){
+    protected Map getBindingsFromRequest(SlingHttpServletRequest request, boolean writeAllowed) throws IOException{
         Map bindings = new HashMap<>();
         String dryRun = request.getParameter(BasePipe.DRYRUN_KEY);
         if (StringUtils.isNotBlank(dryRun) && !dryRun.equals(Boolean.FALSE.toString())) {
@@ -134,6 +138,11 @@ public class PlumberServlet extends Slin
                 log.error("Unable to retrieve bindings information", e);
             }
         }
+
+        if (request.getParameterMap().containsKey(PARAM_FILE)){ //the map is keyed by parameter name, its values are arrays
+            bindings.put(AbstractInputStreamPipe.BINDING_IS, request.getRequestParameter(PARAM_FILE).getInputStream());
+        }
+
         bindings.put(BasePipe.READ_ONLY, !writeAllowed);
         return bindings;
     }

Modified: sling/trunk/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/internal/JsonPipeTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/internal/JsonPipeTest.java?rev=1808090&r1=1808089&r2=1808090&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/internal/JsonPipeTest.java (original)
+++ sling/trunk/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/internal/JsonPipeTest.java Tue Sep 12 09:31:49 2017
@@ -22,9 +22,11 @@ import static org.junit.Assert.assertTru
 
 import java.util.Iterator;
 
+import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.pipes.AbstractPipeTest;
+import org.apache.sling.pipes.Pipe;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -32,8 +34,11 @@ import org.junit.Test;
  * testing json pipe with anonymous yahoo meteo API
  */
 public class JsonPipeTest extends AbstractPipeTest {
-    public static final String CONF = "/content/json/conf/weather";
-    public static final String ARRAY = "/content/json/conf/array";
+    public static final String CONTENT_JSON = "/content/json";
+    public static final String CONF = CONTENT_JSON + "/conf/weather";
+    public static final String ARRAY = CONTENT_JSON + "/conf/array";
+    public static final String JSON_DUMP = CONTENT_JSON + "/jsonDump";
+    public static final String CONTENT_ARRAY = CONTENT_JSON + "/array";
 
     @Before
     public void setup() {
@@ -52,9 +57,7 @@ public class JsonPipeTest extends Abstra
         assertTrue("There should be a Bucharest property", properties.containsKey("Bucharest"));
     }
 
-    @Test
-    public void testPipedArray() throws Exception {
-        Iterator<Resource> outputs = getOutput(ARRAY);
+    protected void testArray(Iterator<Resource> outputs){
         Resource first = outputs.next();
         Resource second = outputs.next();
         Resource third = outputs.next();
@@ -63,4 +66,9 @@ public class JsonPipeTest extends Abstra
         assertEquals("second resource should be two", "/content/json/array/two", second.getPath());
         assertEquals("third resource should be three", "/content/json/array/three", third.getPath());
     }
-}
+
+    @Test
+    public void testPipedArray() throws Exception {
+        testArray(getOutput(ARRAY));
+    }
+}
\ No newline at end of file

Modified: sling/trunk/contrib/extensions/sling-pipes/src/test/resources/json.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/sling-pipes/src/test/resources/json.json?rev=1808090&r1=1808089&r2=1808090&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/sling-pipes/src/test/resources/json.json (original)
+++ sling/trunk/contrib/extensions/sling-pipes/src/test/resources/json.json Tue Sep 12 09:31:49 2017
@@ -48,11 +48,9 @@
         },
         "user": {
           "sling:resourceType": "slingPipes/base",
-          "path":"/content/json/array/${json.get('path')}"
+          "path":"/content/json/array/${json.path}"
         }
       }
-
-
     }
   },
   "windSpeed":{
@@ -60,14 +58,8 @@
   },
   "array": {
     "jcr:primaryType": "nt:unstructured",
-    "one": {
-      "jcr:primaryType": "nt:unstructured"
-    },
-    "two": {
-      "jcr:primaryType": "nt:unstructured"
-    },
-    "three": {
-      "jcr:primaryType": "nt:unstructured"
-    }
+    "one": { "jcr:primaryType": "nt:unstructured"},
+    "two": { "jcr:primaryType": "nt:unstructured"},
+    "three": { "jcr:primaryType": "nt:unstructured"}
   }
 }
\ No newline at end of file