You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2014/07/05 01:08:46 UTC

git commit: FLUME-2404. Make ScribeSource read buffer and max frame size configurable

Repository: flume
Updated Branches:
  refs/heads/trunk 9940dcbfe -> f15f20785


FLUME-2404. Make ScribeSource read buffer and max frame size configurable

Scribe default Thrift service maxReadBufferBytes and frame size varies
across Thrift versions. In some cases, these values are set to INT_MAX,
in other cases this is set to 16MB. To avoid OOM in certain cases and
incompatibilities in other cases, set the default to 16MB and also make
the parameters configurable.

(chenshangan and Marimuthu Ponnambalam via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f15f2078
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f15f2078
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f15f2078

Branch: refs/heads/trunk
Commit: f15f20785262ac3cb3e35c2a12e669b7a836d35f
Parents: 9940dcb
Author: Mike Percy <mp...@cloudera.com>
Authored: Fri Jul 4 15:41:43 2014 -0700
Committer: Mike Percy <mp...@cloudera.com>
Committed: Fri Jul 4 15:41:43 2014 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst               | 15 ++++++++-------
 .../org/apache/flume/source/scribe/ScribeSource.java | 14 +++++++++++---
 2 files changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/f15f2078/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index f0dd8e8..1e98725 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1484,15 +1484,16 @@ Flume should use ScribeSource based on Thrift with compatible transfering protoc
 For deployment of Scribe please follow the guide from Facebook.
 Required properties are in **bold**.
 
-==============  ===========  ==============================================
-Property Name   Default      Description
-==============  ===========  ==============================================
-**type**        --           The component type name, needs to be ``org.apache.flume.source.scribe.ScribeSource``
-port            1499         Port that Scribe should be connected
-workerThreads   5            Handing threads number in Thrift
+====================  ===========  ==============================================
+Property Name         Default      Description
+====================  ===========  ==============================================
+**type**              --           The component type name, needs to be ``org.apache.flume.source.scribe.ScribeSource``
+port                  1499         Port that Scribe should be connected
+maxReadBufferBytes    16384000     Thrift Default FrameBuffer Size
+workerThreads         5            Handing threads number in Thrift
 selector.type
 selector.*
-==============  ===========  ==============================================
+====================  ===========  ==============================================
 
 Example for agent named a1:
 

http://git-wip-us.apache.org/repos/asf/flume/blob/f15f2078/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
index f9a14c1..1d7da09 100644
--- a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
+++ b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
@@ -59,17 +59,24 @@ public class ScribeSource extends AbstractSource implements
 
   public static final String SCRIBE_CATEGORY = "category";
 
+  private static final int DEFAULT_PORT = 1499;
   private static final int DEFAULT_WORKERS = 5;
+  private static final int DEFAULT_MAX_READ_BUFFER_BYTES = 16384000;
 
   private TServer server;
-  private int port = 1499;
+  private int port;
   private int workers;
+  private int maxReadBufferBytes;
 
   private SourceCounter sourceCounter;
 
   @Override
   public void configure(Context context) {
-    port = context.getInteger("port", port);
+    port = context.getInteger("port", DEFAULT_PORT);
+    maxReadBufferBytes = context.getInteger("maxReadBufferBytes", DEFAULT_MAX_READ_BUFFER_BYTES);
+    if(maxReadBufferBytes <= 0){
+      maxReadBufferBytes = DEFAULT_MAX_READ_BUFFER_BYTES;
+    }
 
     workers = context.getInteger("workerThreads", DEFAULT_WORKERS);
     if (workers <= 0) {
@@ -91,8 +98,9 @@ public class ScribeSource extends AbstractSource implements
 
         args.workerThreads(workers);
         args.processor(processor);
-        args.transportFactory(new TFramedTransport.Factory());
+        args.transportFactory(new TFramedTransport.Factory(maxReadBufferBytes));
         args.protocolFactory(new TBinaryProtocol.Factory(false, false));
+        args.maxReadBufferBytes = maxReadBufferBytes;
 
         server = new THsHaServer(args);