You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/29 22:14:32 UTC
[10/13] incubator-nifi git commit: NIFI-421: Deleted unused method; formatted whitespace
NIFI-421: Deleted unused method; formatted whitespace
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/fb8984cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/fb8984cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/fb8984cf
Branch: refs/heads/develop
Commit: fb8984cfa55654aeba0c8fe3d14be613af395a94
Parents: ad98ac5
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Apr 29 14:52:20 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Apr 29 14:52:20 2015 -0400
----------------------------------------------------------------------
.../processors/standard/ExecuteProcess.java | 113 ++++++++-----------
1 file changed, 46 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb8984cf/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
index 2490f0c..f6085e7 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
@@ -66,43 +66,43 @@ import org.apache.nifi.processor.util.StandardValidators;
public class ExecuteProcess extends AbstractProcessor {
public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder()
- .name("Command")
- .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
- .required(true)
- .expressionLanguageSupported(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ .name("Command")
+ .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
+ .required(true)
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
public static final PropertyDescriptor COMMAND_ARGUMENTS = new PropertyDescriptor.Builder()
- .name("Command Arguments")
- .description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.")
- .required(false)
- .expressionLanguageSupported(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ .name("Command Arguments")
+ .description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.")
+ .required(false)
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
public static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder()
- .name("Working Directory")
- .description("The directory to use as the current working directory when executing the command")
- .expressionLanguageSupported(false)
- .addValidator(StandardValidators.createDirectoryExistsValidator(false, true))
- .required(false)
- .build();
+ .name("Working Directory")
+ .description("The directory to use as the current working directory when executing the command")
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.createDirectoryExistsValidator(false, true))
+ .required(false)
+ .build();
public static final PropertyDescriptor BATCH_DURATION = new PropertyDescriptor.Builder()
- .name("Batch Duration")
- .description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so "
- + "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results "
- + "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results")
+ .name("Batch Duration")
+ .description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so "
+ + "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results "
+ + "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor REDIRECT_ERROR_STREAM = new PropertyDescriptor.Builder()
- .name("Redirect Error Stream")
- .description("If true will redirect any error stream output of the process to the output stream. "
- + "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.")
+ .name("Redirect Error Stream")
+ .description("If true will redirect any error stream output of the process to the output stream. "
+ + "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.")
.required(false)
.allowableValues("true", "false")
.defaultValue("false")
@@ -111,9 +111,9 @@ public class ExecuteProcess extends AbstractProcessor {
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("All created FlowFiles are routed to this relationship")
- .build();
+ .name("success")
+ .description("All created FlowFiles are routed to this relationship")
+ .build();
private volatile ExecutorService executor;
private Future<?> longRunningProcess;
@@ -138,11 +138,11 @@ public class ExecuteProcess extends AbstractProcessor {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
- .dynamic(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ .name(propertyDescriptorName)
+ .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
+ .dynamic(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
}
static List<String> splitArgs(final String input) {
@@ -212,17 +212,16 @@ public class ExecuteProcess extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final long startNanos = System.nanoTime();
-
- if (proxyOut==null)
+ if (proxyOut==null) {
proxyOut = new ProxyOutputStream(getLogger());
+ }
final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
final List<String> commandStrings = createCommandStrings(context);
final String commandString = StringUtils.join(commandStrings, " ");
- if (longRunningProcess == null || longRunningProcess.isDone())
+ if (longRunningProcess == null || longRunningProcess.isDone()) {
try {
longRunningProcess = launchProcess(context, commandStrings, batchNanos, proxyOut);
} catch (final IOException ioe) {
@@ -230,8 +229,9 @@ public class ExecuteProcess extends AbstractProcessor {
context.yield();
return;
}
- else
+ } else {
getLogger().info("Read from long running process");
+ }
if (!isScheduled()) {
getLogger().info("User stopped processor; will terminate process immediately");
@@ -239,10 +239,8 @@ public class ExecuteProcess extends AbstractProcessor {
return;
}
- // Create a FlowFile that we can write to and set the OutputStream for
- // the FlowFile
- // as the delegate for the ProxyOuptutStream, then wait until the
- // process finishes
+ // Create a FlowFile that we can write to and set the OutputStream for the FlowFile
+ // as the delegate for the ProxyOutputStream, then wait until the process finishes
// or until the specified amount of time
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@@ -252,8 +250,7 @@ public class ExecuteProcess extends AbstractProcessor {
proxyOut.setDelegate(out);
if (batchNanos == null) {
- // we are not creating batches; wait until process
- // terminates.
+ // we are not creating batches; wait until process terminates.
// NB!!! Maybe get(long timeout, TimeUnit unit) should
// be used to avoid waiting forever.
try {
@@ -271,7 +268,7 @@ public class ExecuteProcess extends AbstractProcessor {
}
proxyOut.setDelegate(null); // prevent from writing to this
- // stream
+ // stream
}
}
});
@@ -280,8 +277,7 @@ public class ExecuteProcess extends AbstractProcessor {
// If no data was written to the file, remove it
session.remove(flowFile);
} else if (failure.get()) {
- // If there was a failure processing the output of the Process,
- // remove the FlowFile
+ // If there was a failure processing the output of the Process, remove the FlowFile
session.remove(flowFile);
getLogger().error("Failed to read data from Process, so will not generate FlowFile");
} else {
@@ -291,13 +287,11 @@ public class ExecuteProcess extends AbstractProcessor {
session.transfer(flowFile, REL_SUCCESS);
}
- // Commit the session so that the FlowFile is transferred to the next
- // processor
+ // Commit the session so that the FlowFile is transferred to the next processor
session.commit();
}
protected List<String> createCommandStrings(final ProcessContext context) {
-
final String command = context.getProperty(COMMAND).getValue();
final List<String> args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue());
@@ -381,8 +375,7 @@ public class ExecuteProcess extends AbstractProcessor {
// setting a batch during means text.
// Also, we don't want that text to get split up in the
// middle of a line, so we use BufferedReader
- // to read lines of text and write them as lines of
- // text.
+ // to read lines of text and write them as lines of text.
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(newProcess.getInputStream()))) {
String line;
@@ -402,12 +395,10 @@ public class ExecuteProcess extends AbstractProcessor {
int exitCode;
try {
exitCode = newProcess.exitValue();
- } catch (Exception e) {
+ } catch (final Exception e) {
exitCode = -99999;
}
getLogger().info("Process finished with exit code {} ", new Object[] { exitCode });
- // getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis",
- // new Object[]{exitCode, flowFileCount, millis});
}
return null;
@@ -417,18 +408,6 @@ public class ExecuteProcess extends AbstractProcessor {
return future;
}
- // NB!!! Currently not used, Future<?> longRunningProcess is used to check whether process is done or not.
- private boolean isAlive(final Process process) {
- // unfortunately, java provides no straight-forward way to test if a Process is alive.
- // In Java 8, Process.isAlive() is introduced, but NiFi needs to run against Java 7,
- // so we have this solution in the mean time.
- try {
- process.exitValue();
- return false;
- } catch (final IllegalThreadStateException itse) {
- return true;
- }
- }
/**
* Output stream that is used to wrap another output stream in a way that the underlying output stream can be swapped out for a different one when needed