You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/03/11 20:16:55 UTC

git commit: FLUME-1661: ExecSource cannot execute complex *nix commands

Updated Branches:
  refs/heads/trunk b28b87b58 -> 13b8252bd


FLUME-1661: ExecSource cannot execute complex *nix commands

(Roshan Naik via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/13b8252b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/13b8252b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/13b8252b

Branch: refs/heads/trunk
Commit: 13b8252bdeb838c606f4453bdf757fb2a1101eb8
Parents: b28b87b
Author: Brock Noland <br...@apache.org>
Authored: Mon Mar 11 14:16:55 2013 -0500
Committer: Brock Noland <br...@apache.org>
Committed: Mon Mar 11 14:16:55 2013 -0500

----------------------------------------------------------------------
 .../java/org/apache/flume/source/ExecSource.java   |   44 +++--
 .../source/ExecSourceConfigurationConstants.java   |    5 +
 .../org/apache/flume/source/TestExecSource.java    |  150 +++++++++++----
 flume-ng-core/src/test/resources/test_command.txt  |    3 +
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   12 ++
 pom.xml                                            |    1 +
 6 files changed, 165 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
index 495b03f..8e687f2 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
@@ -140,7 +140,7 @@ Configurable {
   private static final Logger logger = LoggerFactory
       .getLogger(ExecSource.class);
 
-
+  private String shell;
   private String command;
   private CounterGroup counterGroup;
   private ExecutorService executor;
@@ -159,7 +159,7 @@ Configurable {
     executor = Executors.newSingleThreadExecutor();
     counterGroup = new CounterGroup();
 
-    runner = new ExecRunnable(command, getChannelProcessor(), counterGroup,
+    runner = new ExecRunnable(shell, command, getChannelProcessor(), counterGroup,
         restart, restartThrottle, logStderr, bufferCount, charset);
 
     // FIXME: Use a callback-like executor / future to signal us upon failure.
@@ -229,11 +229,13 @@ Configurable {
 
     charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
         ExecSourceConfigurationConstants.DEFAULT_CHARSET));
+
+    shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null);
   }
 
   private static class ExecRunnable implements Runnable {
 
-    public ExecRunnable(String command, ChannelProcessor channelProcessor,
+    public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor,
         CounterGroup counterGroup, boolean restart, long restartThrottle,
         boolean logStderr, int bufferCount, Charset charset) {
       this.command = command;
@@ -244,16 +246,18 @@ Configurable {
       this.restart = restart;
       this.logStderr = logStderr;
       this.charset = charset;
+      this.shell = shell;
     }
 
-    private String command;
-    private ChannelProcessor channelProcessor;
-    private CounterGroup counterGroup;
+    private final String shell;
+    private final String command;
+    private final ChannelProcessor channelProcessor;
+    private final CounterGroup counterGroup;
     private volatile boolean restart;
-    private long restartThrottle;
-    private int bufferCount;
-    private boolean logStderr;
-    private Charset charset;
+    private final long restartThrottle;
+    private final int bufferCount;
+    private final boolean logStderr;
+    private final Charset charset;
     private Process process = null;
 
     @Override
@@ -262,8 +266,13 @@ Configurable {
         String exitCode = "unknown";
         BufferedReader reader = null;
         try {
-          String[] commandArgs = command.split("\\s+");
-          process = new ProcessBuilder(commandArgs).start();
+          if(shell != null) {
+            String[] commandArgs = formulateShellCommand(shell, command);
+            process = Runtime.getRuntime().exec(commandArgs);
+          }  else {
+            String[] commandArgs = command.split("\\s+");
+            process = new ProcessBuilder(commandArgs).start();
+          }
           reader = new BufferedReader(
               new InputStreamReader(process.getInputStream(), charset));
 
@@ -315,6 +324,15 @@ Configurable {
         }
       } while(restart);
     }
+
+    private static String[] formulateShellCommand(String shell, String command) {
+      String[] shellArgs = shell.split("\\s+");
+      String[] result = new String[shellArgs.length + 1];
+      System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
+      result[shellArgs.length] = command;
+      return result;
+    }
+
     public int kill() {
       if(process != null) {
         synchronized (process) {
@@ -336,10 +354,12 @@ Configurable {
   private static class StderrReader extends Thread {
     private BufferedReader input;
     private boolean logStderr;
+
     protected StderrReader(BufferedReader input, boolean logStderr) {
       this.input = input;
       this.logStderr = logStderr;
     }
+
     @Override
     public void run() {
       try {

http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
index 1b35b01..fd5a60b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
@@ -51,4 +51,9 @@ public class ExecSourceConfigurationConstants {
    */
   public static final String CHARSET = "charset";
   public static final String DEFAULT_CHARSET = "UTF-8";
+
+  /**
+   * Optional shell/command processor used to run command
+   */
+  public static final String CONFIG_SHELL = "shell";
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
index 8bcf320..7c573f6 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
@@ -22,13 +22,9 @@ package org.apache.flume.source;
 
 import static org.junit.Assert.*;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.List;
-import java.util.Random;
+import java.io.*;
+import java.nio.charset.Charset;
+import java.util.*;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
@@ -43,40 +39,48 @@ import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.lifecycle.LifecycleException;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
+import com.google.common.io.Files;
 
 public class TestExecSource {
 
   private AbstractSource source;
+  private Channel channel = new MemoryChannel();
+
+  private Context context = new Context();
+
+  private ChannelSelector rcs = new ReplicatingChannelSelector();
+
 
   @Before
   public void setUp() {
+    context.put("keep-alive", "1");
+    context.put("capacity", "1000");
+    context.put("transactionCapacity", "1000");
+    Configurables.configure(channel, context);
+    rcs.setChannels(Lists.newArrayList(channel));
+
     source = new ExecSource();
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+  }
+
+  @After
+  public void tearDown() {
+    source.stop();
   }
 
   @Test
   public void testProcess() throws InterruptedException, LifecycleException,
   EventDeliveryException, IOException {
 
-    Channel channel = new MemoryChannel();
-    Context context = new Context();
-
     context.put("command", "cat /etc/passwd");
     context.put("keep-alive", "1");
     context.put("capacity", "1000");
     context.put("transactionCapacity", "1000");
     Configurables.configure(source, context);
-    Configurables.configure(channel, context);
-
-    ChannelSelector rcs = new ReplicatingChannelSelector();
-    rcs.setChannels(Lists.newArrayList(channel));
-
-    source.setChannelProcessor(new ChannelProcessor(rcs));
 
     source.start();
     Transaction transaction = channel.getTransaction();
@@ -96,8 +100,6 @@ public class TestExecSource {
     transaction.commit();
     transaction.close();
 
-    source.stop();
-
     File file1 = new File("/tmp/flume-execsource."
         + Thread.currentThread().getId());
     File file2 = new File("/etc/passwd");
@@ -106,25 +108,105 @@ public class TestExecSource {
     FileUtils.forceDelete(file1);
   }
 
+  @Test
+  public void testShellCommandSimple() throws InterruptedException, LifecycleException,
+  EventDeliveryException, IOException {
+    runTestShellCmdHelper("/bin/sh -c", "seq 5"
+            , new String[]{"1","2","3","4","5" } );
+  }
 
   @Test
-  public void testRestart() throws InterruptedException, LifecycleException,
+  public void testShellCommandBackTicks() throws InterruptedException, LifecycleException,
   EventDeliveryException, IOException {
+    // command with backticks
+    runTestShellCmdHelper("/bin/sh -c", "echo `seq 5`" , new String[]{"1 2 3 4 5" } );
+    runTestShellCmdHelper("/bin/sh -c", "echo $(seq 5)" , new String[]{"1 2 3 4 5" } );
+  }
 
-    Channel channel = new MemoryChannel();
-    Context context = new Context();
+  @Test
+  public void testShellCommandComplex() throws InterruptedException, LifecycleException,
+  EventDeliveryException, IOException {
+    // command chaining several pipes (no wildcards involved)
+    String[] expected = {"1234", "abcd", "ijk", "xyz", "zzz"};
+
+    // pipes
+    runTestShellCmdHelper("/bin/sh -c", "echo zzz 1234 xyz abcd ijk | xargs -n1 echo | sort -f"
+            ,  expected );
+  }
+
+  @Test
+  public void testShellCommandScript() throws InterruptedException, LifecycleException,
+  EventDeliveryException, IOException {
+    // mini script
+    runTestShellCmdHelper("/bin/sh -c", "for i in {1..5}; do echo $i;done"
+            , new String[]{"1","2","3","4","5" } );
+    // shell arithmetic
+    runTestShellCmdHelper("/bin/sh -c", "if ((2+2>3)); then  echo good; else echo not good; fi" , new String[]{"good"} );
+  }
+
+  @Test
+  public void testShellCommandEmbeddingAndEscaping() throws InterruptedException, LifecycleException,
+    EventDeliveryException, IOException {
+      System.out.println( "######### PWD = " + new java.io.File( "." ).getCanonicalPath() );
+    // mini script
+      BufferedReader reader = new BufferedReader(new FileReader("src/test/resources/test_command.txt") );
+      try {
+        String command1 = reader.readLine();
+        Assert.assertNotNull(command1);
+        String[] output1 = new String[] {"'1'", "\"2\"", "\\3", "\\4"};
+        runTestShellCmdHelper("/bin/sh -c", command1 , output1);
+        String command2 = reader.readLine();
+        Assert.assertNotNull(command2);
+        String[] output2 = new String[]{"1","2","3","4","5" };
+        runTestShellCmdHelper("/bin/sh -c", command2 , output2);
+        String command3 = reader.readLine();
+        Assert.assertNotNull(command3);
+        String[] output3 = new String[]{"2","3","4","5","6" };
+        runTestShellCmdHelper("/bin/sh -c", command3 , output3);
+      } finally {
+        reader.close();
+      }
+    }
+
+
+    private void runTestShellCmdHelper(String shell, String command, String[] expectedOutput)
+             throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+      context.put("shell", shell);
+      context.put("command", command);
+      Configurables.configure(source, context);
+      source.start();
+      File outputFile = File.createTempFile("flumeExecSourceTest_", "");
+      FileOutputStream outputStream = new FileOutputStream(outputFile);
+      Transaction transaction = channel.getTransaction();
+      transaction.begin();
+      try {
+        Event event;
+        while ((event = channel.take()) != null) {
+          outputStream.write(event.getBody());
+          outputStream.write('\n');
+        }
+        outputStream.close();
+        transaction.commit();
+        List<String> output  = Files.readLines(outputFile, Charset.defaultCharset());
+
+        Assert.assertArrayEquals(expectedOutput, output.toArray(new String[]{}));
+      } finally {
+        FileUtils.forceDelete(outputFile);
+        transaction.close();
+        source.stop();
+      }
+    }
+
+
+  @Test
+  public void testRestart() throws InterruptedException, LifecycleException,
+  EventDeliveryException, IOException {
 
     context.put(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE, "10");
     context.put(ExecSourceConfigurationConstants.CONFIG_RESTART, "true");
 
     context.put("command", "echo flume");
     Configurables.configure(source, context);
-    Configurables.configure(channel, context);
-
-    ChannelSelector rcs = new ReplicatingChannelSelector();
-    rcs.setChannels(Lists.newArrayList(channel));
-
-    source.setChannelProcessor(new ChannelProcessor(rcs));
 
     source.start();
     Transaction transaction = channel.getTransaction();
@@ -182,19 +264,11 @@ public class TestExecSource {
     String command = "sleep " + seconds;
     Pattern pattern = Pattern.compile("\b" + command + "\b");
 
-    Channel channel = new MemoryChannel();
-    Context context = new Context();
-
     context.put(ExecSourceConfigurationConstants.CONFIG_RESTART, "false");
 
     context.put("command", command);
     Configurables.configure(source, context);
-    Configurables.configure(channel, context);
 
-    ChannelSelector rcs = new ReplicatingChannelSelector();
-    rcs.setChannels(Lists.newArrayList(channel));
-
-    source.setChannelProcessor(new ChannelProcessor(rcs));
     source.start();
     Thread.sleep(1000L);
     source.stop();

http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/flume-ng-core/src/test/resources/test_command.txt
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/resources/test_command.txt b/flume-ng-core/src/test/resources/test_command.txt
new file mode 100644
index 0000000..81114c2
--- /dev/null
+++ b/flume-ng-core/src/test/resources/test_command.txt
@@ -0,0 +1,3 @@
+echo "'1'"; echo "\"2\""; echo "\\3"; echo "\4"
+for i in {1..5}; do echo $i; done
+for i in `seq 5`; do echo $i; done | awk ' { print $1 + 1 } '

http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index f9088f9..d72c965 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -712,6 +712,7 @@ Property Name    Default      Description
 **channels**     --
 **type**         --           The component type name, needs to be ``exec``
 **command**      --           The command to execute
+shell            --           A shell invocation used to run the command, e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes, etc.
 restartThrottle  10000        Amount of time (in millis) to wait before attempting a restart
 restart          false        Whether the executed cmd should be restarted if it dies
 logStdErr        false        Whether the command's stderr should be logged
@@ -755,6 +756,17 @@ Example for agent named a1:
   a1.sources.r1.command = tail -F /var/log/secure
   a1.sources.r1.channels = c1
 
+The 'shell' config is used to invoke the 'command' through a command shell (such as Bash or PowerShell).
+The 'command' is passed as an argument to 'shell' for execution. This allows the 'command' to use shell
+features such as wildcards, back ticks, pipes, loops, conditionals, etc. In the absence of the 'shell'
+config, the 'command' is invoked directly. Common values for 'shell': '/bin/sh -c', '/bin/ksh -c', 'cmd /c', 'powershell -Command', etc.
+
+.. code-block:: properties
+
+  agent_foo.sources.tailsource-1.type = exec
+  agent_foo.sources.tailsource-1.shell = /bin/bash -c
+  agent_foo.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
+
 JMS Source
 ~~~~~~~~~~~
 

http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b1f8af2..00a7e61 100644
--- a/pom.xml
+++ b/pom.xml
@@ -534,6 +534,7 @@ limitations under the License.
                   <exclude>**/*.avsc</exclude>
                   <exclude>**/*.avro</exclude>
                   <exclude>**/docs/**</exclude>
+                  <exclude>**/test/resources/test_command.txt</exclude>
                 </excludes>
               </configuration>
             </execution>