You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/22 06:38:03 UTC

svn commit: r1303659 - in /incubator/flume/trunk/flume-ng-core/src: main/java/org/apache/flume/source/ExecSource.java main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java test/java/org/apache/flume/source/TestExecSource.java

Author: arvind
Date: Thu Mar 22 05:38:03 2012
New Revision: 1303659

URL: http://svn.apache.org/viewvc?rev=1303659&view=rev
Log:
FLUME-979. ExecSource should optionally restart the command when it exits.

(Brock Noland via Arvind Prabhakar)

Added:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java   (with props)
Modified:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java?rev=1303659&r1=1303658&r2=1303659&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java Thu Mar 22 05:38:03 2012
@@ -47,10 +47,10 @@ import com.google.common.base.Preconditi
  * </p>
  * <p>
  * This source runs a given Unix command on start up and expects that process to
- * continuously produce data on standard out (stderr is simply discarded). If
- * the process exits for any reason, the source also exits and will produce no
- * further data. This means configurations such as <tt>cat [named pipe]</tt> or
- * <tt>tail -F [file]</tt> are going to produce the desired results where as
+ * continuously produce data on standard out (stderr ignored by default). Unless
+ * told to restart, if the process exits for any reason, the source also exits and
+ * will produce no further data. This means configurations such as <tt>cat [named pipe]</tt>
+ * or <tt>tail -F [file]</tt> are going to produce the desired results where as
  * <tt>date</tt> will probably not - the former two commands produce streams of
  * data where as the latter produces a single event and exits.
  * </p>
@@ -98,6 +98,24 @@ import com.google.common.base.Preconditi
  * <td>String</td>
  * <td>none (required)</td>
  * </tr>
+ * <tr>
+ * <td><tt>restart</tt></td>
+ * <td>Whether to restart the command when it exits</td>
+ * <td>Boolean</td>
+ * <td>false</td>
+ * </tr>
+ * <tr>
+ * <td><tt>restartThrottle</tt></td>
+ * <td>How long in milliseconds to wait before restarting the command</td>
+ * <td>Long</td>
+ * <td>10000</td>
+ * </tr>
+ * <tr>
+ * <td><tt>logStderr</tt></td>
+ * <td>Whether to log or discard the standard error stream of the command</td>
+ * <td>Boolean</td>
+ * <td>false</td>
+ * </tr>
  * </table>
  * <p>
  * <b>Metrics</b>
@@ -107,16 +125,20 @@ import com.google.common.base.Preconditi
  * </p>
  */
 public class ExecSource extends AbstractSource implements EventDrivenSource,
-    Configurable {
+Configurable {
 
   private static final Logger logger = LoggerFactory
       .getLogger(ExecSource.class);
 
-  private String command;
 
+  private String command;
   private CounterGroup counterGroup;
   private ExecutorService executor;
   private Future<?> runnerFuture;
+  private long restartThrottle;
+  private boolean restart;
+  private boolean logStderr;
+  private ExecRunnable runner;
 
   @Override
   public void start() {
@@ -125,11 +147,8 @@ public class ExecSource extends Abstract
     executor = Executors.newSingleThreadExecutor();
     counterGroup = new CounterGroup();
 
-    ExecRunnable runner = new ExecRunnable();
-
-    runner.command = command;
-    runner.channelProcessor = getChannelProcessor();
-    runner.counterGroup = counterGroup;
+    runner = new ExecRunnable(command, getChannelProcessor(), counterGroup,
+        restart, restartThrottle, logStderr);
 
     // FIXME: Use a callback-like executor / future to signal us upon failure.
     runnerFuture = executor.submit(runner);
@@ -148,6 +167,9 @@ public class ExecSource extends Abstract
   public void stop() {
     logger.info("Stopping exec source with command:{}", command);
 
+    if(runner != null) {
+      runner.setRestart(false);
+    }
     if (runnerFuture != null) {
       logger.debug("Stopping exec runner");
       runnerFuture.cancel(true);
@@ -179,43 +201,127 @@ public class ExecSource extends Abstract
 
     Preconditions.checkState(command != null,
         "The parameter command must be specified");
+
+    restartThrottle = context.getLong(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE,
+        ExecSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE);
+
+    restart = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_RESTART,
+        ExecSourceConfigurationConstants.DEFAULT_RESTART);
+
+    logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR,
+        ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR);
   }
 
   private static class ExecRunnable implements Runnable {
 
+    public ExecRunnable(String command, ChannelProcessor channelProcessor,
+        CounterGroup counterGroup, boolean restart, long restartThrottle,
+        boolean logStderr) {
+      this.command = command;
+      this.channelProcessor = channelProcessor;
+      this.counterGroup = counterGroup;
+      this.restartThrottle = restartThrottle;
+      this.restart = restart;
+      this.logStderr = logStderr;
+    }
+
     private String command;
     private ChannelProcessor channelProcessor;
     private CounterGroup counterGroup;
+    private volatile boolean restart;
+    private long restartThrottle;
+    private boolean logStderr;
 
     @Override
     public void run() {
-      BufferedReader reader = null;
+      do {
+        String exitCode = "unknown";
+        BufferedReader reader = null;
+        Process process = null;
+        try {
+          String[] commandArgs = command.split("\\s+");
+          process = new ProcessBuilder(commandArgs).start();
+          reader = new BufferedReader(
+              new InputStreamReader(process.getInputStream()));
+
+          // StderrLogger dies as soon as the input stream is invalid
+          StderrReader stderrReader = new StderrReader(new BufferedReader(
+              new InputStreamReader(process.getErrorStream())), logStderr);
+          stderrReader.setName("StderrReader-[" + command + "]");
+          stderrReader.setDaemon(true);
+          stderrReader.start();
+
+          String line = null;
+
+          while ((line = reader.readLine()) != null) {
+            counterGroup.incrementAndGet("exec.lines.read");
+            channelProcessor.processEvent(EventBuilder.withBody(line.getBytes()));
+          }
+        } catch (Exception e) {
+          logger.error("Failed while running command: " + command, e);
+          if(e instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+          }
+        } finally {
+          if (reader != null) {
+            try {
+              reader.close();
+            } catch (IOException ex) {
+              logger.error("Failed to close reader for exec source", ex);
+            }
+          }
+          if(process != null) {
+            process.destroy();
+            try {
+              exitCode = String.valueOf(process.waitFor());
+            } catch (InterruptedException ex) {
+              Thread.currentThread().interrupt();
+            }
+          }
+        }
+        if(restart) {
+          logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode);
+          try {
+            Thread.sleep(restartThrottle);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      } while(restart);
+    }
+    public void setRestart(boolean restart) {
+      this.restart = restart;
+    }
+  }
+  private static class StderrReader extends Thread {
+    private BufferedReader input;
+    private boolean logStderr;
+    protected StderrReader(BufferedReader input, boolean logStderr) {
+      this.input = input;
+      this.logStderr = logStderr;
+    }
+    @Override
+    public void run() {
       try {
-        String[] commandArgs = command.split("\\s+");
-        Process process = new ProcessBuilder(commandArgs).start();
-        reader = new BufferedReader(
-            new InputStreamReader(process.getInputStream()));
-
+        int i = 0;
         String line = null;
-
-        while ((line = reader.readLine()) != null) {
-          counterGroup.incrementAndGet("exec.lines.read");
-          channelProcessor.processEvent(EventBuilder.withBody(line.getBytes()));
+        while((line = input.readLine()) != null) {
+          if(logStderr) {
+            logger.info("StderrLogger[{}] = '{}'", ++i, line);
+          }
         }
-
-      } catch (Exception e) {
-        logger.error("Failed while running command:" + command
-                      + " - Exception follows.", e);
+      } catch (IOException e) {
+        logger.info("StderrLogger exiting", e);
       } finally {
-        if (reader != null) {
-          try {
-            reader.close();
-          } catch (IOException ex) {
-            logger.error("Failed to close reader for exec source", ex);
+        try {
+          if(input != null) {
+            input.close();
           }
+        } catch (IOException ex) {
+          logger.error("Failed to close stderr reader for exec source", ex);
         }
       }
     }
-  }
 
+  }
 }

Added: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java?rev=1303659&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java Thu Mar 22 05:38:03 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+public class ExecSourceConfigurationConstants {
+
+  /**
+   * Should the exec'ed command restarted if it dies: : default false
+   */
+  public static final String CONFIG_RESTART = "restart";
+  public static final boolean DEFAULT_RESTART = false;
+
+  /**
+   * Amount of time to wait before attempting a restart: : default 10000 ms
+   */
+  public static final String CONFIG_RESTART_THROTTLE = "restartThrottle";
+  public static final long DEFAULT_RESTART_THROTTLE = 10000L;
+
+  /**
+   * Should stderr from the command be logged: default false
+   */
+  public static final String CONFIG_LOG_STDERR = "logStdErr";
+  public static final boolean DEFAULT_LOG_STDERR = false;
+
+}

Propchange: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java?rev=1303659&r1=1303658&r2=1303659&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java Thu Mar 22 05:38:03 2012
@@ -19,11 +19,12 @@
 
 package org.apache.flume.source;
 
+
+import static org.junit.Assert.*;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.Channel;
@@ -41,6 +42,9 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
 public class TestExecSource {
 
   private AbstractSource source;
@@ -52,7 +56,7 @@ public class TestExecSource {
 
   @Test
   public void testProcess() throws InterruptedException, LifecycleException,
-      EventDeliveryException, IOException {
+  EventDeliveryException, IOException {
 
     Channel channel = new MemoryChannel();
     Context context = new Context();
@@ -64,11 +68,8 @@ public class TestExecSource {
     Configurables.configure(source, context);
     Configurables.configure(channel, context);
 
-    List<Channel> channels = new ArrayList<Channel>();
-    channels.add(channel);
-
     ChannelSelector rcs = new ReplicatingChannelSelector();
-    rcs.setChannels(channels);
+    rcs.setChannels(Lists.newArrayList(channel));
 
     source.setChannelProcessor(new ChannelProcessor(rcs));
 
@@ -100,4 +101,46 @@ public class TestExecSource {
     FileUtils.forceDelete(file1);
   }
 
+
+  @Test
+  public void testRestart() throws InterruptedException, LifecycleException,
+  EventDeliveryException, IOException {
+
+    Channel channel = new MemoryChannel();
+    Context context = new Context();
+
+    context.put(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE, "10");
+    context.put(ExecSourceConfigurationConstants.CONFIG_RESTART, "true");
+
+    context.put("command", "echo flume");
+    Configurables.configure(source, context);
+    Configurables.configure(channel, context);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(Lists.newArrayList(channel));
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+
+    source.start();
+    Transaction transaction = channel.getTransaction();
+
+    transaction.begin();
+
+    long start = System.currentTimeMillis();
+
+    for(int i = 0; i < 5; i++) {
+      Event event = channel.take();
+      assertNotNull(event);
+      assertNotNull(event.getBody());
+      assertEquals("flume", new String(event.getBody(), Charsets.UTF_8));
+    }
+
+    // ensure restartThrottle was turned down as expected
+    assertTrue(System.currentTimeMillis() - start < 10000L);
+
+    transaction.commit();
+    transaction.close();
+
+    source.stop();
+  }
 }