You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by kd...@apache.org on 2022/06/01 15:27:21 UTC

[nifi] branch main updated: NIFI-10069 Updated multiple components to support Sensitive Dynamic Properties

This is an automated email from the ASF dual-hosted git repository.

kdoran pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 45cc3cefe5 NIFI-10069 Updated multiple components to support Sensitive Dynamic Properties
45cc3cefe5 is described below

commit 45cc3cefe5f94915d8665d962e1d6b326c8cca99
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Tue May 31 11:00:29 2022 -0500

    NIFI-10069 Updated multiple components to support Sensitive Dynamic Properties
    
    - ExecuteScript
    - ExecuteStreamCommand
    - InvokeScriptedProcessor
    - HikariCPConnectionPool
    
    This closes #6085.
    
    Signed-off-by: Kevin Doran <kd...@apache.org>
---
 .../nifi/processors/script/ExecuteScript.java      |  6 +-
 .../processors/script/InvokeScriptedProcessor.java | 10 ++-
 .../processors/standard/ExecuteStreamCommand.java  | 92 ++++++++++------------
 .../apache/nifi/dbcp/HikariCPConnectionPool.java   |  2 +
 4 files changed, 53 insertions(+), 57 deletions(-)

diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
index 8d64dcf4e9..24c946474d 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
@@ -23,6 +23,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.behavior.Restriction;
 import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -68,9 +69,10 @@ import java.util.Set;
         + "handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by "
         + "the script. If the handling is incomplete or incorrect, the session will be rolled back. Experimental: "
         + "Impact of sustained usage not yet verified.")
+@SupportsSensitiveDynamicProperties
 @DynamicProperty(
-        name = "A script engine property to update",
-        value = "The value to set it to",
+        name = "Script Engine Binding property",
+        value = "Binding property value passed to Script Runner",
         expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
         description = "Updates a script engine property specified by the Dynamic Property's key with the value "
                 + "specified by the Dynamic Property's value")
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
index 8c89233d88..d5172be8a7 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
@@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.behavior.Restriction;
 import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -76,7 +77,8 @@ import java.util.concurrent.atomic.AtomicReference;
         + "public void onStopped(ProcessContext context) methods to be invoked when the parent InvokeScriptedProcessor is scheduled or stopped, respectively.  "
         + "NOTE: The script will be loaded when the processor is populated with property values, see the Restrictions section for more security implications.  "
         + "Experimental: Impact of sustained usage not yet verified.")
-@DynamicProperty(name = "A script engine property to update", value = "The value to set it to",
+@SupportsSensitiveDynamicProperties
+@DynamicProperty(name = "Script Engine Binding property", value = "Binding property value passed to Script Runner",
         expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
         description = "Updates a script engine property specified by the Dynamic Property's key with the value specified by the Dynamic Property's value")
 @Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER},
@@ -301,7 +303,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
         // store the updated validation results
         validationResults.set(results);
 
-        // return whether there was any issues loading the configured script
+        // return whether there were any issues loading the configured script
         return results.isEmpty();
     }
 
@@ -332,7 +334,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
         // store the updated validation results
         validationResults.set(results);
 
-        // return whether there was any issues loading the configured script
+        // return whether there were any issues loading the configured script
         return results.isEmpty();
     }
 
@@ -360,7 +362,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
             }
 
             if (scriptRunner == null) {
-                throw new ProcessException("No script runner available!");
+                throw new ProcessException("No script runner available");
             }
             // get the engine and ensure its invocable
             ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
index 97b39bb7d3..2ebefcb1f1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperties;
@@ -45,6 +46,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.behavior.Restriction;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -67,7 +69,6 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.ArgumentUtils;
 import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
@@ -148,6 +149,7 @@ import org.apache.nifi.stream.io.StreamUtils;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"command execution", "command", "stream", "execute"})
 @CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
+@SupportsSensitiveDynamicProperties
 @DynamicProperties({
     @DynamicProperty(name = "An environment variable name", value = "An environment variable value",
         description = "These environment variables are passed to the process spawned by this Processor"),
@@ -182,7 +184,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
             .description("The destination path for the flow file created from the command's output, if the returned status code is non-zero. "
                     + "All flow files routed to this relationship will be penalized.")
             .build();
-    private AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
+    private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
 
     private final static Set<Relationship> OUTPUT_STREAM_RELATIONSHIP_SET;
     private final static Set<Relationship> ATTRIBUTE_RELATIONSHIP_SET;
@@ -352,7 +354,6 @@ public class ExecuteStreamCommand extends AbstractProcessor {
         // get the number part of the name
         Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(propertyDescriptorName);
         if (matcher.matches()) {
-            final String commandIndex = matcher.group("commandIndex");
             return new PropertyDescriptor.Builder()
                 .name(propertyDescriptorName)
                 .displayName(propertyDescriptorName)
@@ -386,21 +387,19 @@ public class ExecuteStreamCommand extends AbstractProcessor {
         if (!useDynamicPropertyArguments) {
             commandArguments = context.getProperty(EXECUTION_ARGUMENTS).evaluateAttributeExpressions(inputFlowFile).getValue();
             if (!StringUtils.isBlank(commandArguments)) {
-                for (String arg : ArgumentUtils
-                    .splitArgs(commandArguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0))) {
-                    args.add(arg);
-                }
+                args.addAll(ArgumentUtils
+                        .splitArgs(commandArguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0)));
             }
         } else {
 
-            ArrayList<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
+            List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
             for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
                 Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(entry.getKey().getName());
                 if (matcher.matches()) {
                     propertyDescriptors.add(entry.getKey());
                 }
             }
-            Collections.sort(propertyDescriptors,(p1,p2) -> {
+            propertyDescriptors.sort((p1, p2) -> {
                 Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(p1.getName());
                 String indexString1 = null;
                 while (matcher.find()) {
@@ -413,7 +412,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
                 }
                 final int index1 = Integer.parseInt(indexString1);
                 final int index2 = Integer.parseInt(indexString2);
-                if ( index1 > index2 ) {
+                if (index1 > index2) {
                     return 1;
                 } else if (index1 < index2) {
                     return -1;
@@ -439,12 +438,13 @@ public class ExecuteStreamCommand extends AbstractProcessor {
 
         final ProcessBuilder builder = new ProcessBuilder();
 
-        logger.debug("Executing and waiting for command {} with arguments {}", new Object[]{executeCommand, commandArguments});
+        // Avoid logging arguments that could contain sensitive values
+        logger.debug("Executing and waiting for command: {}", executeCommand);
         File dir = null;
         if (!StringUtils.isBlank(workingDir)) {
             dir = new File(workingDir);
             if (!dir.exists() && !dir.mkdirs()) {
-                logger.warn("Failed to create working directory {}, using current working directory {}", new Object[]{workingDir, System.getProperty("user.dir")});
+                logger.warn("Failed to create working directory {}, using current working directory {}", workingDir, System.getProperty("user.dir"));
             }
         }
         final Map<String, String> environment = new HashMap<>();
@@ -484,7 +484,6 @@ public class ExecuteStreamCommand extends AbstractProcessor {
         try (final OutputStream pos = process.getOutputStream();
              final InputStream pis = process.getInputStream();
              final BufferedInputStream bis = new BufferedInputStream(pis)) {
-            int exitCode = -1;
             final BufferedOutputStream bos = new BufferedOutputStream(pos);
             FlowFile outputFlowFile = putToAttribute ? inputFlowFile : session.create(inputFlowFile);
 
@@ -497,8 +496,8 @@ public class ExecuteStreamCommand extends AbstractProcessor {
                 outputFlowFile = session.putAttribute(outputFlowFile, attributeName, new String(callback.outputBuffer, 0, callback.size));
             }
 
-            exitCode = callback.exitCode;
-            logger.debug("Execution complete for command: {}.  Exited with code: {}", new Object[]{executeCommand, exitCode});
+            int exitCode = callback.exitCode;
+            logger.debug("Execution complete for command: {}.  Exited with code: {}", executeCommand, exitCode);
 
             Map<String, String> attributes = new HashMap<>();
 
@@ -511,16 +510,15 @@ public class ExecuteStreamCommand extends AbstractProcessor {
             } catch (IOException e) {
                 strBldr.append("Unknown...could not read Process's Std Error");
             }
-            int length = strBldr.length() > 4000 ? 4000 : strBldr.length();
+            int length = Math.min(strBldr.length(), 4000);
             attributes.put("execution.error", strBldr.substring(0, length));
 
             final Relationship outputFlowFileRelationship = putToAttribute ? ORIGINAL_RELATIONSHIP : (exitCode != 0) ? NONZERO_STATUS_RELATIONSHIP : OUTPUT_STREAM_RELATIONSHIP;
             if (exitCode == 0) {
-                logger.info("Transferring flow file {} to {}",
-                        new Object[]{outputFlowFile,outputFlowFileRelationship.getName()});
+                logger.info("Transferring {} to {}", outputFlowFile, outputFlowFileRelationship.getName());
             } else {
-                logger.error("Transferring flow file {} to {}. Executable command {} ended in an error: {}",
-                        new Object[]{outputFlowFile,outputFlowFileRelationship.getName(), executeCommand, strBldr.toString()});
+                logger.error("Transferring {} to {}. Executable command {} ended in an error: {}",
+                        outputFlowFile,outputFlowFileRelationship.getName(), executeCommand, strBldr.toString());
             }
 
             attributes.put("execution.status", Integer.toString(exitCode));
@@ -536,16 +534,16 @@ public class ExecuteStreamCommand extends AbstractProcessor {
             session.transfer(outputFlowFile, outputFlowFileRelationship);
 
             if (!putToAttribute) {
-                logger.info("Transferring flow file {} to original", new Object[]{inputFlowFile});
+                logger.info("Transferring {} to original", inputFlowFile);
                 inputFlowFile = session.putAllAttributes(inputFlowFile, attributes);
                 session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP);
             }
 
-        } catch (final IOException ex) {
+        } catch (final IOException e) {
             // could not close Process related streams
-            logger.warn("Problem terminating Process {}", new Object[]{process}, ex);
+            logger.warn("Problem terminating Process {}", process, e);
         } finally {
-            errorOut.delete();
+            FileUtils.deleteQuietly(errorOut);
             process.destroy(); // last ditch effort to clean up that process.
         }
     }
@@ -606,17 +604,13 @@ public class ExecuteStreamCommand extends AbstractProcessor {
                     }
                 }
             } else {
-                outputFlowFile = session.write(outputFlowFile, new OutputStreamCallback() {
-                    @Override
-                    public void process(OutputStream out) throws IOException {
-
-                        readStdoutReadable(ignoreStdin, stdinWritable, logger, incomingFlowFileIS);
-                        StreamUtils.copy(stdoutReadable, out);
-                        try {
-                            exitCode = process.waitFor();
-                        } catch (InterruptedException e) {
-                            logger.warn("Command Execution Process was interrupted", e);
-                        }
+                outputFlowFile = session.write(outputFlowFile, out -> {
+                    readStdoutReadable(ignoreStdin, stdinWritable, logger, incomingFlowFileIS);
+                    StreamUtils.copy(stdoutReadable, out);
+                    try {
+                        exitCode = process.waitFor();
+                    } catch (InterruptedException e) {
+                        logger.warn("Command Execution Process was interrupted", e);
                     }
                 });
             }
@@ -624,24 +618,20 @@ public class ExecuteStreamCommand extends AbstractProcessor {
     }
 
     private static void readStdoutReadable(final boolean ignoreStdin, final OutputStream stdinWritable,
-                                           final ComponentLog logger, final InputStream incomingFlowFileIS) throws IOException {
-        Thread writerThread = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                if (!ignoreStdin) {
-                    try {
-                        StreamUtils.copy(incomingFlowFileIS, stdinWritable);
-                    } catch (IOException e) {
-                        // This is unlikely to occur, and isn't handled at the moment
-                        // Bug captured in NIFI-1194
-                        logger.error("Failed to write flow file to stdin due to {}", new Object[]{e}, e);
-                    }
+                                           final ComponentLog logger, final InputStream incomingFlowFileIS) {
+        Thread writerThread = new Thread(() -> {
+            if (!ignoreStdin) {
+                try {
+                    StreamUtils.copy(incomingFlowFileIS, stdinWritable);
+                } catch (IOException e) {
+                    // This is unlikely to occur, and isn't handled at the moment
+                    // Bug captured in NIFI-1194
+                    logger.error("Failed to write FlowFile to Standard Input Stream", e);
                 }
-                // MUST close the output stream to the stdin so that whatever is reading knows
-                // there is no more data.
-                IOUtils.closeQuietly(stdinWritable);
             }
+            // MUST close the output stream to the stdin so that whatever is reading knows
+            // there is no more data.
+            IOUtils.closeQuietly(stdinWritable);
         });
         writerThread.setDaemon(true);
         writerThread.start();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java
index 84a4883cb0..26b62db03b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java
@@ -20,6 +20,7 @@ import com.zaxxer.hikari.HikariDataSource;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
@@ -54,6 +55,7 @@ import java.util.stream.Collectors;
 @RequiresInstanceClassLoading
 @Tags({"dbcp", "hikari", "jdbc", "database", "connection", "pooling", "store"})
 @CapabilityDescription("Provides Database Connection Pooling Service based on HikariCP. Connections can be asked from pool and returned after usage.")
+@SupportsSensitiveDynamicProperties
 @DynamicProperty(name = "JDBC property name", value = "JDBC property value", expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
         description = "Specifies a property name and value to be set on the JDBC connection(s). "
                 + "If Expression Language is used, evaluation will be performed upon the controller service being enabled. "