You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/29 22:14:32 UTC

[10/13] incubator-nifi git commit: NIFI-421: Deleted unused method; formatted whitespace

NIFI-421: Deleted unused method; formatted whitespace


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/fb8984cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/fb8984cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/fb8984cf

Branch: refs/heads/develop
Commit: fb8984cfa55654aeba0c8fe3d14be613af395a94
Parents: ad98ac5
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Apr 29 14:52:20 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Apr 29 14:52:20 2015 -0400

----------------------------------------------------------------------
 .../processors/standard/ExecuteProcess.java     | 113 ++++++++-----------
 1 file changed, 46 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb8984cf/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
index 2490f0c..f6085e7 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java
@@ -66,43 +66,43 @@ import org.apache.nifi.processor.util.StandardValidators;
 public class ExecuteProcess extends AbstractProcessor {
 
     public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder()
-            .name("Command")
-            .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
-            .required(true)
-            .expressionLanguageSupported(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
+    .name("Command")
+    .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
+    .required(true)
+    .expressionLanguageSupported(false)
+    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+    .build();
 
     public static final PropertyDescriptor COMMAND_ARGUMENTS = new PropertyDescriptor.Builder()
-            .name("Command Arguments")
-            .description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.")
-            .required(false)
-            .expressionLanguageSupported(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
+    .name("Command Arguments")
+    .description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.")
+    .required(false)
+    .expressionLanguageSupported(false)
+    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+    .build();
 
     public static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder()
-            .name("Working Directory")
-            .description("The directory to use as the current working directory when executing the command")
-            .expressionLanguageSupported(false)
-            .addValidator(StandardValidators.createDirectoryExistsValidator(false, true))
-            .required(false)
-            .build();
+    .name("Working Directory")
+    .description("The directory to use as the current working directory when executing the command")
+    .expressionLanguageSupported(false)
+    .addValidator(StandardValidators.createDirectoryExistsValidator(false, true))
+    .required(false)
+    .build();
 
     public static final PropertyDescriptor BATCH_DURATION = new PropertyDescriptor.Builder()
-            .name("Batch Duration")
-            .description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so "
-                    + "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results "
-                    + "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results")
+    .name("Batch Duration")
+    .description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so "
+            + "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results "
+            + "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results")
             .required(false)
             .expressionLanguageSupported(false)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor REDIRECT_ERROR_STREAM = new PropertyDescriptor.Builder()
-            .name("Redirect Error Stream")
-            .description("If true will redirect any error stream output of the process to the output stream. "
-                    + "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.")
+    .name("Redirect Error Stream")
+    .description("If true will redirect any error stream output of the process to the output stream. "
+            + "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.")
             .required(false)
             .allowableValues("true", "false")
             .defaultValue("false")
@@ -111,9 +111,9 @@ public class ExecuteProcess extends AbstractProcessor {
             .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("All created FlowFiles are routed to this relationship")
-            .build();
+    .name("success")
+    .description("All created FlowFiles are routed to this relationship")
+    .build();
 
     private volatile ExecutorService executor;
     private Future<?> longRunningProcess;
@@ -138,11 +138,11 @@ public class ExecuteProcess extends AbstractProcessor {
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
-                .name(propertyDescriptorName)
-                .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
-                .dynamic(true)
-                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                .build();
+        .name(propertyDescriptorName)
+        .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
+        .dynamic(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
     }
 
     static List<String> splitArgs(final String input) {
@@ -212,17 +212,16 @@ public class ExecuteProcess extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final long startNanos = System.nanoTime();
-
-        if (proxyOut==null)
+        if (proxyOut==null) {
             proxyOut = new ProxyOutputStream(getLogger());
+        }
 
         final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
 
         final List<String> commandStrings = createCommandStrings(context);
         final String commandString = StringUtils.join(commandStrings, " ");
 
-        if (longRunningProcess == null || longRunningProcess.isDone())
+        if (longRunningProcess == null || longRunningProcess.isDone()) {
             try {
                 longRunningProcess = launchProcess(context, commandStrings, batchNanos, proxyOut);
             } catch (final IOException ioe) {
@@ -230,8 +229,9 @@ public class ExecuteProcess extends AbstractProcessor {
                 context.yield();
                 return;
             }
-        else
+        } else {
             getLogger().info("Read from long running process");
+        }
 
         if (!isScheduled()) {
             getLogger().info("User stopped processor; will terminate process immediately");
@@ -239,10 +239,8 @@ public class ExecuteProcess extends AbstractProcessor {
             return;
         }
 
-        // Create a FlowFile that we can write to and set the OutputStream for
-        // the FlowFile
-        // as the delegate for the ProxyOuptutStream, then wait until the
-        // process finishes
+        // Create a FlowFile that we can write to and set the OutputStream for the FlowFile
+        // as the delegate for the ProxyOuptutStream, then wait until the process finishes
         // or until the specified amount of time
         FlowFile flowFile = session.create();
         flowFile = session.write(flowFile, new OutputStreamCallback() {
@@ -252,8 +250,7 @@ public class ExecuteProcess extends AbstractProcessor {
                     proxyOut.setDelegate(out);
 
                     if (batchNanos == null) {
-                        // we are not creating batches; wait until process
-                        // terminates.
+                        // we are not creating batches; wait until process terminates.
                         // NB!!! Maybe get(long timeout, TimeUnit unit) should
                         // be used to avoid waiting forever.
                         try {
@@ -271,7 +268,7 @@ public class ExecuteProcess extends AbstractProcessor {
                     }
 
                     proxyOut.setDelegate(null); // prevent from writing to this
-                                                // stream
+                    // stream
                 }
             }
         });
@@ -280,8 +277,7 @@ public class ExecuteProcess extends AbstractProcessor {
             // If no data was written to the file, remove it
             session.remove(flowFile);
         } else if (failure.get()) {
-            // If there was a failure processing the output of the Process,
-            // remove the FlowFile
+            // If there was a failure processing the output of the Process, remove the FlowFile
             session.remove(flowFile);
             getLogger().error("Failed to read data from Process, so will not generate FlowFile");
         } else {
@@ -291,13 +287,11 @@ public class ExecuteProcess extends AbstractProcessor {
             session.transfer(flowFile, REL_SUCCESS);
         }
 
-        // Commit the session so that the FlowFile is transferred to the next
-        // processor
+        // Commit the session so that the FlowFile is transferred to the next processor
         session.commit();
     }
 
     protected List<String> createCommandStrings(final ProcessContext context) {
-
         final String command = context.getProperty(COMMAND).getValue();
         final List<String> args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue());
 
@@ -381,8 +375,7 @@ public class ExecuteProcess extends AbstractProcessor {
                         // setting a batch during means text.
                         // Also, we don't want that text to get split up in the
                         // middle of a line, so we use BufferedReader
-                        // to read lines of text and write them as lines of
-                        // text.
+                        // to read lines of text and write them as lines of text.
                         try (final BufferedReader reader = new BufferedReader(new InputStreamReader(newProcess.getInputStream()))) {
                             String line;
 
@@ -402,12 +395,10 @@ public class ExecuteProcess extends AbstractProcessor {
                     int exitCode;
                     try {
                         exitCode = newProcess.exitValue();
-                    } catch (Exception e) {
+                    } catch (final Exception e) {
                         exitCode = -99999;
                     }
                     getLogger().info("Process finished with exit code {} ", new Object[] { exitCode });
-                    // getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis",
-                    // new Object[]{exitCode, flowFileCount, millis});
                 }
 
                 return null;
@@ -417,18 +408,6 @@ public class ExecuteProcess extends AbstractProcessor {
         return future;
     }
 
-    // NB!!! Currently not used, Future<?> longRunningProcess is used to check whether process is done or not.
-    private boolean isAlive(final Process process) {
-        // unfortunately, java provides no straight-forward way to test if a Process is alive.
-        // In Java 8, Process.isAlive() is introduced, but NiFi needs to run against Java 7,
-        // so we have this solution in the mean time.
-        try {
-            process.exitValue();
-            return false;
-        } catch (final IllegalThreadStateException itse) {
-            return true;
-        }
-    }
 
     /**
      * Output stream that is used to wrap another output stream in a way that the underlying output stream can be swapped out for a different one when needed