You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/11/23 21:46:36 UTC

[25/50] [abbrv] nifi git commit: NIFI-1081 Adding option to ExecuteStreamCommand to put output value to an attribute

NIFI-1081 Adding option to ExecuteStreamCommand to put output value to an attribute

Reviewed and amended (comments,whitespace,and some code readability (discussed in ticket)) by Tony Kurc (tkurc@apache.org)


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1e5cc070
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1e5cc070
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1e5cc070

Branch: refs/heads/NIFI-655
Commit: 1e5cc070a3d29736beea9af0b2d684a9bdcfff8e
Parents: 9e2f6df
Author: Joseph Percivall <jo...@yahoo.com>
Authored: Wed Nov 18 21:16:12 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Wed Nov 18 23:23:10 2015 -0500

----------------------------------------------------------------------
 NOTICE                                          |   3 +
 nifi-assembly/NOTICE                            |   4 +
 .../standard/ExecuteStreamCommand.java          | 200 ++++++++++++----
 .../SoftLimitBoundedByteArrayOutputStream.java  |  99 ++++++++
 .../standard/TestExecuteStreamCommand.java      | 237 ++++++++++++++++++-
 5 files changed, 488 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1e5cc070/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 9686fba..f39a045 100644
--- a/NOTICE
+++ b/NOTICE
@@ -4,3 +4,6 @@ Copyright 2014-2015 The Apache Software Foundation
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
 
+This product includes the following work from the Apache Hadoop project:
+
+BoundedByteArrayOutputStream.java adapted to SoftLimitBoundedByteArrayOutputStream.java

http://git-wip-us.apache.org/repos/asf/nifi/blob/1e5cc070/nifi-assembly/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 5e18035..6ec5c31 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -4,6 +4,10 @@ Copyright 2014-2015 The Apache Software Foundation
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
 
+This product includes the following work from the Apache Hadoop project:
+
+BoundedByteArrayOutputStream.java which was adapted to SoftLimitBoundedByteArrayOutputStream.java
+
 ===========================================
 Apache Software License v2
 ===========================================

http://git-wip-us.apache.org/repos/asf/nifi/blob/1e5cc070/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
index 9bea6ba..38c8bd4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
@@ -30,6 +30,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -59,6 +60,7 @@ import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.ArgumentUtils;
+import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
 import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
@@ -126,7 +128,7 @@ import org.apache.nifi.stream.io.StreamUtils;
 @CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
 @DynamicProperty(name = "An environment variable name", value = "An environment variable value", description = "These environment variables are passed to the process spawned by this Processor")
 @WritesAttributes({
-    @WritesAttribute(attribute = "execution.command", description = "The name of the command executed to create the new FlowFile"),
+    @WritesAttribute(attribute = "execution.command", description = "The name of the command executed"),
     @WritesAttribute(attribute = "execution.command.args", description = "The semi-colon delimited list of arguments"),
     @WritesAttribute(attribute = "execution.status", description = "The exit status code returned from executing the command"),
     @WritesAttribute(attribute = "execution.error", description = "Any error messages returned from executing the command")})
@@ -140,14 +142,10 @@ public class ExecuteStreamCommand extends AbstractProcessor {
             .name("output stream")
             .description("The destination path for the flow file created from the command's output")
             .build();
-    private static final Set<Relationship> RELATIONSHIPS;
+    private AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
 
-    static {
-        Set<Relationship> rels = new HashSet<>();
-        rels.add(OUTPUT_STREAM_RELATIONSHIP);
-        rels.add(ORIGINAL_RELATIONSHIP);
-        RELATIONSHIPS = Collections.unmodifiableSet(rels);
-    }
+    private final static Set<Relationship> OUTPUT_STREAM_RELATIONSHIP_SET;
+    private final static Set<Relationship> ATTRIBUTE_RELATIONSHIP_SET;
 
     private static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true);
     static final PropertyDescriptor EXECUTION_COMMAND = new PropertyDescriptor.Builder()
@@ -195,6 +193,22 @@ public class ExecuteStreamCommand extends AbstractProcessor {
             .defaultValue("false")
             .build();
 
+    static final PropertyDescriptor PUT_OUTPUT_IN_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("Output Destination Attribute")
+            .description("If set, the output of the stream command will be put into an attribute of the original FlowFile instead of a separate "
+                    + "FlowFile. There will no longer be a relationship for 'output stream'. The value of this property will be the key for the output attribute.")
+            .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PUT_ATTRIBUTE_MAX_LENGTH = new PropertyDescriptor.Builder()
+            .name("Max Attribute Length")
+            .description("If routing the output of the stream command to an attribute, the number of characters put to the attribute value "
+                    + "will be at most this amount. This is important because attributes are held in memory and large attributes will quickly "
+                    + "cause out of memory issues. If the output goes longer than this value, it will truncated to fit. Consider making this smaller if able.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("256")
+            .build();
+
     private static final Validator characterValidator = new StandardValidators.StringLengthValidator(1, 1);
 
     static final PropertyDescriptor ARG_DELIMITER = new PropertyDescriptor.Builder()
@@ -216,19 +230,44 @@ public class ExecuteStreamCommand extends AbstractProcessor {
         props.add(IGNORE_STDIN);
         props.add(WORKING_DIR);
         props.add(ARG_DELIMITER);
+        props.add(PUT_OUTPUT_IN_ATTRIBUTE);
+        props.add(PUT_ATTRIBUTE_MAX_LENGTH);
         PROPERTIES = Collections.unmodifiableList(props);
+
+
+        Set<Relationship> outputStreamRelationships = new HashSet<>();
+        outputStreamRelationships.add(OUTPUT_STREAM_RELATIONSHIP);
+        outputStreamRelationships.add(ORIGINAL_RELATIONSHIP);
+        OUTPUT_STREAM_RELATIONSHIP_SET = Collections.unmodifiableSet(outputStreamRelationships);
+
+        Set<Relationship> attributeRelationships = new HashSet<>();
+        attributeRelationships.add(ORIGINAL_RELATIONSHIP);
+        ATTRIBUTE_RELATIONSHIP_SET = Collections.unmodifiableSet(attributeRelationships);
     }
 
     private ProcessorLog logger;
 
     @Override
     public Set<Relationship> getRelationships() {
-        return RELATIONSHIPS;
+        return relationships.get();
     }
 
     @Override
     protected void init(ProcessorInitializationContext context) {
         logger = getLogger();
+
+        relationships.set(OUTPUT_STREAM_RELATIONSHIP_SET);
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        if (descriptor.equals(PUT_OUTPUT_IN_ATTRIBUTE)) {
+            if (newValue != null) {
+                relationships.set(ATTRIBUTE_RELATIONSHIP_SET);
+            } else {
+                relationships.set(OUTPUT_STREAM_RELATIONSHIP_SET);
+            }
+        }
     }
 
     @Override
@@ -254,6 +293,10 @@ public class ExecuteStreamCommand extends AbstractProcessor {
         }
 
         final ArrayList<String> args = new ArrayList<>();
+        final boolean putToAttribute = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).isSet();
+        final Integer attributeSize = context.getProperty(PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
+        final String attributeName = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).getValue();
+
         final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(inputFlowFile).getValue();
         args.add(executeCommand);
         final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue();
@@ -300,10 +343,17 @@ public class ExecuteStreamCommand extends AbstractProcessor {
                 final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) {
             int exitCode = -1;
             final BufferedOutputStream bos = new BufferedOutputStream(pos);
-            FlowFile outputStreamFlowFile = session.create(inputFlowFile);
-            ProcessStreamWriterCallback callback = new ProcessStreamWriterCallback(ignoreStdin, bos, bis, logger, session, outputStreamFlowFile, process);
+            FlowFile outputFlowFile = putToAttribute ? inputFlowFile : session.create(inputFlowFile);
+
+            ProcessStreamWriterCallback callback = new ProcessStreamWriterCallback(ignoreStdin, bos, bis, logger,
+                    attributeName, session, outputFlowFile, process,putToAttribute,attributeSize);
             session.read(inputFlowFile, callback);
-            outputStreamFlowFile = callback.outputStreamFlowFile;
+
+            outputFlowFile = callback.outputFlowFile;
+            if (putToAttribute) {
+                outputFlowFile = session.putAttribute(outputFlowFile, attributeName, new String(callback.outputBuffer, 0, callback.size));
+            }
+
             exitCode = callback.exitCode;
             logger.debug("Execution complete for command: {}.  Exited with code: {}", new Object[]{executeCommand, exitCode});
 
@@ -321,21 +371,29 @@ public class ExecuteStreamCommand extends AbstractProcessor {
             int length = strBldr.length() > 4000 ? 4000 : strBldr.length();
             attributes.put("execution.error", strBldr.substring(0, length));
 
+            final Relationship outputFlowFileRelationship = putToAttribute ? ORIGINAL_RELATIONSHIP : OUTPUT_STREAM_RELATIONSHIP;
             if (exitCode == 0) {
-                logger.info("Transferring flow file {} to 'output stream'", new Object[]{outputStreamFlowFile});
+                logger.info("Transferring flow file {} to {}",
+                        new Object[]{outputFlowFile,outputFlowFileRelationship.getName()});
             } else {
-                logger.error("Transferring flow file {} to 'output stream'. Executable command {} ended in an error: {}",
-                        new Object[]{outputStreamFlowFile, executeCommand, strBldr.toString()});
+                logger.error("Transferring flow file {} to {}. Executable command {} ended in an error: {}",
+                        new Object[]{outputFlowFile,outputFlowFileRelationship.getName(), executeCommand, strBldr.toString()});
             }
 
             attributes.put("execution.status", Integer.toString(exitCode));
             attributes.put("execution.command", executeCommand);
             attributes.put("execution.command.args", commandArguments);
-            outputStreamFlowFile = session.putAllAttributes(outputStreamFlowFile, attributes);
-            session.transfer(outputStreamFlowFile, OUTPUT_STREAM_RELATIONSHIP);
-            logger.info("Transferring flow file {} to original", new Object[]{inputFlowFile});
-            inputFlowFile = session.putAllAttributes(inputFlowFile, attributes);
-            session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP);
+            outputFlowFile = session.putAllAttributes(outputFlowFile, attributes);
+
+            // This transfer will transfer the FlowFile that received the stream out put to it's destined relationship.
+            // In the event the stream is put to the an attribute of the original, it will be transferred here.
+            session.transfer(outputFlowFile, outputFlowFileRelationship);
+
+            if (!putToAttribute) {
+                logger.info("Transferring flow file {} to original", new Object[]{inputFlowFile});
+                inputFlowFile = session.putAllAttributes(inputFlowFile, attributes);
+                session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP);
+            }
 
         } catch (final IOException ex) {
             // could not close Process related streams
@@ -348,59 +406,97 @@ public class ExecuteStreamCommand extends AbstractProcessor {
     static class ProcessStreamWriterCallback implements InputStreamCallback {
 
         final boolean ignoreStdin;
-        final OutputStream stdInWritable;
-        final InputStream stdOutReadable;
+        final OutputStream stdinWritable;
+        final InputStream stdoutReadable;
         final ProcessorLog logger;
         final ProcessSession session;
         final Process process;
-        FlowFile outputStreamFlowFile;
+        FlowFile outputFlowFile;
         int exitCode;
+        final boolean putToAttribute;
+        final int attributeSize;
+        final String attributeName;
+
+        byte[] outputBuffer;
+        int size;
 
-        public ProcessStreamWriterCallback(boolean ignoreStdin, OutputStream stdInWritable, InputStream stdOutReadable,
-                                           ProcessorLog logger, ProcessSession session, FlowFile outputStreamFlowFile, Process process) {
+        public ProcessStreamWriterCallback(boolean ignoreStdin, OutputStream stdinWritable, InputStream stdoutReadable,ProcessorLog logger, String attributeName,
+                                           ProcessSession session, FlowFile outputFlowFile, Process process, boolean putToAttribute, int attributeSize) {
             this.ignoreStdin = ignoreStdin;
-            this.stdInWritable = stdInWritable;
-            this.stdOutReadable = stdOutReadable;
+            this.stdinWritable = stdinWritable;
+            this.stdoutReadable = stdoutReadable;
             this.logger = logger;
             this.session = session;
-            this.outputStreamFlowFile = outputStreamFlowFile;
+            this.outputFlowFile = outputFlowFile;
             this.process = process;
+            this.putToAttribute = putToAttribute;
+            this.attributeSize = attributeSize;
+            this.attributeName = attributeName;
         }
 
         @Override
         public void process(final InputStream incomingFlowFileIS) throws IOException {
-            outputStreamFlowFile = session.write(outputStreamFlowFile, new OutputStreamCallback() {
+            if (putToAttribute) {
+                try (SoftLimitBoundedByteArrayOutputStream softLimitBoundedBAOS = new SoftLimitBoundedByteArrayOutputStream(attributeSize)) {
+                    readStdoutReadable(ignoreStdin, stdinWritable, logger, incomingFlowFileIS);
+                    final long longSize = StreamUtils.copy(stdoutReadable, softLimitBoundedBAOS);
+
+                    // Because the outputstream has a cap that the copy doesn't know about, adjust
+                    // the actual size
+                    if (longSize > (long) attributeSize) { // Explicit cast for readability
+                        size = attributeSize;
+                    } else{
+                        size = (int) longSize; // Note: safe cast, longSize is limited by attributeSize
+                    }
+
+                    outputBuffer = softLimitBoundedBAOS.getBuffer();
+                    stdoutReadable.close();
 
-                @Override
-                public void process(OutputStream out) throws IOException {
-
-                    Thread writerThread = new Thread(new Runnable() {
-
-                        @Override
-                        public void run() {
-                            if (!ignoreStdin) {
-                                try {
-                                    StreamUtils.copy(incomingFlowFileIS, stdInWritable);
-                                } catch (IOException e) {
-                                    logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e);
-                                }
-                            }
-                            // MUST close the output stream to the stdIn so that whatever is reading knows
-                            // there is no more data
-                            IOUtils.closeQuietly(stdInWritable);
-                        }
-                    });
-                    writerThread.setDaemon(true);
-                    writerThread.start();
-                    StreamUtils.copy(stdOutReadable, out);
                     try {
                         exitCode = process.waitFor();
                     } catch (InterruptedException e) {
                         logger.warn("Command Execution Process was interrupted", e);
                     }
                 }
-            });
+            } else {
+                outputFlowFile = session.write(outputFlowFile, new OutputStreamCallback() {
+                    @Override
+                    public void process(OutputStream out) throws IOException {
+
+                        readStdoutReadable(ignoreStdin, stdinWritable, logger, incomingFlowFileIS);
+                        StreamUtils.copy(stdoutReadable, out);
+                        try {
+                            exitCode = process.waitFor();
+                        } catch (InterruptedException e) {
+                            logger.warn("Command Execution Process was interrupted", e);
+                        }
+                    }
+                });
+            }
         }
     }
 
+    private static void readStdoutReadable(final boolean ignoreStdin, final OutputStream stdinWritable,
+                                           final ProcessorLog logger, final InputStream incomingFlowFileIS) throws IOException {
+        Thread writerThread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                if (!ignoreStdin) {
+                    try {
+                        StreamUtils.copy(incomingFlowFileIS, stdinWritable);
+                    } catch (IOException e) {
+                        // This is unlikely to occur, and isn't handled at the moment
+                        // Bug captured in NIFI-1194
+                        logger.error("Failed to write flow file to stdin due to {}", new Object[]{e}, e);
+                    }
+                }
+                // MUST close the output stream to the stdin so that whatever is reading knows
+                // there is no more data.
+                IOUtils.closeQuietly(stdinWritable);
+            }
+        });
+        writerThread.setDaemon(true);
+        writerThread.start();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1e5cc070/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SoftLimitBoundedByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SoftLimitBoundedByteArrayOutputStream.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SoftLimitBoundedByteArrayOutputStream.java
new file mode 100644
index 0000000..95e9a72
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SoftLimitBoundedByteArrayOutputStream.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class SoftLimitBoundedByteArrayOutputStream extends OutputStream {
+    /*
+     * This Bounded Array Output Stream (BAOS) allows the user to write to the output stream up to a specified limit.
+     * Higher than that limit the BAOS will silently return and not put more into the buffer. It also will not throw an error.
+     * This effectively truncates the stream for the user to fit into a bounded array.
+     */
+
+    private final byte[] buffer;
+    private int limit;
+    private int count;
+
+    public SoftLimitBoundedByteArrayOutputStream(int capacity) {
+        this(capacity, capacity);
+    }
+
+    public SoftLimitBoundedByteArrayOutputStream(int capacity, int limit) {
+        if ((capacity < limit) || (capacity | limit) < 0) {
+            throw new IllegalArgumentException("Invalid capacity/limit");
+        }
+        this.buffer = new byte[capacity];
+        this.limit = limit;
+        this.count = 0;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        if (count >= limit) {
+            return;
+        }
+        buffer[count++] = (byte) b;
+    }
+
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+                || ((off + len) < 0)) {
+            throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+            return;
+        }
+
+        if (count + len > limit) {
+            len = limit-count;
+            if(len == 0){
+                return;
+            }
+        }
+
+        System.arraycopy(b, off, buffer, count, len);
+        count += len;
+    }
+
+    public void reset(int newlim) {
+        if (newlim > buffer.length) {
+            throw new IndexOutOfBoundsException("Limit exceeds buffer size");
+        }
+        this.limit = newlim;
+        this.count = 0;
+    }
+
+    public void reset() {
+        this.limit = buffer.length;
+        this.count = 0;
+    }
+
+    public int getLimit() {
+        return limit;
+    }
+
+    public byte[] getBuffer() {
+        return buffer;
+    }
+
+    public int size() {
+        return count;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1e5cc070/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
index 0f13ba2..44576d4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
@@ -38,9 +38,6 @@ import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
-/**
- *
- */
 public class TestExecuteStreamCommand {
     @BeforeClass
     public static void init() {
@@ -232,6 +229,237 @@ public class TestExecuteStreamCommand {
     }
 
     @Test
+    public void testSmallEchoPutToAttribute() throws Exception {
+        File dummy = new File("src/test/resources/hello.txt");
+        assertTrue(dummy.exists());
+        final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        controller.setValidateExpressionUsage(false);
+        controller.enqueue("".getBytes());
+
+        if(isWindows()) {
+            controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cmd.exe");
+            controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "/c;echo Hello");
+            controller.setProperty(ExecuteStreamCommand.ARG_DELIMITER, ";");
+        } else{
+            controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "echo");
+            controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "Hello");
+        }
+        controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
+        controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+
+        controller.run(1);
+        controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+        controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+
+        List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+        MockFlowFile outputFlowFile = flowFiles.get(0);
+        outputFlowFile.assertContentEquals("");
+        String ouput = outputFlowFile.getAttribute("executeStreamCommand.output");
+        assertTrue(ouput.startsWith("Hello"));
+        assertEquals("0", outputFlowFile.getAttribute("execution.status"));
+        assertEquals(isWindows() ? "cmd.exe" : "echo", outputFlowFile.getAttribute("execution.command"));
+    }
+
+    @Test
+    public void testExecuteJarPutToAttribute() throws Exception {
+        File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar");
+        File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+        String jarPath = exJar.getAbsolutePath();
+        exJar.setExecutable(true);
+        final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        controller.setValidateExpressionUsage(false);
+        controller.enqueue(dummy.toPath());
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
+        controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+        controller.run(1);
+        controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+        controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+
+        List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+        MockFlowFile outputFlowFile = flowFiles.get(0);
+        String result = outputFlowFile.getAttribute("executeStreamCommand.output");
+        outputFlowFile.assertContentEquals(dummy);
+        assertTrue(Pattern.compile("Test was a success\r?\n").matcher(result).find());
+        assertEquals("0", outputFlowFile.getAttribute("execution.status"));
+        assertEquals("java", outputFlowFile.getAttribute("execution.command"));
+        assertEquals("-jar;", outputFlowFile.getAttribute("execution.command.args").substring(0, 5));
+        String attribute = outputFlowFile.getAttribute("execution.command.args");
+        String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "TestSuccess.jar";
+        assertEquals(expected, attribute.substring(attribute.length() - expected.length()));
+    }
+
+    @Test
+    public void testExecuteJarToAttributeConfiguration() throws Exception {
+        File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar");
+        String jarPath = exJar.getAbsolutePath();
+        exJar.setExecutable(true);
+        final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        controller.setValidateExpressionUsage(false);
+        controller.enqueue("small test".getBytes());
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
+        controller.setProperty(ExecuteStreamCommand.PUT_ATTRIBUTE_MAX_LENGTH, "10");
+        controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "outputDest");
+        assertEquals(1, controller.getProcessContext().getAvailableRelationships().size());
+        controller.run(1);
+        controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+        controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+
+        List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+        MockFlowFile outputFlowFile = flowFiles.get(0);
+        outputFlowFile.assertContentEquals("small test".getBytes());
+        String result = outputFlowFile.getAttribute("outputDest");
+        assertTrue(Pattern.compile("Test was a").matcher(result).find());
+        assertEquals("0", outputFlowFile.getAttribute("execution.status"));
+        assertEquals("java", outputFlowFile.getAttribute("execution.command"));
+        assertEquals("-jar;", outputFlowFile.getAttribute("execution.command.args").substring(0, 5));
+        String attribute = outputFlowFile.getAttribute("execution.command.args");
+        String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "TestSuccess.jar";
+        assertEquals(expected, attribute.substring(attribute.length() - expected.length()));
+    }
+
+    @Test
+    public void testExecuteIngestAndUpdatePutToAttribute() throws IOException {
+        File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
+        File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+        File dummy100MBytes = new File("target/100MB.txt");
+        FileInputStream fis = new FileInputStream(dummy);
+        FileOutputStream fos = new FileOutputStream(dummy100MBytes);
+        byte[] bytes = new byte[1024];
+        assertEquals(1000, fis.read(bytes));
+        fis.close();
+        for (int i = 0; i < 100000; i++) {
+            fos.write(bytes, 0, 1000);
+        }
+        fos.close();
+        String jarPath = exJar.getAbsolutePath();
+        exJar.setExecutable(true);
+        final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        controller.setValidateExpressionUsage(false);
+        controller.enqueue(dummy100MBytes.toPath());
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
+        controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "outputDest");
+        controller.run(1);
+        controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+        controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+        List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+        String result = flowFiles.get(0).getAttribute("outputDest");
+
+        assertTrue(Pattern.compile("nifi-standard-processors:ModifiedResult\r?\n").matcher(result).find());
+    }
+
+    @Test
+    public void testLargePutToAttribute() throws IOException {
+        File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+        File dummy100MBytes = new File("target/100MB.txt");
+        FileInputStream fis = new FileInputStream(dummy);
+        FileOutputStream fos = new FileOutputStream(dummy100MBytes);
+        byte[] bytes = new byte[1024];
+        assertEquals(1000, fis.read(bytes));
+        fis.close();
+        for (int i = 0; i < 100000; i++) {
+            fos.write(bytes, 0, 1000);
+        }
+        fos.close();
+
+        final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        controller.setValidateExpressionUsage(false);
+        controller.enqueue("".getBytes());
+        if(isWindows()) {
+            controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cmd.exe");
+            controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "/c;type " + dummy100MBytes.getAbsolutePath());
+            controller.setProperty(ExecuteStreamCommand.ARG_DELIMITER, ";");
+        } else{
+            controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cat");
+            controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, dummy100MBytes.getAbsolutePath());
+        }
+        controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
+        controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+        controller.setProperty(ExecuteStreamCommand.PUT_ATTRIBUTE_MAX_LENGTH, "256");
+
+        controller.run(1);
+        controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+        controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+        List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+
+        flowFiles.get(0).assertAttributeEquals("execution.status", "0");
+        String result = flowFiles.get(0).getAttribute("executeStreamCommand.output");
+        assertTrue(Pattern.compile("a{256}").matcher(result).matches());
+    }
+
+    @Test
+    public void testExecuteIngestAndUpdateWithWorkingDirPutToAttribute() throws IOException {
+        File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
+        File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+        String jarPath = exJar.getAbsolutePath();
+        exJar.setExecutable(true);
+        final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        controller.setValidateExpressionUsage(false);
+
+        controller.enqueue(dummy.toPath());
+        controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+        controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "streamOutput");
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
+        controller.run(1);
+        controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+        List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+        String result = flowFiles.get(0).getAttribute("streamOutput");
+
+        final String quotedSeparator = Pattern.quote(File.separator);
+        assertTrue(Pattern.compile(quotedSeparator + "nifi-standard-processors" + quotedSeparator + "target:ModifiedResult\r?\n").matcher(result).find());
+    }
+
+    @Test
+    public void testIgnoredStdinPutToAttribute() throws IOException {
+        File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
+        File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+        String jarPath = exJar.getAbsolutePath();
+        exJar.setExecutable(true);
+        final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        controller.setValidateExpressionUsage(false);
+        controller.enqueue(dummy.toPath());
+        controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
+        controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
+        controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+        controller.run(1);
+        controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+        List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+        String result = flowFiles.get(0).getAttribute("executeStreamCommand.output");
+        assertTrue("TestIngestAndUpdate.jar should not have received anything to modify",
+                Pattern.compile("target:ModifiedResult\r?\n?").matcher(result).find());
+    }
+
+    @Test
+    public void testDynamicEnvironmentPutToAttribute() throws Exception {
+        File exJar = new File("src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar");
+        File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+        String jarPath = exJar.getAbsolutePath();
+        exJar.setExecutable(true);
+        final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+        controller.setProperty("NIFI_TEST_1", "testvalue1");
+        controller.setProperty("NIFI_TEST_2", "testvalue2");
+        controller.setValidateExpressionUsage(false);
+        controller.enqueue(dummy.toPath());
+        controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+        controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
+        controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+        controller.run(1);
+        controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+        List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+        String result = flowFiles.get(0).getAttribute("executeStreamCommand.output");
+        Set<String> dynamicEnvironmentVariables = new HashSet<>(Arrays.asList(result.split("\r?\n")));
+        assertFalse("Should contain at least two environment variables starting with NIFI", dynamicEnvironmentVariables.size() < 2);
+        assertTrue("NIFI_TEST_1 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_1=testvalue1"));
+        assertTrue("NIFI_TEST_2 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_2=testvalue2"));
+    }
+
+    @Test
     public void testQuotedArguments() throws Exception {
         List<String> args = ArgumentUtils.splitArgs("echo -n \"arg1 arg2 arg3\"", ' ');
         assertEquals(3, args.size());
@@ -250,4 +478,7 @@ public class TestExecuteStreamCommand {
         controller.assertValid();
     }
 
+    private static boolean isWindows() {
+        return System.getProperty("os.name").toLowerCase().startsWith("windows");
+    }
 }