You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2014/07/05 01:08:46 UTC
git commit: FLUME-2404. Make ScribeSource read buffer and max frame
size configurable
Repository: flume
Updated Branches:
refs/heads/trunk 9940dcbfe -> f15f20785
FLUME-2404. Make ScribeSource read buffer and max frame size configurable
Scribe's default Thrift service maxReadBufferBytes and frame size vary
across Thrift versions. In some versions these values are set to INT_MAX;
in others they are set to 16MB. To avoid OOM in certain cases and
incompatibilities in others, set the default to roughly 16MB (16384000
bytes) and make both limits configurable through the maxReadBufferBytes
property.
(chenshangan and Marimuthu Ponnambalam via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f15f2078
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f15f2078
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f15f2078
Branch: refs/heads/trunk
Commit: f15f20785262ac3cb3e35c2a12e669b7a836d35f
Parents: 9940dcb
Author: Mike Percy <mp...@cloudera.com>
Authored: Fri Jul 4 15:41:43 2014 -0700
Committer: Mike Percy <mp...@cloudera.com>
Committed: Fri Jul 4 15:41:43 2014 -0700
----------------------------------------------------------------------
flume-ng-doc/sphinx/FlumeUserGuide.rst | 15 ++++++++-------
.../org/apache/flume/source/scribe/ScribeSource.java | 14 +++++++++++---
2 files changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/f15f2078/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index f0dd8e8..1e98725 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1484,15 +1484,16 @@ Flume should use ScribeSource based on Thrift with compatible transfering protoc
For deployment of Scribe please follow the guide from Facebook.
Required properties are in **bold**.
-============== =========== ==============================================
-Property Name Default Description
-============== =========== ==============================================
-**type** -- The component type name, needs to be ``org.apache.flume.source.scribe.ScribeSource``
-port 1499 Port that Scribe should be connected
-workerThreads 5 Handing threads number in Thrift
+==================== =========== ==============================================
+Property Name Default Description
+==================== =========== ==============================================
+**type** -- The component type name, needs to be ``org.apache.flume.source.scribe.ScribeSource``
+port 1499 Port that Scribe should be connected
+maxReadBufferBytes 16384000 Thrift Default FrameBuffer Size
+workerThreads 5 Handing threads number in Thrift
selector.type
selector.*
-============== =========== ==============================================
+==================== =========== ==============================================
Example for agent named a1:
http://git-wip-us.apache.org/repos/asf/flume/blob/f15f2078/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
index f9a14c1..1d7da09 100644
--- a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
+++ b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
@@ -59,17 +59,24 @@ public class ScribeSource extends AbstractSource implements
public static final String SCRIBE_CATEGORY = "category";
+ private static final int DEFAULT_PORT = 1499;
private static final int DEFAULT_WORKERS = 5;
+ private static final int DEFAULT_MAX_READ_BUFFER_BYTES = 16384000;
private TServer server;
- private int port = 1499;
+ private int port;
private int workers;
+ private int maxReadBufferBytes;
private SourceCounter sourceCounter;
@Override
public void configure(Context context) {
- port = context.getInteger("port", port);
+ port = context.getInteger("port", DEFAULT_PORT);
+ maxReadBufferBytes = context.getInteger("maxReadBufferBytes", DEFAULT_MAX_READ_BUFFER_BYTES);
+ if(maxReadBufferBytes <= 0){
+ maxReadBufferBytes = DEFAULT_MAX_READ_BUFFER_BYTES;
+ }
workers = context.getInteger("workerThreads", DEFAULT_WORKERS);
if (workers <= 0) {
@@ -91,8 +98,9 @@ public class ScribeSource extends AbstractSource implements
args.workerThreads(workers);
args.processor(processor);
- args.transportFactory(new TFramedTransport.Factory());
+ args.transportFactory(new TFramedTransport.Factory(maxReadBufferBytes));
args.protocolFactory(new TBinaryProtocol.Factory(false, false));
+ args.maxReadBufferBytes = maxReadBufferBytes;
server = new THsHaServer(args);