You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/19 05:23:55 UTC
nifi git commit: NIFI-1081 Adding option to ExecuteStreamCommand to
put output value to an attribute
Repository: nifi
Updated Branches:
refs/heads/master 9e2f6df20 -> 1e5cc070a
NIFI-1081 Adding option to ExecuteStreamCommand to put output value to an attribute
Reviewed and amended (comments, whitespace, and some code readability, as discussed in the ticket) by Tony Kurc (tkurc@apache.org)
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1e5cc070
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1e5cc070
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1e5cc070
Branch: refs/heads/master
Commit: 1e5cc070a3d29736beea9af0b2d684a9bdcfff8e
Parents: 9e2f6df
Author: Joseph Percivall <jo...@yahoo.com>
Authored: Wed Nov 18 21:16:12 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Wed Nov 18 23:23:10 2015 -0500
----------------------------------------------------------------------
NOTICE | 3 +
nifi-assembly/NOTICE | 4 +
.../standard/ExecuteStreamCommand.java | 200 ++++++++++++----
.../SoftLimitBoundedByteArrayOutputStream.java | 99 ++++++++
.../standard/TestExecuteStreamCommand.java | 237 ++++++++++++++++++-
5 files changed, 488 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/1e5cc070/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 9686fba..f39a045 100644
--- a/NOTICE
+++ b/NOTICE
@@ -4,3 +4,6 @@ Copyright 2014-2015 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
+This product includes the following work from the Apache Hadoop project:
+
+BoundedByteArrayOutputStream.java adapted to SoftLimitBoundedByteArrayOutputStream.java
http://git-wip-us.apache.org/repos/asf/nifi/blob/1e5cc070/nifi-assembly/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 5e18035..6ec5c31 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -4,6 +4,10 @@ Copyright 2014-2015 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
+This product includes the following work from the Apache Hadoop project:
+
+BoundedByteArrayOutputStream.java which was adapted to SoftLimitBoundedByteArrayOutputStream.java
+
===========================================
Apache Software License v2
===========================================
http://git-wip-us.apache.org/repos/asf/nifi/blob/1e5cc070/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
index 9bea6ba..38c8bd4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
@@ -30,6 +30,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
@@ -59,6 +60,7 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.ArgumentUtils;
+import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
@@ -126,7 +128,7 @@ import org.apache.nifi.stream.io.StreamUtils;
@CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
@DynamicProperty(name = "An environment variable name", value = "An environment variable value", description = "These environment variables are passed to the process spawned by this Processor")
@WritesAttributes({
- @WritesAttribute(attribute = "execution.command", description = "The name of the command executed to create the new FlowFile"),
+ @WritesAttribute(attribute = "execution.command", description = "The name of the command executed"),
@WritesAttribute(attribute = "execution.command.args", description = "The semi-colon delimited list of arguments"),
@WritesAttribute(attribute = "execution.status", description = "The exit status code returned from executing the command"),
@WritesAttribute(attribute = "execution.error", description = "Any error messages returned from executing the command")})
@@ -140,14 +142,10 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.name("output stream")
.description("The destination path for the flow file created from the command's output")
.build();
- private static final Set<Relationship> RELATIONSHIPS;
+ private AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
- static {
- Set<Relationship> rels = new HashSet<>();
- rels.add(OUTPUT_STREAM_RELATIONSHIP);
- rels.add(ORIGINAL_RELATIONSHIP);
- RELATIONSHIPS = Collections.unmodifiableSet(rels);
- }
+ private final static Set<Relationship> OUTPUT_STREAM_RELATIONSHIP_SET;
+ private final static Set<Relationship> ATTRIBUTE_RELATIONSHIP_SET;
private static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true);
static final PropertyDescriptor EXECUTION_COMMAND = new PropertyDescriptor.Builder()
@@ -195,6 +193,22 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.defaultValue("false")
.build();
+ static final PropertyDescriptor PUT_OUTPUT_IN_ATTRIBUTE = new PropertyDescriptor.Builder()
+ .name("Output Destination Attribute")
+ .description("If set, the output of the stream command will be put into an attribute of the original FlowFile instead of a separate "
+ + "FlowFile. There will no longer be a relationship for 'output stream'. The value of this property will be the key for the output attribute.")
+ .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor PUT_ATTRIBUTE_MAX_LENGTH = new PropertyDescriptor.Builder()
+ .name("Max Attribute Length")
+ .description("If routing the output of the stream command to an attribute, the number of characters put to the attribute value "
+ + "will be at most this amount. This is important because attributes are held in memory and large attributes will quickly "
+ + "cause out of memory issues. If the output goes longer than this value, it will truncated to fit. Consider making this smaller if able.")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("256")
+ .build();
+
private static final Validator characterValidator = new StandardValidators.StringLengthValidator(1, 1);
static final PropertyDescriptor ARG_DELIMITER = new PropertyDescriptor.Builder()
@@ -216,19 +230,44 @@ public class ExecuteStreamCommand extends AbstractProcessor {
props.add(IGNORE_STDIN);
props.add(WORKING_DIR);
props.add(ARG_DELIMITER);
+ props.add(PUT_OUTPUT_IN_ATTRIBUTE);
+ props.add(PUT_ATTRIBUTE_MAX_LENGTH);
PROPERTIES = Collections.unmodifiableList(props);
+
+
+ Set<Relationship> outputStreamRelationships = new HashSet<>();
+ outputStreamRelationships.add(OUTPUT_STREAM_RELATIONSHIP);
+ outputStreamRelationships.add(ORIGINAL_RELATIONSHIP);
+ OUTPUT_STREAM_RELATIONSHIP_SET = Collections.unmodifiableSet(outputStreamRelationships);
+
+ Set<Relationship> attributeRelationships = new HashSet<>();
+ attributeRelationships.add(ORIGINAL_RELATIONSHIP);
+ ATTRIBUTE_RELATIONSHIP_SET = Collections.unmodifiableSet(attributeRelationships);
}
private ProcessorLog logger;
@Override
public Set<Relationship> getRelationships() {
- return RELATIONSHIPS;
+ return relationships.get();
}
@Override
protected void init(ProcessorInitializationContext context) {
logger = getLogger();
+
+ relationships.set(OUTPUT_STREAM_RELATIONSHIP_SET);
+ }
+
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+ if (descriptor.equals(PUT_OUTPUT_IN_ATTRIBUTE)) {
+ if (newValue != null) {
+ relationships.set(ATTRIBUTE_RELATIONSHIP_SET);
+ } else {
+ relationships.set(OUTPUT_STREAM_RELATIONSHIP_SET);
+ }
+ }
}
@Override
@@ -254,6 +293,10 @@ public class ExecuteStreamCommand extends AbstractProcessor {
}
final ArrayList<String> args = new ArrayList<>();
+ final boolean putToAttribute = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).isSet();
+ final Integer attributeSize = context.getProperty(PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
+ final String attributeName = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).getValue();
+
final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(inputFlowFile).getValue();
args.add(executeCommand);
final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue();
@@ -300,10 +343,17 @@ public class ExecuteStreamCommand extends AbstractProcessor {
final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) {
int exitCode = -1;
final BufferedOutputStream bos = new BufferedOutputStream(pos);
- FlowFile outputStreamFlowFile = session.create(inputFlowFile);
- ProcessStreamWriterCallback callback = new ProcessStreamWriterCallback(ignoreStdin, bos, bis, logger, session, outputStreamFlowFile, process);
+ FlowFile outputFlowFile = putToAttribute ? inputFlowFile : session.create(inputFlowFile);
+
+ ProcessStreamWriterCallback callback = new ProcessStreamWriterCallback(ignoreStdin, bos, bis, logger,
+ attributeName, session, outputFlowFile, process,putToAttribute,attributeSize);
session.read(inputFlowFile, callback);
- outputStreamFlowFile = callback.outputStreamFlowFile;
+
+ outputFlowFile = callback.outputFlowFile;
+ if (putToAttribute) {
+ outputFlowFile = session.putAttribute(outputFlowFile, attributeName, new String(callback.outputBuffer, 0, callback.size));
+ }
+
exitCode = callback.exitCode;
logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode});
@@ -321,21 +371,29 @@ public class ExecuteStreamCommand extends AbstractProcessor {
int length = strBldr.length() > 4000 ? 4000 : strBldr.length();
attributes.put("execution.error", strBldr.substring(0, length));
+ final Relationship outputFlowFileRelationship = putToAttribute ? ORIGINAL_RELATIONSHIP : OUTPUT_STREAM_RELATIONSHIP;
if (exitCode == 0) {
- logger.info("Transferring flow file {} to 'output stream'", new Object[]{outputStreamFlowFile});
+ logger.info("Transferring flow file {} to {}",
+ new Object[]{outputFlowFile,outputFlowFileRelationship.getName()});
} else {
- logger.error("Transferring flow file {} to 'output stream'. Executable command {} ended in an error: {}",
- new Object[]{outputStreamFlowFile, executeCommand, strBldr.toString()});
+ logger.error("Transferring flow file {} to {}. Executable command {} ended in an error: {}",
+ new Object[]{outputFlowFile,outputFlowFileRelationship.getName(), executeCommand, strBldr.toString()});
}
attributes.put("execution.status", Integer.toString(exitCode));
attributes.put("execution.command", executeCommand);
attributes.put("execution.command.args", commandArguments);
- outputStreamFlowFile = session.putAllAttributes(outputStreamFlowFile, attributes);
- session.transfer(outputStreamFlowFile, OUTPUT_STREAM_RELATIONSHIP);
- logger.info("Transferring flow file {} to original", new Object[]{inputFlowFile});
- inputFlowFile = session.putAllAttributes(inputFlowFile, attributes);
- session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP);
+ outputFlowFile = session.putAllAttributes(outputFlowFile, attributes);
+
+ // This transfers the FlowFile that received the stream output to its destined relationship.
+ // In the event the stream is put to an attribute of the original, it will be transferred here.
+ session.transfer(outputFlowFile, outputFlowFileRelationship);
+
+ if (!putToAttribute) {
+ logger.info("Transferring flow file {} to original", new Object[]{inputFlowFile});
+ inputFlowFile = session.putAllAttributes(inputFlowFile, attributes);
+ session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP);
+ }
} catch (final IOException ex) {
// could not close Process related streams
@@ -348,59 +406,97 @@ public class ExecuteStreamCommand extends AbstractProcessor {
static class ProcessStreamWriterCallback implements InputStreamCallback {
final boolean ignoreStdin;
- final OutputStream stdInWritable;
- final InputStream stdOutReadable;
+ final OutputStream stdinWritable;
+ final InputStream stdoutReadable;
final ProcessorLog logger;
final ProcessSession session;
final Process process;
- FlowFile outputStreamFlowFile;
+ FlowFile outputFlowFile;
int exitCode;
+ final boolean putToAttribute;
+ final int attributeSize;
+ final String attributeName;
+
+ byte[] outputBuffer;
+ int size;
- public ProcessStreamWriterCallback(boolean ignoreStdin, OutputStream stdInWritable, InputStream stdOutReadable,
- ProcessorLog logger, ProcessSession session, FlowFile outputStreamFlowFile, Process process) {
+ public ProcessStreamWriterCallback(boolean ignoreStdin, OutputStream stdinWritable, InputStream stdoutReadable,ProcessorLog logger, String attributeName,
+ ProcessSession session, FlowFile outputFlowFile, Process process, boolean putToAttribute, int attributeSize) {
this.ignoreStdin = ignoreStdin;
- this.stdInWritable = stdInWritable;
- this.stdOutReadable = stdOutReadable;
+ this.stdinWritable = stdinWritable;
+ this.stdoutReadable = stdoutReadable;
this.logger = logger;
this.session = session;
- this.outputStreamFlowFile = outputStreamFlowFile;
+ this.outputFlowFile = outputFlowFile;
this.process = process;
+ this.putToAttribute = putToAttribute;
+ this.attributeSize = attributeSize;
+ this.attributeName = attributeName;
}
@Override
public void process(final InputStream incomingFlowFileIS) throws IOException {
- outputStreamFlowFile = session.write(outputStreamFlowFile, new OutputStreamCallback() {
+ if (putToAttribute) {
+ try (SoftLimitBoundedByteArrayOutputStream softLimitBoundedBAOS = new SoftLimitBoundedByteArrayOutputStream(attributeSize)) {
+ readStdoutReadable(ignoreStdin, stdinWritable, logger, incomingFlowFileIS);
+ final long longSize = StreamUtils.copy(stdoutReadable, softLimitBoundedBAOS);
+
+ // Because the outputstream has a cap that the copy doesn't know about, adjust
+ // the actual size
+ if (longSize > (long) attributeSize) { // Explicit cast for readability
+ size = attributeSize;
+ } else{
+ size = (int) longSize; // Note: safe cast, longSize is limited by attributeSize
+ }
+
+ outputBuffer = softLimitBoundedBAOS.getBuffer();
+ stdoutReadable.close();
- @Override
- public void process(OutputStream out) throws IOException {
-
- Thread writerThread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- if (!ignoreStdin) {
- try {
- StreamUtils.copy(incomingFlowFileIS, stdInWritable);
- } catch (IOException e) {
- logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e);
- }
- }
- // MUST close the output stream to the stdIn so that whatever is reading knows
- // there is no more data
- IOUtils.closeQuietly(stdInWritable);
- }
- });
- writerThread.setDaemon(true);
- writerThread.start();
- StreamUtils.copy(stdOutReadable, out);
try {
exitCode = process.waitFor();
} catch (InterruptedException e) {
logger.warn("Command Execution Process was interrupted", e);
}
}
- });
+ } else {
+ outputFlowFile = session.write(outputFlowFile, new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+
+ readStdoutReadable(ignoreStdin, stdinWritable, logger, incomingFlowFileIS);
+ StreamUtils.copy(stdoutReadable, out);
+ try {
+ exitCode = process.waitFor();
+ } catch (InterruptedException e) {
+ logger.warn("Command Execution Process was interrupted", e);
+ }
+ }
+ });
+ }
}
}
+ private static void readStdoutReadable(final boolean ignoreStdin, final OutputStream stdinWritable,
+ final ProcessorLog logger, final InputStream incomingFlowFileIS) throws IOException {
+ Thread writerThread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ if (!ignoreStdin) {
+ try {
+ StreamUtils.copy(incomingFlowFileIS, stdinWritable);
+ } catch (IOException e) {
+ // This is unlikely to occur, and isn't handled at the moment
+ // Bug captured in NIFI-1194
+ logger.error("Failed to write flow file to stdin due to {}", new Object[]{e}, e);
+ }
+ }
+ // MUST close the output stream to the stdin so that whatever is reading knows
+ // there is no more data.
+ IOUtils.closeQuietly(stdinWritable);
+ }
+ });
+ writerThread.setDaemon(true);
+ writerThread.start();
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1e5cc070/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SoftLimitBoundedByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SoftLimitBoundedByteArrayOutputStream.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SoftLimitBoundedByteArrayOutputStream.java
new file mode 100644
index 0000000..95e9a72
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SoftLimitBoundedByteArrayOutputStream.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class SoftLimitBoundedByteArrayOutputStream extends OutputStream {
+ /*
+ * This Bounded Array Output Stream (BAOS) allows the user to write to the output stream up to a specified limit.
+ * Once that limit is reached, the BAOS silently discards any further bytes: it neither buffers them nor throws an error.
+ * This effectively truncates the stream for the user to fit into a bounded array.
+ */
+
+ private final byte[] buffer;
+ private int limit;
+ private int count;
+
+ public SoftLimitBoundedByteArrayOutputStream(int capacity) {
+ this(capacity, capacity);
+ }
+
+ public SoftLimitBoundedByteArrayOutputStream(int capacity, int limit) {
+ if ((capacity < limit) || (capacity | limit) < 0) {
+ throw new IllegalArgumentException("Invalid capacity/limit");
+ }
+ this.buffer = new byte[capacity];
+ this.limit = limit;
+ this.count = 0;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (count >= limit) {
+ return;
+ }
+ buffer[count++] = (byte) b;
+ }
+
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+ || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return;
+ }
+
+ if (count + len > limit) {
+ len = limit-count;
+ if(len == 0){
+ return;
+ }
+ }
+
+ System.arraycopy(b, off, buffer, count, len);
+ count += len;
+ }
+
+ public void reset(int newlim) {
+ if (newlim > buffer.length) {
+ throw new IndexOutOfBoundsException("Limit exceeds buffer size");
+ }
+ this.limit = newlim;
+ this.count = 0;
+ }
+
+ public void reset() {
+ this.limit = buffer.length;
+ this.count = 0;
+ }
+
+ public int getLimit() {
+ return limit;
+ }
+
+ public byte[] getBuffer() {
+ return buffer;
+ }
+
+ public int size() {
+ return count;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1e5cc070/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
index 0f13ba2..44576d4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
@@ -38,9 +38,6 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-/**
- *
- */
public class TestExecuteStreamCommand {
@BeforeClass
public static void init() {
@@ -232,6 +229,237 @@ public class TestExecuteStreamCommand {
}
@Test
+ public void testSmallEchoPutToAttribute() throws Exception {
+ File dummy = new File("src/test/resources/hello.txt");
+ assertTrue(dummy.exists());
+ final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.setValidateExpressionUsage(false);
+ controller.enqueue("".getBytes());
+
+ if(isWindows()) {
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cmd.exe");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "/c;echo Hello");
+ controller.setProperty(ExecuteStreamCommand.ARG_DELIMITER, ";");
+ } else{
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "echo");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "Hello");
+ }
+ controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
+ controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+
+ controller.run(1);
+ controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+ controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+
+ List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+ MockFlowFile outputFlowFile = flowFiles.get(0);
+ outputFlowFile.assertContentEquals("");
+ String ouput = outputFlowFile.getAttribute("executeStreamCommand.output");
+ assertTrue(ouput.startsWith("Hello"));
+ assertEquals("0", outputFlowFile.getAttribute("execution.status"));
+ assertEquals(isWindows() ? "cmd.exe" : "echo", outputFlowFile.getAttribute("execution.command"));
+ }
+
+ @Test
+ public void testExecuteJarPutToAttribute() throws Exception {
+ File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar");
+ File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.setValidateExpressionUsage(false);
+ controller.enqueue(dummy.toPath());
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
+ controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+ controller.run(1);
+ controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+ controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+
+ List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+ MockFlowFile outputFlowFile = flowFiles.get(0);
+ String result = outputFlowFile.getAttribute("executeStreamCommand.output");
+ outputFlowFile.assertContentEquals(dummy);
+ assertTrue(Pattern.compile("Test was a success\r?\n").matcher(result).find());
+ assertEquals("0", outputFlowFile.getAttribute("execution.status"));
+ assertEquals("java", outputFlowFile.getAttribute("execution.command"));
+ assertEquals("-jar;", outputFlowFile.getAttribute("execution.command.args").substring(0, 5));
+ String attribute = outputFlowFile.getAttribute("execution.command.args");
+ String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "TestSuccess.jar";
+ assertEquals(expected, attribute.substring(attribute.length() - expected.length()));
+ }
+
+ @Test
+ public void testExecuteJarToAttributeConfiguration() throws Exception {
+ File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar");
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.setValidateExpressionUsage(false);
+ controller.enqueue("small test".getBytes());
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
+ controller.setProperty(ExecuteStreamCommand.PUT_ATTRIBUTE_MAX_LENGTH, "10");
+ controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "outputDest");
+ assertEquals(1, controller.getProcessContext().getAvailableRelationships().size());
+ controller.run(1);
+ controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+ controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+
+ List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+ MockFlowFile outputFlowFile = flowFiles.get(0);
+ outputFlowFile.assertContentEquals("small test".getBytes());
+ String result = outputFlowFile.getAttribute("outputDest");
+ assertTrue(Pattern.compile("Test was a").matcher(result).find());
+ assertEquals("0", outputFlowFile.getAttribute("execution.status"));
+ assertEquals("java", outputFlowFile.getAttribute("execution.command"));
+ assertEquals("-jar;", outputFlowFile.getAttribute("execution.command.args").substring(0, 5));
+ String attribute = outputFlowFile.getAttribute("execution.command.args");
+ String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "TestSuccess.jar";
+ assertEquals(expected, attribute.substring(attribute.length() - expected.length()));
+ }
+
+ @Test
+ public void testExecuteIngestAndUpdatePutToAttribute() throws IOException {
+ File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
+ File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+ File dummy100MBytes = new File("target/100MB.txt");
+ FileInputStream fis = new FileInputStream(dummy);
+ FileOutputStream fos = new FileOutputStream(dummy100MBytes);
+ byte[] bytes = new byte[1024];
+ assertEquals(1000, fis.read(bytes));
+ fis.close();
+ for (int i = 0; i < 100000; i++) {
+ fos.write(bytes, 0, 1000);
+ }
+ fos.close();
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.setValidateExpressionUsage(false);
+ controller.enqueue(dummy100MBytes.toPath());
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
+ controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "outputDest");
+ controller.run(1);
+ controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+ controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+ List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+ String result = flowFiles.get(0).getAttribute("outputDest");
+
+ assertTrue(Pattern.compile("nifi-standard-processors:ModifiedResult\r?\n").matcher(result).find());
+ }
+
+ @Test
+ public void testLargePutToAttribute() throws IOException {
+ File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+ File dummy100MBytes = new File("target/100MB.txt");
+ FileInputStream fis = new FileInputStream(dummy);
+ FileOutputStream fos = new FileOutputStream(dummy100MBytes);
+ byte[] bytes = new byte[1024];
+ assertEquals(1000, fis.read(bytes));
+ fis.close();
+ for (int i = 0; i < 100000; i++) {
+ fos.write(bytes, 0, 1000);
+ }
+ fos.close();
+
+ final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.setValidateExpressionUsage(false);
+ controller.enqueue("".getBytes());
+ if(isWindows()) {
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cmd.exe");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "/c;type " + dummy100MBytes.getAbsolutePath());
+ controller.setProperty(ExecuteStreamCommand.ARG_DELIMITER, ";");
+ } else{
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cat");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, dummy100MBytes.getAbsolutePath());
+ }
+ controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
+ controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+ controller.setProperty(ExecuteStreamCommand.PUT_ATTRIBUTE_MAX_LENGTH, "256");
+
+ controller.run(1);
+ controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+ controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+ List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+
+ flowFiles.get(0).assertAttributeEquals("execution.status", "0");
+ String result = flowFiles.get(0).getAttribute("executeStreamCommand.output");
+ assertTrue(Pattern.compile("a{256}").matcher(result).matches());
+ }
+
+ @Test
+ public void testExecuteIngestAndUpdateWithWorkingDirPutToAttribute() throws IOException {
+ File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
+ File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.setValidateExpressionUsage(false);
+
+ controller.enqueue(dummy.toPath());
+ controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "streamOutput");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
+ controller.run(1);
+ controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+ List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+ String result = flowFiles.get(0).getAttribute("streamOutput");
+
+ final String quotedSeparator = Pattern.quote(File.separator);
+ assertTrue(Pattern.compile(quotedSeparator + "nifi-standard-processors" + quotedSeparator + "target:ModifiedResult\r?\n").matcher(result).find());
+ }
+
+ @Test
+ public void testIgnoredStdinPutToAttribute() throws IOException {
+ File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
+ File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.setValidateExpressionUsage(false);
+ controller.enqueue(dummy.toPath());
+ controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
+ controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
+ controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+ controller.run(1);
+ controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+ List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+ String result = flowFiles.get(0).getAttribute("executeStreamCommand.output");
+ assertTrue("TestIngestAndUpdate.jar should not have received anything to modify",
+ Pattern.compile("target:ModifiedResult\r?\n?").matcher(result).find());
+ }
+
+ @Test
+ public void testDynamicEnvironmentPutToAttribute() throws Exception {
+ File exJar = new File("src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar");
+ File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.setProperty("NIFI_TEST_1", "testvalue1");
+ controller.setProperty("NIFI_TEST_2", "testvalue2");
+ controller.setValidateExpressionUsage(false);
+ controller.enqueue(dummy.toPath());
+ controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
+ controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+ controller.run(1);
+ controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+ List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+ String result = flowFiles.get(0).getAttribute("executeStreamCommand.output");
+ Set<String> dynamicEnvironmentVariables = new HashSet<>(Arrays.asList(result.split("\r?\n")));
+ assertFalse("Should contain at least two environment variables starting with NIFI", dynamicEnvironmentVariables.size() < 2);
+ assertTrue("NIFI_TEST_1 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_1=testvalue1"));
+ assertTrue("NIFI_TEST_2 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_2=testvalue2"));
+ }
+
+ @Test
public void testQuotedArguments() throws Exception {
List<String> args = ArgumentUtils.splitArgs("echo -n \"arg1 arg2 arg3\"", ' ');
assertEquals(3, args.size());
@@ -250,4 +478,7 @@ public class TestExecuteStreamCommand {
controller.assertValid();
}
+ private static boolean isWindows() {
+ return System.getProperty("os.name").toLowerCase().startsWith("windows");
+ }
}