You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2019/04/28 00:02:40 UTC

[nifi] branch master updated: NIFI-6145: Add Groovy file idioms (withInput/OutputStream, withReader/Writer) to ExecuteGroovyScript

This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d02ab1  NIFI-6145: Add Groovy file idioms (withInput/OutputStream, withReader/Writer) to ExecuteGroovyScript
7d02ab1 is described below

commit 7d02ab1e2fc9a5a424b3cfe9cd4a872f5c5c97ae
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Mon Apr 8 13:52:24 2019 -0400

    NIFI-6145: Add Groovy file idioms (withInput/OutputStream, withReader/Writer) to ExecuteGroovyScript
    
    This closes #3415
    
    Signed-off-by: Mike Thomsen <mi...@gmail.com>
---
 .../processors/groovyx/flow/GroovySessionFile.java | 33 ++++++++++++
 .../groovyx/ExecuteGroovyScriptTest.java           | 62 ++++++++++++++++++++++
 2 files changed, 95 insertions(+)

diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/flow/GroovySessionFile.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/flow/GroovySessionFile.java
index 25ef2fb..831b0a4 100644
--- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/flow/GroovySessionFile.java
+++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/flow/GroovySessionFile.java
@@ -28,12 +28,15 @@ import groovy.lang.GroovyObject;
 import org.codehaus.groovy.runtime.InvokerHelper;
 
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
+import java.nio.charset.UnsupportedCharsetException;
 
 /**
  * SessionFile with groovy specific methods.
@@ -281,4 +284,34 @@ public class GroovySessionFile extends SessionFile implements GroovyObject {
         });
     }
 
+    public GroovySessionFile withInputStream(Closure c) throws IOException {
+        InputStream inStream = session.read(this);
+        c.call(inStream);
+        inStream.close();
+        return this;
+    }
+
+    public GroovySessionFile withOutputStream(Closure c) throws IOException {
+        OutputStream outStream = session.write(this);
+        c.call(outStream);
+        outStream.close();
+        return this;
+    }
+
+    public GroovySessionFile withReader(String charset, Closure c) throws IOException, UnsupportedCharsetException {
+        InputStream inStream = session.read(this);
+        BufferedReader br = new BufferedReader(new InputStreamReader(inStream, charset));
+        c.call(br);
+        br.close();
+        return this;
+    }
+
+    public GroovySessionFile withWriter(String charset, Closure c) throws IOException, UnsupportedCharsetException {
+        OutputStream outStream = session.write(this);
+        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(outStream, charset));
+        c.call(bw);
+        bw.close();
+        return this;
+    }
+
 }
diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java
index 8cce6d1..b2b3e2d 100644
--- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java
+++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java
@@ -365,6 +365,68 @@ public class ExecuteGroovyScriptTest {
         runner.assertAllFlowFilesTransferred(proc.REL_SUCCESS.getName(), 1);
     }
 
+    @Test
+    public void test_withInputStream() {
+        runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "import java.util.stream.*\n"
+                + "def ff = session.get(); if(!ff)return;\n"
+                + "ff.withInputStream{inputStream -> String r = new BufferedReader(new InputStreamReader(inputStream)).lines()"
+                + ".collect(Collectors.joining(\"\\n\")); assert r=='1234' }; REL_SUCCESS << ff; ");
+
+        runner.assertValid();
+
+        runner.enqueue("1234".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1);
+    }
+
+    @Test
+    public void test_withOutputStream() {
+        runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "import java.util.stream.*\n"
+                + "def ff = session.get(); if(!ff)return;\n"
+                + "ff.withOutputStream{outputStream -> outputStream.write('5678'.bytes)}; REL_SUCCESS << ff; ");
+
+        runner.assertValid();
+
+        runner.enqueue("1234".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteGroovyScript.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals("5678");
+    }
+
+    @Test
+    public void test_withReader() {
+        runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "import java.util.stream.*\n"
+                + "def ff = session.get(); if(!ff)return;\n"
+                + "ff.withReader('UTF-8'){reader -> String r = new BufferedReader(reader).lines()"
+                + ".collect(Collectors.joining(\"\\n\")); assert r=='1234' }; REL_SUCCESS << ff; ");
+
+        runner.assertValid();
+
+        runner.enqueue("1234".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1);
+    }
+
+    @Test
+    public void test_withWriter() throws Exception {
+        runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "import java.util.stream.*\n"
+                + "def ff = session.get(); if(!ff)return;\n"
+                + "ff.withWriter('UTF-16LE'){writer -> writer.write('5678')}; REL_SUCCESS << ff; ");
+
+        runner.assertValid();
+
+        runner.enqueue("1234".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteGroovyScript.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals("5678".getBytes(StandardCharsets.UTF_16LE));
+    }
+
 
     private HashMap<String, String> map(String key, String value) {
         HashMap<String, String> attrs = new HashMap<>();