You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2012/07/11 08:08:59 UTC

svn commit: r1360016 - in /flume/trunk: flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java flume-ng-doc/sphinx/FlumeUserGuide.rst

Author: hshreedharan
Date: Wed Jul 11 06:08:59 2012
New Revision: 1360016

URL: http://svn.apache.org/viewvc?rev=1360016&view=rev
Log:
FLUME-1361. Support batching of events in ExecSource.

(Juhani Connolly via Hari Shreedharan)

Modified:
    flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
    flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
    flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst

Modified: flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java?rev=1360016&r1=1360015&r2=1360016&view=diff
==============================================================================
--- flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java (original)
+++ flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java Wed Jul 11 06:08:59 2012
@@ -22,6 +22,8 @@ package org.apache.flume.source;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -30,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
+import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.Source;
 import org.apache.flume.channel.ChannelProcessor;
@@ -116,6 +119,12 @@ import com.google.common.base.Preconditi
  * <td>Boolean</td>
  * <td>false</td>
  * </tr>
+ * <tr>
+ * <td><tt>batchSize</tt></td>
+ * <td>The maximum number of events to commit to the channel at a time.</td>
+ * <td>integer</td>
+ * <td>20</td>
+ * </tr>
  * </table>
  * <p>
  * <b>Metrics</b>
@@ -138,6 +147,7 @@ Configurable {
   private long restartThrottle;
   private boolean restart;
   private boolean logStderr;
+  private Integer bufferCount;
   private ExecRunnable runner;
 
   @Override
@@ -148,7 +158,7 @@ Configurable {
     counterGroup = new CounterGroup();
 
     runner = new ExecRunnable(command, getChannelProcessor(), counterGroup,
-        restart, restartThrottle, logStderr);
+        restart, restartThrottle, logStderr, bufferCount);
 
     // FIXME: Use a callback-like executor / future to signal us upon failure.
     runnerFuture = executor.submit(runner);
@@ -210,17 +220,21 @@ Configurable {
 
     logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR,
         ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR);
+
+    bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
+        ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
   }
 
   private static class ExecRunnable implements Runnable {
 
     public ExecRunnable(String command, ChannelProcessor channelProcessor,
         CounterGroup counterGroup, boolean restart, long restartThrottle,
-        boolean logStderr) {
+        boolean logStderr, int bufferCount) {
       this.command = command;
       this.channelProcessor = channelProcessor;
       this.counterGroup = counterGroup;
       this.restartThrottle = restartThrottle;
+      this.bufferCount = bufferCount;
       this.restart = restart;
       this.logStderr = logStderr;
     }
@@ -230,6 +244,7 @@ Configurable {
     private CounterGroup counterGroup;
     private volatile boolean restart;
     private long restartThrottle;
+    private int bufferCount;
     private boolean logStderr;
 
     @Override
@@ -252,10 +267,17 @@ Configurable {
           stderrReader.start();
 
           String line = null;
-
+          List<Event> eventList = new ArrayList<Event>();
           while ((line = reader.readLine()) != null) {
             counterGroup.incrementAndGet("exec.lines.read");
-            channelProcessor.processEvent(EventBuilder.withBody(line.getBytes()));
+            eventList.add(EventBuilder.withBody(line.getBytes()));
+            if(eventList.size() >= bufferCount) {
+              channelProcessor.processEventBatch(eventList);
+              eventList.clear();
+            }
+          }
+          if(!eventList.isEmpty()) {
+            channelProcessor.processEventBatch(eventList);
           }
         } catch (Exception e) {
           logger.error("Failed while running command: " + command, e);

Modified: flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java?rev=1360016&r1=1360015&r2=1360016&view=diff
==============================================================================
--- flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java (original)
+++ flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java Wed Jul 11 06:08:59 2012
@@ -38,4 +38,9 @@ public class ExecSourceConfigurationCons
   public static final String CONFIG_LOG_STDERR = "logStdErr";
   public static final boolean DEFAULT_LOG_STDERR = false;
 
+  /**
+   * Maximum number of lines to read and commit to the channel as one batch
+   */
+  public static final String CONFIG_BATCH_SIZE = "batchSize";
+  public static final int DEFAULT_BATCH_SIZE = 20;
 }

Modified: flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst?rev=1360016&r1=1360015&r2=1360016&view=diff
==============================================================================
--- flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst (original)
+++ flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst Wed Jul 11 06:08:59 2012
@@ -624,6 +624,7 @@ Property Name    Default      Descriptio
 restartThrottle  10000        Amount of time (in millis) to wait before attempting a restart
 restart          false        Whether the executed cmd should be restarted if it dies
 logStdErr        false        Whether the command's stderr should be logged
+batchSize        20           The max number of lines to read and send to the channel at a time
 selector.type    replicating  replicating or multiplexing
 selector.*                    Depends on the selector.type value
 interceptors     --           Space separated list of interceptors
@@ -1363,6 +1364,10 @@ write-timeout         3                 
           directories and cause the other channel initialization to fail.
           It is therefore necessary that you provide explicit paths to
           all the configured channels, preferably on different disks.
+          Furthermore, as the file channel syncs to disk after every commit,
+          coupling it with a sink/source that batches events together may
+          be necessary to provide good performance where multiple disks are
+          not available for the checkpoint and data directories.
 
 Example for agent named **agent_foo**: