You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2020/03/24 00:43:34 UTC
[nifi] branch master updated: NIFI-7271 Make command timeout
configurable for ShellUserGroupProvider - Changing ShellRunner to use a
separate thread for reading the output of the process - Removing unused
member variable - Addressing review feedback
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 7f32aa5 NIFI-7271 Make command timeout configurable for ShellUserGroupProvider - Changing ShellRunner to use a separate thread for reading the output of the process - Removing unused member variable - Addressing review feedback
7f32aa5 is described below
commit 7f32aa56db9156d197666821fc90a1edd2ba755d
Author: Bryan Bende <bb...@apache.org>
AuthorDate: Fri Mar 20 11:55:37 2020 -0400
NIFI-7271 Make command timeout configurable for ShellUserGroupProvider
- Changing ShellRunner to use a separate thread for reading the output of the process
- Removing unused member variable
- Addressing review feedback
This closes #4154.
---
.../src/main/resources/conf/authorizers.xml | 2 +
.../nifi/authorization/ShellUserGroupProvider.java | 61 ++++++++++++---
.../nifi/authorization/util/ShellRunner.java | 89 ++++++++++++++++------
.../authorization/ShellUserGroupProviderIT.java | 5 +-
4 files changed, 123 insertions(+), 34 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml
index ec1560f..06b57df 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml
@@ -181,6 +181,7 @@
'Legacy Identifier Mode' - preserves the legacy behavior for id generation. Disabling this will ensure that
user and group ids are differentiated to handle the case where a user and group have
the same identity. Default is 'true', which means users and groups are not differentiated.
+ 'Command Timeout' - amount of time to wait while executing a command before timing out
-->
<!-- To enable the shell-user-group-provider remove 2 lines. This is 1 of 2.
<userGroupProvider>
@@ -190,6 +191,7 @@
<property name="Exclude Groups"></property>
<property name="Exclude Users"></property>
<property name="Legacy Identifier Mode">true</property>
+ <property name="Command Timeout">60 seconds</property>
</userGroupProvider>
To enable the shell-user-group-provider remove 2 lines. This is 2 of 2. -->
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/ShellUserGroupProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/ShellUserGroupProvider.java
index c31fd11..6ad35dc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/ShellUserGroupProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/ShellUserGroupProvider.java
@@ -59,22 +59,24 @@ public class ShellUserGroupProvider implements UserGroupProvider {
public static final String EXCLUDE_USER_PROPERTY = "Exclude Users";
public static final String EXCLUDE_GROUP_PROPERTY = "Exclude Groups";
public static final String LEGACY_IDENTIFIER_MODE = "Legacy Identifier Mode";
+ public static final String COMMAND_TIMEOUT_PROPERTY = "Command Timeout";
+
+ private static final String DEFAULT_COMMAND_TIMEOUT = "60 seconds";
private long fixedDelay;
private Pattern excludeUsers;
private Pattern excludeGroups;
private boolean legacyIdentifierMode;
+ private int timeoutSeconds;
// Our scheduler has one thread for users, one for groups:
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
- // Our shell timeout, in seconds:
- @SuppressWarnings("FieldCanBeLocal")
- private final Integer shellTimeout = 10;
-
// Commands selected during initialization:
private ShellCommandsProvider selectedShellCommands;
+ private ShellRunner shellRunner;
+
// Start of the UserGroupProvider implementation. Javadoc strings
// copied from the interface definition for reference.
@@ -241,7 +243,12 @@ public class ShellUserGroupProvider implements UserGroupProvider {
*/
@Override
public void onConfigured(AuthorizerConfigurationContext configurationContext) throws AuthorizerCreationException {
+ logger.info("Configuring ShellUserGroupProvider");
+
fixedDelay = getDelayProperty(configurationContext, REFRESH_DELAY_PROPERTY, "5 mins");
+ timeoutSeconds = getTimeoutProperty(configurationContext, COMMAND_TIMEOUT_PROPERTY, DEFAULT_COMMAND_TIMEOUT);
+ shellRunner = new ShellRunner(timeoutSeconds);
+ logger.debug("Configured ShellRunner with command timeout of '{}' seconds", new Object[]{timeoutSeconds});
// Our next init step is to select the command set based on the operating system name:
ShellCommandsProvider commands = getCommandsProvider();
@@ -254,7 +261,7 @@ public class ShellUserGroupProvider implements UserGroupProvider {
// Our next init step is to run the system check from that command set to determine if the other commands
// will work on this host or not.
try {
- ShellRunner.runShell(commands.getSystemCheck());
+ shellRunner.runShell(commands.getSystemCheck());
} catch (final Exception e) {
logger.error("initialize exception: " + e + " system check command: " + commands.getSystemCheck());
throw new AuthorizerCreationException(SYS_CHECK_ERROR, e);
@@ -283,6 +290,7 @@ public class ShellUserGroupProvider implements UserGroupProvider {
}
}, fixedDelay, fixedDelay, TimeUnit.MILLISECONDS);
+ logger.info("Completed configuration of ShellUserGroupProvider");
}
private static ShellCommandsProvider getCommandsProviderFromName(String osName) {
@@ -339,6 +347,26 @@ public class ShellUserGroupProvider implements UserGroupProvider {
return syncInterval;
}
+ private int getTimeoutProperty(AuthorizerConfigurationContext authContext, String propertyName, String defaultValue) {
+ final PropertyValue timeoutProperty = authContext.getProperty(propertyName);
+
+ final String propertyValue;
+ if (timeoutProperty.isSet()) {
+ propertyValue = timeoutProperty.getValue();
+ } else {
+ propertyValue = defaultValue;
+ }
+
+ final long timeoutValue;
+ try {
+ timeoutValue = Math.round(FormatUtils.getPreciseTimeDuration(propertyValue, TimeUnit.SECONDS));
+ } catch (final IllegalArgumentException ignored) {
+ throw new AuthorizerCreationException(String.format("The %s '%s' is not a valid time interval.", propertyName, propertyValue));
+ }
+
+ return Math.toIntExact(timeoutValue);
+ }
+
/**
* Called immediately before instance destruction for implementers to release resources.
*
@@ -348,7 +376,13 @@ public class ShellUserGroupProvider implements UserGroupProvider {
public void preDestruction() throws AuthorizerDestructionException {
try {
scheduler.shutdownNow();
- } catch (final Exception ignored) {
+ } catch (final Exception e) {
+ logger.warn("Error shutting down refresh scheduler: " + e.getMessage(), e);
+ }
+ try {
+ shellRunner.shutdown();
+ } catch (final Exception e) {
+ logger.warn("Error shutting down ShellRunner: " + e.getMessage(), e);
}
}
@@ -374,7 +408,7 @@ public class ShellUserGroupProvider implements UserGroupProvider {
List<String> userLines;
try {
- userLines = ShellRunner.runShell(command, description);
+ userLines = shellRunner.runShell(command, description);
rebuildUsers(userLines, idToUser, usernameToUser, gidToUser);
} catch (final IOException ioexc) {
logger.error("refreshOneUser shell exception: " + ioexc);
@@ -408,7 +442,7 @@ public class ShellUserGroupProvider implements UserGroupProvider {
List<String> groupLines;
try {
- groupLines = ShellRunner.runShell(command, description);
+ groupLines = shellRunner.runShell(command, description);
rebuildGroups(groupLines, gidToGroup);
} catch (final IOException ioexc) {
logger.error("refreshOneGroup shell exception: " + ioexc);
@@ -430,6 +464,8 @@ public class ShellUserGroupProvider implements UserGroupProvider {
* other methods for record parse, extract, and object construction.
*/
private void refreshUsersAndGroups() {
+ final long startTime = System.currentTimeMillis();
+
Map<String, User> uidToUser = new HashMap<>();
Map<String, User> usernameToUser = new HashMap<>();
Map<String, User> gidToUser = new HashMap<>();
@@ -439,8 +475,8 @@ public class ShellUserGroupProvider implements UserGroupProvider {
List<String> groupLines;
try {
- userLines = ShellRunner.runShell(selectedShellCommands.getUsersList(), "Get Users List");
- groupLines = ShellRunner.runShell(selectedShellCommands.getGroupsList(), "Get Groups List");
+ userLines = shellRunner.runShell(selectedShellCommands.getUsersList(), "Get Users List");
+ groupLines = shellRunner.runShell(selectedShellCommands.getGroupsList(), "Get Groups List");
} catch (final IOException ioexc) {
logger.error("refreshUsersAndGroups shell exception: " + ioexc);
return;
@@ -480,6 +516,9 @@ public class ShellUserGroupProvider implements UserGroupProvider {
sortedGroups.forEach(g -> logger.trace("=== " + g.toString()));
}
}
+
+ final long endTime = System.currentTimeMillis();
+ logger.info("Refreshed users and groups, took {} seconds", (endTime - startTime) / 1000);
}
/**
@@ -548,7 +587,7 @@ public class ShellUserGroupProvider implements UserGroupProvider {
try {
String groupMembersCommand = selectedShellCommands.getGroupMembers(groupName);
- List<String> memberLines = ShellRunner.runShell(groupMembersCommand);
+ List<String> memberLines = shellRunner.runShell(groupMembersCommand);
// Use the first line only, and log if the line count isn't exactly one:
if (!memberLines.isEmpty()) {
String memberLine = memberLines.get(0);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/util/ShellRunner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/util/ShellRunner.java
index d12c00b..819640d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/util/ShellRunner.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/util/ShellRunner.java
@@ -23,9 +23,11 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
-
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
public class ShellRunner {
@@ -33,50 +35,93 @@ public class ShellRunner {
static String SHELL = "sh";
static String OPTS = "-c";
- static Integer TIMEOUT = 60;
- public static List<String> runShell(String command) throws IOException {
+ private final int timeoutSeconds;
+ private final ExecutorService executor;
+
+ public ShellRunner(final int timeoutSeconds) {
+ this.timeoutSeconds = timeoutSeconds;
+ this.executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setName("ShellRunner");
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ }
+
+ public List<String> runShell(String command) throws IOException {
return runShell(command, "<unknown>");
}
- public static List<String> runShell(String command, String description) throws IOException {
+ public List<String> runShell(String command, String description) throws IOException {
final ProcessBuilder builder = new ProcessBuilder(SHELL, OPTS, command);
+ builder.redirectErrorStream(true);
+
final List<String> builderCommand = builder.command();
+ logger.debug("Run Command '{}': {}", new Object[]{description, builderCommand});
- logger.debug("Run Command '" + description + "': " + builderCommand);
final Process proc = builder.start();
+ final List<String> lines = new ArrayList<>();
+ executor.submit(() -> {
+ try {
+ try (final Reader stdin = new InputStreamReader(proc.getInputStream());
+ final BufferedReader reader = new BufferedReader(stdin)) {
+ logger.trace("Reading process input stream...");
+
+ String line;
+ int lineCount = 0;
+ while ((line = reader.readLine()) != null) {
+ if (logger.isTraceEnabled()) {
+ logger.trace((++lineCount) + " - " + line);
+ }
+ lines.add(line.trim());
+ }
+
+ logger.trace("Finished reading process input stream");
+ }
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+ });
+
boolean completed;
try {
- completed = proc.waitFor(TIMEOUT, TimeUnit.SECONDS);
+ completed = proc.waitFor(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException irexc) {
throw new IOException(irexc.getMessage(), irexc.getCause());
}
if (!completed) {
+ logger.debug("Process did not complete in allotted time, attempting to forcibly destroy process...");
+ try {
+ proc.destroyForcibly();
+ } catch (Exception e) {
+ logger.debug("Process failed to destroy: " + e.getMessage(), e);
+ }
throw new IllegalStateException("Shell command '" + command + "' did not complete during the allotted time period");
}
if (proc.exitValue() != 0) {
- try (final Reader stderr = new InputStreamReader(proc.getErrorStream());
- final BufferedReader reader = new BufferedReader(stderr)) {
- String line;
- while ((line = reader.readLine()) != null) {
- logger.warn(line.trim());
- }
- }
- throw new IOException("Command exit non-zero: " + proc.exitValue());
+ throw new IOException("Process exited with non-zero value: " + proc.exitValue());
}
- final List<String> lines = new ArrayList<>();
- try (final Reader stdin = new InputStreamReader(proc.getInputStream());
- final BufferedReader reader = new BufferedReader(stdin)) {
- String line;
- while ((line = reader.readLine()) != null) {
- lines.add(line.trim());
+ return lines;
+ }
+
+ public void shutdown() {
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
+ logger.info("Failed to stop ShellRunner executor in 5 seconds. Terminating");
+ executor.shutdownNow();
}
+ } catch (InterruptedException ie) {
+ executor.shutdownNow();
+ Thread.currentThread().interrupt();
}
-
- return lines;
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/test/java/org/apache/nifi/authorization/ShellUserGroupProviderIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/test/java/org/apache/nifi/authorization/ShellUserGroupProviderIT.java
index 58ee973..cc7f0c4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/test/java/org/apache/nifi/authorization/ShellUserGroupProviderIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/test/java/org/apache/nifi/authorization/ShellUserGroupProviderIT.java
@@ -84,6 +84,8 @@ public class ShellUserGroupProviderIT {
private ShellUserGroupProvider localProvider;
private UserGroupProviderInitializationContext initContext;
+ private static ShellRunner shellRunner;
+
@ClassRule
static public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -95,10 +97,11 @@ public class ShellUserGroupProviderIT {
sshPrivKeyFile = tempFolder.getRoot().getAbsolutePath() + "/id_rsa";
sshPubKeyFile = sshPrivKeyFile + ".pub";
+ shellRunner = new ShellRunner(60);
try {
// NB: this command is a bit perplexing: it works without prompt from the shell, but hangs
// here without the pipe from `yes`:
- ShellRunner.runShell("yes | ssh-keygen -C '' -N '' -t rsa -f " + sshPrivKeyFile);
+ shellRunner.runShell("yes | ssh-keygen -C '' -N '' -t rsa -f " + sshPrivKeyFile);
} catch (final IOException ioexc) {
systemCheckFailed = true;
logger.error("setupOnce() exception: " + ioexc + "; tests cannot run on this system.");