You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/03/11 20:16:55 UTC
git commit: FLUME-1661: ExecSource cannot execute complex *nix commands
Updated Branches:
refs/heads/trunk b28b87b58 -> 13b8252bd
FLUME-1661: ExecSource cannot execute complex *nix commands
(Roshan Naik via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/13b8252b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/13b8252b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/13b8252b
Branch: refs/heads/trunk
Commit: 13b8252bdeb838c606f4453bdf757fb2a1101eb8
Parents: b28b87b
Author: Brock Noland <br...@apache.org>
Authored: Mon Mar 11 14:16:55 2013 -0500
Committer: Brock Noland <br...@apache.org>
Committed: Mon Mar 11 14:16:55 2013 -0500
----------------------------------------------------------------------
.../java/org/apache/flume/source/ExecSource.java | 44 +++--
.../source/ExecSourceConfigurationConstants.java | 5 +
.../org/apache/flume/source/TestExecSource.java | 150 +++++++++++----
flume-ng-core/src/test/resources/test_command.txt | 3 +
flume-ng-doc/sphinx/FlumeUserGuide.rst | 12 ++
pom.xml | 1 +
6 files changed, 165 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
index 495b03f..8e687f2 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
@@ -140,7 +140,7 @@ Configurable {
private static final Logger logger = LoggerFactory
.getLogger(ExecSource.class);
-
+ private String shell;
private String command;
private CounterGroup counterGroup;
private ExecutorService executor;
@@ -159,7 +159,7 @@ Configurable {
executor = Executors.newSingleThreadExecutor();
counterGroup = new CounterGroup();
- runner = new ExecRunnable(command, getChannelProcessor(), counterGroup,
+ runner = new ExecRunnable(shell, command, getChannelProcessor(), counterGroup,
restart, restartThrottle, logStderr, bufferCount, charset);
// FIXME: Use a callback-like executor / future to signal us upon failure.
@@ -229,11 +229,13 @@ Configurable {
charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
ExecSourceConfigurationConstants.DEFAULT_CHARSET));
+
+ shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null);
}
private static class ExecRunnable implements Runnable {
- public ExecRunnable(String command, ChannelProcessor channelProcessor,
+ public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor,
CounterGroup counterGroup, boolean restart, long restartThrottle,
boolean logStderr, int bufferCount, Charset charset) {
this.command = command;
@@ -244,16 +246,18 @@ Configurable {
this.restart = restart;
this.logStderr = logStderr;
this.charset = charset;
+ this.shell = shell;
}
- private String command;
- private ChannelProcessor channelProcessor;
- private CounterGroup counterGroup;
+ private final String shell;
+ private final String command;
+ private final ChannelProcessor channelProcessor;
+ private final CounterGroup counterGroup;
private volatile boolean restart;
- private long restartThrottle;
- private int bufferCount;
- private boolean logStderr;
- private Charset charset;
+ private final long restartThrottle;
+ private final int bufferCount;
+ private final boolean logStderr;
+ private final Charset charset;
private Process process = null;
@Override
@@ -262,8 +266,13 @@ Configurable {
String exitCode = "unknown";
BufferedReader reader = null;
try {
- String[] commandArgs = command.split("\\s+");
- process = new ProcessBuilder(commandArgs).start();
+ if(shell != null) {
+ String[] commandArgs = formulateShellCommand(shell, command);
+ process = Runtime.getRuntime().exec(commandArgs);
+ } else {
+ String[] commandArgs = command.split("\\s+");
+ process = new ProcessBuilder(commandArgs).start();
+ }
reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), charset));
@@ -315,6 +324,15 @@ Configurable {
}
} while(restart);
}
+
+ private static String[] formulateShellCommand(String shell, String command) {
+ String[] shellArgs = shell.split("\\s+");
+ String[] result = new String[shellArgs.length + 1];
+ System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
+ result[shellArgs.length] = command;
+ return result;
+ }
+
public int kill() {
if(process != null) {
synchronized (process) {
@@ -336,10 +354,12 @@ Configurable {
private static class StderrReader extends Thread {
private BufferedReader input;
private boolean logStderr;
+
protected StderrReader(BufferedReader input, boolean logStderr) {
this.input = input;
this.logStderr = logStderr;
}
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
index 1b35b01..fd5a60b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
@@ -51,4 +51,9 @@ public class ExecSourceConfigurationConstants {
*/
public static final String CHARSET = "charset";
public static final String DEFAULT_CHARSET = "UTF-8";
+
+ /**
+ * Optional shell/command processor used to run command
+ */
+ public static final String CONFIG_SHELL = "shell";
}
http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
index 8bcf320..7c573f6 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
@@ -22,13 +22,9 @@ package org.apache.flume.source;
import static org.junit.Assert.*;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.List;
-import java.util.Random;
+import java.io.*;
+import java.nio.charset.Charset;
+import java.util.*;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
@@ -43,40 +39,48 @@ import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.ReplicatingChannelSelector;
import org.apache.flume.conf.Configurables;
import org.apache.flume.lifecycle.LifecycleException;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
+import com.google.common.io.Files;
public class TestExecSource {
private AbstractSource source;
+ private Channel channel = new MemoryChannel();
+
+ private Context context = new Context();
+
+ private ChannelSelector rcs = new ReplicatingChannelSelector();
+
@Before
public void setUp() {
+ context.put("keep-alive", "1");
+ context.put("capacity", "1000");
+ context.put("transactionCapacity", "1000");
+ Configurables.configure(channel, context);
+ rcs.setChannels(Lists.newArrayList(channel));
+
source = new ExecSource();
+ source.setChannelProcessor(new ChannelProcessor(rcs));
+ }
+
+ @After
+ public void tearDown() {
+ source.stop();
}
@Test
public void testProcess() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {
- Channel channel = new MemoryChannel();
- Context context = new Context();
-
context.put("command", "cat /etc/passwd");
context.put("keep-alive", "1");
context.put("capacity", "1000");
context.put("transactionCapacity", "1000");
Configurables.configure(source, context);
- Configurables.configure(channel, context);
-
- ChannelSelector rcs = new ReplicatingChannelSelector();
- rcs.setChannels(Lists.newArrayList(channel));
-
- source.setChannelProcessor(new ChannelProcessor(rcs));
source.start();
Transaction transaction = channel.getTransaction();
@@ -96,8 +100,6 @@ public class TestExecSource {
transaction.commit();
transaction.close();
- source.stop();
-
File file1 = new File("/tmp/flume-execsource."
+ Thread.currentThread().getId());
File file2 = new File("/etc/passwd");
@@ -106,25 +108,105 @@ public class TestExecSource {
FileUtils.forceDelete(file1);
}
+ @Test
+ public void testShellCommandSimple() throws InterruptedException, LifecycleException,
+ EventDeliveryException, IOException {
+ runTestShellCmdHelper("/bin/sh -c", "seq 5"
+ , new String[]{"1","2","3","4","5" } );
+ }
@Test
- public void testRestart() throws InterruptedException, LifecycleException,
+ public void testShellCommandBackTicks() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {
+ // command with backticks
+ runTestShellCmdHelper("/bin/sh -c", "echo `seq 5`" , new String[]{"1 2 3 4 5" } );
+ runTestShellCmdHelper("/bin/sh -c", "echo $(seq 5)" , new String[]{"1 2 3 4 5" } );
+ }
- Channel channel = new MemoryChannel();
- Context context = new Context();
+ @Test
+ public void testShellCommandComplex() throws InterruptedException, LifecycleException,
+ EventDeliveryException, IOException {
+ // command with wildcards & pipes
+ String[] expected = {"1234", "abcd", "ijk", "xyz", "zzz"};
+
+ // pipes
+ runTestShellCmdHelper("/bin/sh -c", "echo zzz 1234 xyz abcd ijk | xargs -n1 echo | sort -f"
+ , expected );
+ }
+
+ @Test
+ public void testShellCommandScript() throws InterruptedException, LifecycleException,
+ EventDeliveryException, IOException {
+ // mini script
+ runTestShellCmdHelper("/bin/sh -c", "for i in {1..5}; do echo $i;done"
+ , new String[]{"1","2","3","4","5" } );
+ // shell arithmetic
+ runTestShellCmdHelper("/bin/sh -c", "if ((2+2>3)); then echo good; else echo not good; fi" , new String[]{"good"} );
+ }
+
+ @Test
+ public void testShellCommandEmbeddingAndEscaping() throws InterruptedException, LifecycleException,
+ EventDeliveryException, IOException {
+ System.out.println( "######### PWD = " + new java.io.File( "." ).getCanonicalPath() );
+ // mini script
+ BufferedReader reader = new BufferedReader(new FileReader("src/test/resources/test_command.txt") );
+ try {
+ String command1 = reader.readLine();
+ Assert.assertNotNull(command1);
+ String[] output1 = new String[] {"'1'", "\"2\"", "\\3", "\\4"};
+ runTestShellCmdHelper("/bin/sh -c", command1 , output1);
+ String command2 = reader.readLine();
+ Assert.assertNotNull(command2);
+ String[] output2 = new String[]{"1","2","3","4","5" };
+ runTestShellCmdHelper("/bin/sh -c", command2 , output2);
+ String command3 = reader.readLine();
+ Assert.assertNotNull(command3);
+ String[] output3 = new String[]{"2","3","4","5","6" };
+ runTestShellCmdHelper("/bin/sh -c", command3 , output3);
+ } finally {
+ reader.close();
+ }
+ }
+
+
+ private void runTestShellCmdHelper(String shell, String command, String[] expectedOutput)
+ throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+ context.put("shell", shell);
+ context.put("command", command);
+ Configurables.configure(source, context);
+ source.start();
+ File outputFile = File.createTempFile("flumeExecSourceTest_", "");
+ FileOutputStream outputStream = new FileOutputStream(outputFile);
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+ try {
+ Event event;
+ while ((event = channel.take()) != null) {
+ outputStream.write(event.getBody());
+ outputStream.write('\n');
+ }
+ outputStream.close();
+ transaction.commit();
+ List<String> output = Files.readLines(outputFile, Charset.defaultCharset());
+
+ Assert.assertArrayEquals(expectedOutput, output.toArray(new String[]{}));
+ } finally {
+ FileUtils.forceDelete(outputFile);
+ transaction.close();
+ source.stop();
+ }
+ }
+
+
+ @Test
+ public void testRestart() throws InterruptedException, LifecycleException,
+ EventDeliveryException, IOException {
context.put(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE, "10");
context.put(ExecSourceConfigurationConstants.CONFIG_RESTART, "true");
context.put("command", "echo flume");
Configurables.configure(source, context);
- Configurables.configure(channel, context);
-
- ChannelSelector rcs = new ReplicatingChannelSelector();
- rcs.setChannels(Lists.newArrayList(channel));
-
- source.setChannelProcessor(new ChannelProcessor(rcs));
source.start();
Transaction transaction = channel.getTransaction();
@@ -182,19 +264,11 @@ public class TestExecSource {
String command = "sleep " + seconds;
Pattern pattern = Pattern.compile("\b" + command + "\b");
- Channel channel = new MemoryChannel();
- Context context = new Context();
-
context.put(ExecSourceConfigurationConstants.CONFIG_RESTART, "false");
context.put("command", command);
Configurables.configure(source, context);
- Configurables.configure(channel, context);
- ChannelSelector rcs = new ReplicatingChannelSelector();
- rcs.setChannels(Lists.newArrayList(channel));
-
- source.setChannelProcessor(new ChannelProcessor(rcs));
source.start();
Thread.sleep(1000L);
source.stop();
http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/flume-ng-core/src/test/resources/test_command.txt
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/resources/test_command.txt b/flume-ng-core/src/test/resources/test_command.txt
new file mode 100644
index 0000000..81114c2
--- /dev/null
+++ b/flume-ng-core/src/test/resources/test_command.txt
@@ -0,0 +1,3 @@
+echo "'1'"; echo "\"2\""; echo "\\3"; echo "\4"
+for i in {1..5}; do echo $i; done
+for i in `seq 5`; do echo $i; done | awk ' { print $1 + 1 } '
http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index f9088f9..d72c965 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -712,6 +712,7 @@ Property Name Default Description
**channels** --
**type** -- The component type name, needs to be ``exec``
**command** -- The command to execute
+shell -- A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc.
restartThrottle 10000 Amount of time (in millis) to wait before attempting a restart
restart false Whether the executed cmd should be restarted if it dies
logStdErr false Whether the command's stderr should be logged
@@ -755,6 +756,17 @@ Example for agent named a1:
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
+The 'shell' config is used to invoke the 'command' through a command shell (such as Bash
+or Powershell). The 'command' is passed as argument to 'shell' for execution. This
+allows the 'command' to use features from the shell such as wildcards, back ticks, pipes,
+loops, conditionals etc. In the absence of the 'shell' config, the 'command' will be
+invoked directly. Common values for 'shell' : '/bin/sh -c', '/bin/ksh -c',
+'cmd /c', 'powershell -Command', etc.
+.. code-block:: properties
+ agent_foo.sources.tailsource-1.type = exec
+ agent_foo.sources.tailsource-1.shell = /bin/bash -c
+ agent_foo.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
+
JMS Source
~~~~~~~~~~~
http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b1f8af2..00a7e61 100644
--- a/pom.xml
+++ b/pom.xml
@@ -534,6 +534,7 @@ limitations under the License.
<exclude>**/*.avsc</exclude>
<exclude>**/*.avro</exclude>
<exclude>**/docs/**</exclude>
+ <exclude>**/test/resources/test_command.txt</exclude>
</excludes>
</configuration>
</execution>