You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2020/03/24 00:43:34 UTC

[nifi] branch master updated: NIFI-7271 Make command timeout configurable for ShellUserGroupProvider - Changing ShellRunner to use a separate thread for reading the output of the process - Removing unused member variable - Addressing review feedback

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f32aa5  NIFI-7271 Make command timeout configurable for ShellUserGroupProvider - Changing ShellRunner to use a separate thread for reading the output of the process - Removing unused member variable - Addressing review feedback
7f32aa5 is described below

commit 7f32aa56db9156d197666821fc90a1edd2ba755d
Author: Bryan Bende <bb...@apache.org>
AuthorDate: Fri Mar 20 11:55:37 2020 -0400

    NIFI-7271 Make command timeout configurable for ShellUserGroupProvider
    - Changing ShellRunner to use a separate thread for reading the output of the process
    - Removing unused member variable
    - Addressing review feedback
    
    This closes #4154.
---
 .../src/main/resources/conf/authorizers.xml        |  2 +
 .../nifi/authorization/ShellUserGroupProvider.java | 61 ++++++++++++---
 .../nifi/authorization/util/ShellRunner.java       | 89 ++++++++++++++++------
 .../authorization/ShellUserGroupProviderIT.java    |  5 +-
 4 files changed, 123 insertions(+), 34 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml
index ec1560f..06b57df 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml
@@ -181,6 +181,7 @@
         'Legacy Identifier Mode' - preserves the legacy behavior for id generation. Disabling this will ensure that
                                     user and group ids are differentiated to handle the case where a user and group have
                                     the same identity. Default is 'true', which means users and groups are not differentiated.
+        'Command Timeout' - amount of time to wait while executing a command before timing out
     -->
     <!-- To enable the shell-user-group-provider remove 2 lines. This is 1 of 2.
     <userGroupProvider>
@@ -190,6 +191,7 @@
         <property name="Exclude Groups"></property>
         <property name="Exclude Users"></property>
         <property name="Legacy Identifier Mode">true</property>
+        <property name="Command Timeout">60 seconds</property>
     </userGroupProvider>
     To enable the shell-user-group-provider remove 2 lines. This is 2 of 2. -->
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/ShellUserGroupProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/ShellUserGroupProvider.java
index c31fd11..6ad35dc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/ShellUserGroupProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/ShellUserGroupProvider.java
@@ -59,22 +59,24 @@ public class ShellUserGroupProvider implements UserGroupProvider {
     public static final String EXCLUDE_USER_PROPERTY = "Exclude Users";
     public static final String EXCLUDE_GROUP_PROPERTY = "Exclude Groups";
     public static final String LEGACY_IDENTIFIER_MODE = "Legacy Identifier Mode";
+    public static final String COMMAND_TIMEOUT_PROPERTY = "Command Timeout";
+
+    private static final String DEFAULT_COMMAND_TIMEOUT = "60 seconds";
 
     private long fixedDelay;
     private Pattern excludeUsers;
     private Pattern excludeGroups;
     private boolean legacyIdentifierMode;
+    private int timeoutSeconds;
 
     // Our scheduler has one thread for users, one for groups:
     private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
 
-    // Our shell timeout, in seconds:
-    @SuppressWarnings("FieldCanBeLocal")
-    private final Integer shellTimeout = 10;
-
     // Commands selected during initialization:
     private ShellCommandsProvider selectedShellCommands;
 
+    private ShellRunner shellRunner;
+
     // Start of the UserGroupProvider implementation.  Javadoc strings
     // copied from the interface definition for reference.
 
@@ -241,7 +243,12 @@ public class ShellUserGroupProvider implements UserGroupProvider {
      */
     @Override
     public void onConfigured(AuthorizerConfigurationContext configurationContext) throws AuthorizerCreationException {
+        logger.info("Configuring ShellUserGroupProvider");
+
         fixedDelay = getDelayProperty(configurationContext, REFRESH_DELAY_PROPERTY, "5 mins");
+        timeoutSeconds = getTimeoutProperty(configurationContext, COMMAND_TIMEOUT_PROPERTY, DEFAULT_COMMAND_TIMEOUT);
+        shellRunner = new ShellRunner(timeoutSeconds);
+        logger.debug("Configured ShellRunner with command timeout of '{}' seconds", new Object[]{timeoutSeconds});
 
         // Our next init step is to select the command set based on the operating system name:
         ShellCommandsProvider commands = getCommandsProvider();
@@ -254,7 +261,7 @@ public class ShellUserGroupProvider implements UserGroupProvider {
         // Our next init step is to run the system check from that command set to determine if the other commands
         // will work on this host or not.
         try {
-            ShellRunner.runShell(commands.getSystemCheck());
+            shellRunner.runShell(commands.getSystemCheck());
         } catch (final Exception e) {
             logger.error("initialize exception: " + e + " system check command: " + commands.getSystemCheck());
             throw new AuthorizerCreationException(SYS_CHECK_ERROR, e);
@@ -283,6 +290,7 @@ public class ShellUserGroupProvider implements UserGroupProvider {
             }
         }, fixedDelay, fixedDelay, TimeUnit.MILLISECONDS);
 
+        logger.info("Completed configuration of ShellUserGroupProvider");
     }
 
     private static ShellCommandsProvider getCommandsProviderFromName(String osName) {
@@ -339,6 +347,26 @@ public class ShellUserGroupProvider implements UserGroupProvider {
         return syncInterval;
     }
 
+    private int getTimeoutProperty(AuthorizerConfigurationContext authContext, String propertyName, String defaultValue) {
+        final PropertyValue timeoutProperty = authContext.getProperty(propertyName);
+
+        final String propertyValue;
+        if (timeoutProperty.isSet()) {
+            propertyValue = timeoutProperty.getValue();
+        } else {
+            propertyValue = defaultValue;
+        }
+
+        final long timeoutValue;
+        try {
+            timeoutValue = Math.round(FormatUtils.getPreciseTimeDuration(propertyValue, TimeUnit.SECONDS));
+        } catch (final IllegalArgumentException ignored) {
+            throw new AuthorizerCreationException(String.format("The %s '%s' is not a valid time interval.", propertyName, propertyValue));
+        }
+
+        return Math.toIntExact(timeoutValue);
+    }
+
     /**
      * Called immediately before instance destruction for implementers to release resources.
      *
@@ -348,7 +376,13 @@ public class ShellUserGroupProvider implements UserGroupProvider {
     public void preDestruction() throws AuthorizerDestructionException {
         try {
             scheduler.shutdownNow();
-        } catch (final Exception ignored) {
+        } catch (final Exception e) {
+            logger.warn("Error shutting down refresh scheduler: " + e.getMessage(), e);
+        }
+        try {
+            shellRunner.shutdown();
+        } catch (final Exception e) {
+            logger.warn("Error shutting down ShellRunner: " + e.getMessage(), e);
         }
     }
 
@@ -374,7 +408,7 @@ public class ShellUserGroupProvider implements UserGroupProvider {
             List<String> userLines;
 
             try {
-                userLines = ShellRunner.runShell(command, description);
+                userLines = shellRunner.runShell(command, description);
                 rebuildUsers(userLines, idToUser, usernameToUser, gidToUser);
             } catch (final IOException ioexc) {
                 logger.error("refreshOneUser shell exception: " + ioexc);
@@ -408,7 +442,7 @@ public class ShellUserGroupProvider implements UserGroupProvider {
             List<String> groupLines;
 
             try {
-                groupLines = ShellRunner.runShell(command, description);
+                groupLines = shellRunner.runShell(command, description);
                 rebuildGroups(groupLines, gidToGroup);
             } catch (final IOException ioexc) {
                 logger.error("refreshOneGroup shell exception: " + ioexc);
@@ -430,6 +464,8 @@ public class ShellUserGroupProvider implements UserGroupProvider {
      * other methods for record parse, extract, and object construction.
      */
     private void refreshUsersAndGroups() {
+        final long startTime = System.currentTimeMillis();
+
         Map<String, User> uidToUser = new HashMap<>();
         Map<String, User> usernameToUser = new HashMap<>();
         Map<String, User> gidToUser = new HashMap<>();
@@ -439,8 +475,8 @@ public class ShellUserGroupProvider implements UserGroupProvider {
         List<String> groupLines;
 
         try {
-            userLines = ShellRunner.runShell(selectedShellCommands.getUsersList(), "Get Users List");
-            groupLines = ShellRunner.runShell(selectedShellCommands.getGroupsList(), "Get Groups List");
+            userLines = shellRunner.runShell(selectedShellCommands.getUsersList(), "Get Users List");
+            groupLines = shellRunner.runShell(selectedShellCommands.getGroupsList(), "Get Groups List");
         } catch (final IOException ioexc) {
             logger.error("refreshUsersAndGroups shell exception: " + ioexc);
             return;
@@ -480,6 +516,9 @@ public class ShellUserGroupProvider implements UserGroupProvider {
                 sortedGroups.forEach(g -> logger.trace("=== " + g.toString()));
             }
         }
+
+        final long endTime = System.currentTimeMillis();
+        logger.info("Refreshed users and groups, took {} seconds", (endTime - startTime) / 1000);
     }
 
     /**
@@ -548,7 +587,7 @@ public class ShellUserGroupProvider implements UserGroupProvider {
 
                 try {
                     String groupMembersCommand = selectedShellCommands.getGroupMembers(groupName);
-                    List<String> memberLines = ShellRunner.runShell(groupMembersCommand);
+                    List<String> memberLines = shellRunner.runShell(groupMembersCommand);
                     // Use the first line only, and log if the line count isn't exactly one:
                     if (!memberLines.isEmpty()) {
                         String memberLine = memberLines.get(0);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/util/ShellRunner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/util/ShellRunner.java
index d12c00b..819640d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/util/ShellRunner.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/util/ShellRunner.java
@@ -23,9 +23,11 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Reader;
-
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 public class ShellRunner {
@@ -33,50 +35,93 @@ public class ShellRunner {
 
     static String SHELL = "sh";
     static String OPTS = "-c";
-    static Integer TIMEOUT = 60;
 
-    public static List<String> runShell(String command) throws IOException {
+    private final int timeoutSeconds;
+    private final ExecutorService executor;
+
+    public ShellRunner(final int timeoutSeconds) {
+        this.timeoutSeconds = timeoutSeconds;
+        this.executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread t = Executors.defaultThreadFactory().newThread(r);
+                t.setName("ShellRunner");
+                t.setDaemon(true);
+                return t;
+            }
+        });
+    }
+
+    public List<String> runShell(String command) throws IOException {
         return runShell(command, "<unknown>");
     }
 
-    public static List<String> runShell(String command, String description) throws IOException {
+    public List<String> runShell(String command, String description) throws IOException {
         final ProcessBuilder builder = new ProcessBuilder(SHELL, OPTS, command);
+        builder.redirectErrorStream(true);
+
         final List<String> builderCommand = builder.command();
+        logger.debug("Run Command '{}': {}", new Object[]{description, builderCommand});
 
-        logger.debug("Run Command '" + description + "': " + builderCommand);
         final Process proc = builder.start();
 
+        final List<String> lines = new ArrayList<>();
+        executor.submit(() -> {
+            try {
+                try (final Reader stdin = new InputStreamReader(proc.getInputStream());
+                     final BufferedReader reader = new BufferedReader(stdin)) {
+                    logger.trace("Reading process input stream...");
+
+                    String line;
+                    int lineCount = 0;
+                    while ((line = reader.readLine()) != null) {
+                        if (logger.isTraceEnabled()) {
+                            logger.trace((++lineCount) + " - " + line);
+                        }
+                        lines.add(line.trim());
+                    }
+
+                    logger.trace("Finished reading process input stream");
+                }
+            } catch (IOException e) {
+                logger.error(e.getMessage(), e);
+            }
+        });
+
         boolean completed;
         try {
-            completed = proc.waitFor(TIMEOUT, TimeUnit.SECONDS);
+            completed = proc.waitFor(timeoutSeconds, TimeUnit.SECONDS);
         } catch (InterruptedException irexc) {
             throw new IOException(irexc.getMessage(), irexc.getCause());
         }
 
         if (!completed) {
+            logger.debug("Process did not complete in allotted time, attempting to forcibly destroy process...");
+            try {
+                proc.destroyForcibly();
+            } catch (Exception e) {
+                logger.debug("Process failed to destroy: " + e.getMessage(), e);
+            }
             throw new IllegalStateException("Shell command '" + command + "' did not complete during the allotted time period");
         }
 
         if (proc.exitValue() != 0) {
-            try (final Reader stderr = new InputStreamReader(proc.getErrorStream());
-                 final BufferedReader reader = new BufferedReader(stderr)) {
-                String line;
-                while ((line = reader.readLine()) != null) {
-                    logger.warn(line.trim());
-                }
-            }
-            throw new IOException("Command exit non-zero: " + proc.exitValue());
+            throw new IOException("Process exited with non-zero value: " + proc.exitValue());
         }
 
-        final List<String> lines = new ArrayList<>();
-        try (final Reader stdin = new InputStreamReader(proc.getInputStream());
-             final BufferedReader reader = new BufferedReader(stdin)) {
-            String line;
-            while ((line = reader.readLine()) != null) {
-                lines.add(line.trim());
+        return lines;
+    }
+
+    public void shutdown() {
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
+                logger.info("Failed to stop ShellRunner executor in 5 seconds. Terminating");
+                executor.shutdownNow();
             }
+        } catch (InterruptedException ie) {
+            executor.shutdownNow();
+            Thread.currentThread().interrupt();
         }
-
-        return lines;
     }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/test/java/org/apache/nifi/authorization/ShellUserGroupProviderIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/test/java/org/apache/nifi/authorization/ShellUserGroupProviderIT.java
index 58ee973..cc7f0c4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/test/java/org/apache/nifi/authorization/ShellUserGroupProviderIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/test/java/org/apache/nifi/authorization/ShellUserGroupProviderIT.java
@@ -84,6 +84,8 @@ public class ShellUserGroupProviderIT {
     private ShellUserGroupProvider localProvider;
     private UserGroupProviderInitializationContext initContext;
 
+    private static ShellRunner shellRunner;
+
     @ClassRule
     static public TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -95,10 +97,11 @@ public class ShellUserGroupProviderIT {
         sshPrivKeyFile = tempFolder.getRoot().getAbsolutePath() + "/id_rsa";
         sshPubKeyFile = sshPrivKeyFile + ".pub";
 
+        shellRunner = new ShellRunner(60);
         try {
             // NB: this command is a bit perplexing: it works without prompt from the shell, but hangs
             // here without the pipe from `yes`:
-            ShellRunner.runShell("yes | ssh-keygen -C '' -N '' -t rsa -f " + sshPrivKeyFile);
+            shellRunner.runShell("yes | ssh-keygen -C '' -N '' -t rsa -f " + sshPrivKeyFile);
         } catch (final IOException ioexc) {
             systemCheckFailed = true;
             logger.error("setupOnce() exception: " + ioexc + "; tests cannot run on this system.");