You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@flume.apache.org by hs...@apache.org on 2012/07/11 08:08:59 UTC
svn commit: r1360016 - in /flume/trunk:
flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
flume-ng-doc/sphinx/FlumeUserGuide.rst
Author: hshreedharan
Date: Wed Jul 11 06:08:59 2012
New Revision: 1360016
URL: http://svn.apache.org/viewvc?rev=1360016&view=rev
Log:
FLUME-1361. Support batching of events in ExecSource.
(Juhani Connolly via Hari Shreedharan)
Modified:
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
Modified: flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java?rev=1360016&r1=1360015&r2=1360016&view=diff
==============================================================================
--- flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java (original)
+++ flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java Wed Jul 11 06:08:59 2012
@@ -22,6 +22,8 @@ package org.apache.flume.source;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -30,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
+import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.Source;
import org.apache.flume.channel.ChannelProcessor;
@@ -116,6 +119,12 @@ import com.google.common.base.Preconditi
* <td>Boolean</td>
* <td>false</td>
* </tr>
+ * <tr>
+ * <td><tt>batchSize</tt></td>
+ * <td>The maximum number of events to send to the channel in one batch.</td>
+ * <td>integer</td>
+ * <td>20</td>
+ * </tr>
* </table>
* <p>
* <b>Metrics</b>
@@ -138,6 +147,7 @@ Configurable {
private long restartThrottle;
private boolean restart;
private boolean logStderr;
+ private Integer bufferCount;
private ExecRunnable runner;
@Override
@@ -148,7 +158,7 @@ Configurable {
counterGroup = new CounterGroup();
runner = new ExecRunnable(command, getChannelProcessor(), counterGroup,
- restart, restartThrottle, logStderr);
+ restart, restartThrottle, logStderr, bufferCount);
// FIXME: Use a callback-like executor / future to signal us upon failure.
runnerFuture = executor.submit(runner);
@@ -210,17 +220,21 @@ Configurable {
logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR,
ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR);
+
+ bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
+ ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
}
private static class ExecRunnable implements Runnable {
public ExecRunnable(String command, ChannelProcessor channelProcessor,
CounterGroup counterGroup, boolean restart, long restartThrottle,
- boolean logStderr) {
+ boolean logStderr, int bufferCount) {
this.command = command;
this.channelProcessor = channelProcessor;
this.counterGroup = counterGroup;
this.restartThrottle = restartThrottle;
+ this.bufferCount = bufferCount;
this.restart = restart;
this.logStderr = logStderr;
}
@@ -230,6 +244,7 @@ Configurable {
private CounterGroup counterGroup;
private volatile boolean restart;
private long restartThrottle;
+ private int bufferCount;
private boolean logStderr;
@Override
@@ -252,10 +267,17 @@ Configurable {
stderrReader.start();
String line = null;
-
+ List<Event> eventList = new ArrayList<Event>();
while ((line = reader.readLine()) != null) {
counterGroup.incrementAndGet("exec.lines.read");
- channelProcessor.processEvent(EventBuilder.withBody(line.getBytes()));
+ eventList.add(EventBuilder.withBody(line.getBytes()));
+ if(eventList.size() >= bufferCount) {
+ channelProcessor.processEventBatch(eventList);
+ eventList.clear();
+ }
+ }
+ if(!eventList.isEmpty()) {
+ channelProcessor.processEventBatch(eventList);
}
} catch (Exception e) {
logger.error("Failed while running command: " + command, e);
Modified: flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java?rev=1360016&r1=1360015&r2=1360016&view=diff
==============================================================================
--- flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java (original)
+++ flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java Wed Jul 11 06:08:59 2012
@@ -38,4 +38,9 @@ public class ExecSourceConfigurationCons
public static final String CONFIG_LOG_STDERR = "logStdErr";
public static final boolean DEFAULT_LOG_STDERR = false;
+ /**
+ * Maximum number of lines (events) to buffer before sending them to the channel as one batch
+ */
+ public static final String CONFIG_BATCH_SIZE = "batchSize";
+ public static final int DEFAULT_BATCH_SIZE = 20;
}
Modified: flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst?rev=1360016&r1=1360015&r2=1360016&view=diff
==============================================================================
--- flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst (original)
+++ flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst Wed Jul 11 06:08:59 2012
@@ -624,6 +624,7 @@ Property Name Default Descriptio
restartThrottle 10000 Amount of time (in millis) to wait before attempting a restart
restart false Whether the executed cmd should be restarted if it dies
logStdErr false Whether the command's stderr should be logged
+batchSize 20 The max number of lines to read and send to the channel at a time
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors -- Space separated list of interceptors
@@ -1363,6 +1364,10 @@ write-timeout 3
directories and cause the other channel initialization to fail.
It is therefore necessary that you provide explicit paths to
all the configured channels, preferably on different disks.
+ Furthermore, because the file channel syncs to disk after every commit,
+ pairing it with a source or sink that batches events may be
+ necessary for good performance when separate disks are not
+ available for the checkpoint and data directories.
Example for agent named **agent_foo**: