Posted to commits@flume.apache.org by rg...@apache.org on 2012/07/13 18:46:24 UTC

svn commit: r1361294 - /flume/site/trunk/content/sphinx/FlumeUserGuide.rst

Author: rgoers
Date: Fri Jul 13 16:46:24 2012
New Revision: 1361294

URL: http://svn.apache.org/viewvc?rev=1361294&view=rev
Log:
Bring the user guide up to date

Modified:
    flume/site/trunk/content/sphinx/FlumeUserGuide.rst

Modified: flume/site/trunk/content/sphinx/FlumeUserGuide.rst
URL: http://svn.apache.org/viewvc/flume/site/trunk/content/sphinx/FlumeUserGuide.rst?rev=1361294&r1=1361293&r2=1361294&view=diff
==============================================================================
--- flume/site/trunk/content/sphinx/FlumeUserGuide.rst (original)
+++ flume/site/trunk/content/sphinx/FlumeUserGuide.rst Fri Jul 13 16:46:24 2012
@@ -14,9 +14,9 @@
    limitations under the License.
 
 
-====================
-Flume 1.x User Guide
-====================
+======================================
+Flume 1.3.0-SNAPSHOT User Guide
+======================================
 
 Introduction
 ============
@@ -28,10 +28,11 @@ Apache Flume is a distributed, reliable,
 collecting, aggregating and moving large amounts of log data from many
 different sources to a centralized data store.
 
-At the moment Flume is an incubating Apache project. There are currently two
-release code lines available, version 0.9.x and 1.x.x. This guide is specific
-to 1.x (more specifically 1.1.0 release). Please click here for `the Flume
-0.9.x User Guide <http://archive.cloudera.com/cdh/3/flume/UserGuide/>`_.
+Apache Flume is a top-level project at the Apache Software Foundation.
+There are currently two release code lines available, versions 0.9.x and 1.x.
+This documentation applies to the 1.x code line.
+Please click here for
+`the Flume 0.9.x User Guide <http://archive.cloudera.com/cdh/3/flume/UserGuide/>`_.
 
 System Requirements
 -------------------
@@ -578,15 +579,17 @@ When paired with the built-in AvroSink o
 it can create tiered collection topologies.
 Required properties are in **bold**.
 
-=============  ===========  ===================================================
-Property Name  Default      Description
-=============  ===========  ===================================================
-**channels**   --
-**type**       --           The component type name, needs to be ``avro``
-**bind**       --           hostname or IP address to listen on
-**port**       --           Port # to bind to
-threads        --           Maximum number of worker threads to spawn
-=============  ===========  ===================================================
+==============  ===========  ===================================================
+Property Name   Default      Description
+==============  ===========  ===================================================
+**channels**    --
+**type**        --           The component type name, needs to be ``avro``
+**bind**        --           hostname or IP address to listen on
+**port**        --           Port # to bind to
+threads         --           Maximum number of worker threads to spawn
+interceptors    --           Space separated list of interceptors
+interceptors.*
+==============  ===========  ===================================================
 
 Example for agent named **agent_foo**:
 
@@ -621,8 +624,11 @@ Property Name    Default      Descriptio
 restartThrottle  10000        Amount of time (in millis) to wait before attempting a restart
 restart          false        Whether the executed cmd should be restarted if it dies
 logStdErr        false        Whether the command's stderr should be logged
+batchSize        20           The max number of lines to read and send to the channel at a time
 selector.type    replicating  replicating or multiplexing
 selector.*                    Depends on the selector.type value
+interceptors     --           Space separated list of interceptors
+interceptors.*
 ===============  ===========  ==============================================================
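+
+For instance, the new ``batchSize`` and ``interceptors`` properties might be set
+on an exec source like this (a minimal sketch assuming the exec source's standard
+``type``, ``command`` and ``channels`` properties; the agent, source and channel
+names and the tail command are illustrative):
+
+.. code-block:: properties
+
+  agent_foo.sources = tailSrc-1
+  agent_foo.sources.tailSrc-1.type = exec
+  agent_foo.sources.tailSrc-1.command = tail -F /var/log/app.log
+  agent_foo.sources.tailSrc-1.channels = memoryChannel-1
+  # send up to 100 lines to the channel per batch (the default is 20)
+  agent_foo.sources.tailSrc-1.batchSize = 100
+  agent_foo.sources.tailSrc-1.interceptors = ts
+  agent_foo.sources.tailSrc-1.interceptors.ts.type = org.apache.flume.interceptor.TimestampInterceptor$Builder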
 
 
@@ -677,6 +683,8 @@ Property Name    Default      Descriptio
 max-line-length  512          Max line length per event body (in bytes)
 selector.type    replicating  replicating or multiplexing
 selector.*                    Depends on the selector.type value
+interceptors     --           Space separated list of interceptors
+interceptors.*
 ===============  ===========  ===========================================
 
 Example for agent named **agent_foo**:
@@ -697,14 +705,16 @@ A simple sequence generator that continu
 that starts from 0 and increments by 1. Useful mainly for testing.
 Required properties are in **bold**.
 
-=============  ===========  ========================================
-Property Name  Default      Description
-=============  ===========  ========================================
-**channels**   --
-**type**       --           The component type name, needs to be ``seq``
-selector.type               replicating or multiplexing
-selector.*     replicating  Depends on the selector.type value
-=============  ===========  ========================================
+==============  ===========  ========================================
+Property Name   Default      Description
+==============  ===========  ========================================
+**channels**    --
+**type**        --           The component type name, needs to be ``seq``
+selector.type                replicating or multiplexing
+selector.*      replicating  Depends on the selector.type value
+interceptors    --           Space separated list of interceptors
+interceptors.*
+==============  ===========  ========================================
 
 Example for agent named **agent_foo**:
 
@@ -727,17 +737,19 @@ Required properties are in **bold**.
 Syslog TCP Source
 '''''''''''''''''
 
-=============  ===========  ==============================================
-Property Name  Default      Description
-=============  ===========  ==============================================
-**channels**   --
-**type**       --           The component type name, needs to be ``syslogtcp``
-**host**       --           Host name or IP address to bind to
-**port**       --           Port # to bind to
-eventSize      2500
-selector.type               replicating or multiplexing
-selector.*     replicating  Depends on the selector.type value
-=============  ===========  ==============================================
+==============   ===========  ==============================================
+Property Name    Default      Description
+==============   ===========  ==============================================
+**channels**     --
+**type**         --           The component type name, needs to be ``syslogtcp``
+**host**         --           Host name or IP address to bind to
+**port**         --           Port # to bind to
+eventSize        2500
+selector.type                 replicating or multiplexing
+selector.*       replicating  Depends on the selector.type value
+interceptors     --           Space separated list of interceptors
+interceptors.*
+==============   ===========  ==============================================
 
 
 For example, a syslog TCP source for agent named **agent_foo**:
@@ -754,16 +766,18 @@ For example, a syslog TCP source for age
 Syslog UDP Source
 '''''''''''''''''
 
-=============  ===========  ==============================================
-Property Name  Default      Description
-=============  ===========  ==============================================
-**channels**   --
-**type**       --           The component type name, needs to be ``syslogudp``
-**host**       --           Host name or IP address to bind to
-**port**       --           Port # to bind to
-selector.type               replicating or multiplexing
-selector.*     replicating  Depends on the selector.type value
-=============  ===========  ==============================================
+==============  ===========  ==============================================
+Property Name   Default      Description
+==============  ===========  ==============================================
+**channels**    --
+**type**        --           The component type name, needs to be ``syslogudp``
+**host**        --           Host name or IP address to bind to
+**port**        --           Port # to bind to
+selector.type                replicating or multiplexing
+selector.*      replicating  Depends on the selector.type value
+interceptors    --           Space separated list of interceptors
+interceptors.*
+==============  ===========  ==============================================
 
 
 For example, a syslog UDP source for agent named **agent_foo**:
@@ -803,16 +817,18 @@ Required properties are in **bold**.
 Avro Legacy Source
 ''''''''''''''''''
 
-=============  ===========  ========================================================================================
-Property Name  Default      Description
-=============  ===========  ========================================================================================
-**channels**   --
-**type**       --           The component type name, needs to be ``org.apache.flume.source.avroLegacy.AvroLegacySource``
-**host**       --           The hostname or IP address to bind to
-**port**       --           The port # to listen on
-selector.type               replicating or multiplexing
-selector.*     replicating  Depends on the selector.type value
-=============  ===========  ========================================================================================
+==============  ===========  ========================================================================================
+Property Name   Default      Description
+==============  ===========  ========================================================================================
+**channels**    --
+**type**        --           The component type name, needs to be ``org.apache.flume.source.avroLegacy.AvroLegacySource``
+**host**        --           The hostname or IP address to bind to
+**port**        --           The port # to listen on
+selector.type                replicating or multiplexing
+selector.*      replicating  Depends on the selector.type value
+interceptors    --           Space separated list of interceptors
+interceptors.*
+==============  ===========  ========================================================================================
 
 Example for agent named **agent_foo**:
 
@@ -828,16 +844,18 @@ Example for agent named **agent_foo**:
 Thrift Legacy Source
 ''''''''''''''''''''
 
-=============  ===========  ======================================================================================
-Property Name  Default      Description
-=============  ===========  ======================================================================================
-**channels**   --
-**type**       --           The component type name, needs to be ``org.apache.source.thriftLegacy.ThriftLegacySource``
-**host**       --           The hostname or IP address to bind to
-**port**       --           The port # to listen on
-selector.type               replicating or multiplexing
-selector.*     replicating  Depends on the selector.type value
-=============  ===========  ======================================================================================
+==============  ===========  ======================================================================================
+Property Name   Default      Description
+==============  ===========  ======================================================================================
+**channels**    --
+**type**        --           The component type name, needs to be ``org.apache.source.thriftLegacy.ThriftLegacySource``
+**host**        --           The hostname or IP address to bind to
+**port**        --           The port # to listen on
+selector.type                replicating or multiplexing
+selector.*      replicating  Depends on the selector.type value
+interceptors    --           Space separated list of interceptors
+interceptors.*
+==============  ===========  ======================================================================================
 
 Example for agent named **agent_foo**:
 
@@ -857,14 +875,16 @@ A custom source is your own implementati
 source's class and its dependencies must be included in the agent's classpath
 when starting the Flume agent. The type of the custom source is its FQCN.
 
-=============  ===========  ==============================================
-Property Name  Default      Description
-=============  ===========  ==============================================
-**channels**   --
-**type**       --           The component type name, needs to be your FQCN
-selector.type               replicating or multiplexing
-selector.*     replicating  Depends on the selector.type value
-=============  ===========  ==============================================
+==============  ===========  ==============================================
+Property Name   Default      Description
+==============  ===========  ==============================================
+**channels**    --
+**type**        --           The component type name, needs to be your FQCN
+selector.type                replicating or multiplexing
+selector.*      replicating  Depends on the selector.type value
+interceptors    --           Space separated list of interceptors
+interceptors.*
+==============  ===========  ==============================================
 
 Example for agent named **agent_foo**:
 
@@ -881,23 +901,24 @@ Flume Sinks
 HDFS Sink
 ~~~~~~~~~
 
-This sink writes the event into the Hadoop Distributed File System (HDFS). It
+This sink writes events into the Hadoop Distributed File System (HDFS). It
 currently supports creating text and sequence files. It supports compression in
 both file types. The files can be rolled (close current file and create a new
 one) periodically based on the elapsed time or size of data or number of events.
-It also bucketing/partitioning data by attributes like timestamp or machine
+It also buckets/partitions data by attributes like timestamp or machine
 where the event originated. The HDFS directory path may contain formatting
 escape sequences that will be replaced by the HDFS sink to generate a
 directory/file name to store the events. Using this sink requires Hadoop to be
 installed so that Flume can use the Hadoop jars to communicate with the HDFS
-cluster.
+cluster. Note that a version of Hadoop that supports the sync() call is
+required.
 
 The following are the escape sequences supported:
 
 =========  =================================================
 Alias      Description
 =========  =================================================
-%{host}    host name stored in event header
+%{host}    Substitute value of event header named "host". Arbitrary header names are supported.
 %t         Unix time in milliseconds
 %a         locale's short weekday name (Mon, Tue, ...)
 %A         locale's full weekday name (Monday, Tuesday, ...)
@@ -926,6 +947,10 @@ the file is closed, this extension is re
 complete files in the directory.
 Required properties are in **bold**.
 
+.. note:: For all of the time related escape sequences, a header with the key
+          "timestamp" must exist among the headers of the event. One way to add
+          this automatically is to use the TimestampInterceptor.
+
 ======================  ============  ======================================================================
 Name                    Default       Description
 ======================  ============  ======================================================================
@@ -952,6 +977,9 @@ hdfs.threadsPoolSize    10            Nu
 hdfs.rollTimerPoolSize  1             Number of threads per HDFS sink for scheduling timed file rolling
 hdfs.kerberosPrincipal  --            Kerberos user principal for accessing secure HDFS
 hdfs.kerberosKeytab     --            Kerberos keytab for accessing secure HDFS
+hdfs.round              false         Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
+hdfs.roundValue         1             Rounded down to the highest multiple of this (in the unit configured using ``hdfs.roundUnit``), less than current time.
+hdfs.roundUnit          second        The unit of the round down value - ``second``, ``minute`` or ``hour``.
 serializer              ``TEXT``      Other possible options include ``AVRO_EVENT`` or the
                                       fully-qualified class name of an implementation of the
                                       ``EventSerializer.Builder`` interface.
@@ -966,8 +994,13 @@ Example for agent named **agent_foo**:
   agent_foo.sinks = hdfsSink-1
   agent_foo.sinks.hdfsSink-1.type = hdfs
   agent_foo.sinks.hdfsSink-1.channels = memoryChannel-1
-  agent_foo.sinks.hdfsSink-1.hdfs.path = /flume/events/%y-%m-%d
-  agent_foo.sinks.hdfsSink-1.hdfs.filePrevix = events-
+  agent_foo.sinks.hdfsSink-1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
+  agent_foo.sinks.hdfsSink-1.hdfs.filePrefix = events-
+  agent_foo.sinks.hdfsSink-1.hdfs.round = true
+  agent_foo.sinks.hdfsSink-1.hdfs.roundValue = 10
+  agent_foo.sinks.hdfsSink-1.hdfs.roundUnit = minute
+
+The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp 11:54:34 AM, June 12, 2012, will cause the HDFS path to become ``/flume/events/2012-06-12/1150/00``.
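+
+As the note above explains, the time based escape sequences require a "timestamp"
+header on each event. A minimal sketch of one way to provide it, using the
+TimestampInterceptor builder shown in the Flume Interceptors section below
+(the source name is illustrative):
+
+.. code-block:: properties
+
+  agent_foo.sources.avroSrc-1.interceptors = ts
+  agent_foo.sources.avroSrc-1.interceptors.ts.type = org.apache.flume.interceptor.TimestampInterceptor$Builder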
 
 
 Logger Sink
@@ -1335,6 +1368,10 @@ write-timeout         3                 
           directories and cause the other channel initialization to fail.
           It is therefore necessary that you provide explicit paths to
           all the configured channels, preferably on different disks.
+          Furthermore, as the file channel syncs to disk after every commit,
+          coupling it with a sink/source that batches events together may
+          be necessary to provide good performance where multiple disks are
+          not available for the checkpoint and data directories.
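+
+For instance, two file channels on the same agent might be pointed at separate
+disks like this (a minimal sketch assuming the file channel's ``checkpointDir``
+and ``dataDirs`` properties; the channel names and paths are illustrative):
+
+.. code-block:: properties
+
+  # each channel gets its own checkpoint and data directories, on different disks
+  agent_foo.channels.fileChannel-1.checkpointDir = /mnt/disk1/flume/checkpoint
+  agent_foo.channels.fileChannel-1.dataDirs = /mnt/disk1/flume/data
+  agent_foo.channels.fileChannel-2.checkpointDir = /mnt/disk2/flume/checkpoint
+  agent_foo.channels.fileChannel-2.dataDirs = /mnt/disk2/flume/data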
 
 Example for agent named **agent_foo**:
 
@@ -1460,37 +1497,179 @@ Example for agent named **agent_foo** an
 Flume Sink Processors
 ---------------------
 
-Default Sink Processor
-~~~~~~~~~~~~~~~~~~~~~~
+Sink groups allow users to group multiple sinks into one entity.
+Sink processors can be used to provide load balancing capabilities over all
+sinks inside the group or to achieve failover from one sink to another in
+case of temporary failure.
 
-Accepts only a single sink.
 Required properties are in **bold**.
 
-==============  =======  ============================================
-Property Name   Default  Description
-==============  =======  ============================================
-processor.type  default  The component type name, needs to be default
-==============  =======  ============================================
+===================  ===========  =================================================================================
+Property Name        Default      Description
+===================  ===========  =================================================================================
+**processor.sinks**  --           Space separated list of sinks that are participating in the group
+**processor.type**   ``default``  The component type name, needs to be ``default``, ``failover`` or ``load_balance``
+===================  ===========  =================================================================================
 
 
+Example for agent named **agent_foo**:
+
+.. code-block:: properties
+
+  agent_foo.sinkgroups = group1
+  agent_foo.sinkgroups.group1.sinks = sink1 sink2
+  agent_foo.sinkgroups.group1.processor.type = load_balance
+
+Default Sink Processor
+~~~~~~~~~~~~~~~~~~~~~~
+
+The default sink processor accepts only a single sink. The user is not forced
+to create a processor (sink group) for a single sink. Instead the user can
+follow the source - channel - sink pattern that was explained earlier in this
+user guide.
+
 Failover Sink Processor
 ~~~~~~~~~~~~~~~~~~~~~~~
 
+The Failover Sink Processor maintains a prioritized list of sinks, guaranteeing
+that, so long as one is available, events will be processed (delivered).
+
+The failover mechanism works by relegating failed sinks to a pool where
+they are assigned a cool-down period that increases with sequential failures
+before they are retried. Once a sink successfully sends an event, it is
+restored to the live pool.
+
+To configure, set a sink group's processor to ``failover`` and set
+priorities for all individual sinks. All specified priorities must
+be unique. Furthermore, an upper limit on failover time can be set
+(in milliseconds) using the ``maxpenalty`` property.
+
+Required properties are in **bold**.
+
+=================================  ===========  ===================================================================================
+Property Name                      Default      Description
+=================================  ===========  ===================================================================================
+**processor.sinks**                --           Space separated list of sinks that are participating in the group
+**processor.type**                 ``default``  The component type name, needs to be ``failover``
+**processor.priority.<sinkName>**  --           <sinkName> must be one of the sink instances associated with the current sink group
+processor.maxpenalty               30000        Upper limit of failover time (in millis)
+=================================  ===========  ===================================================================================
+
+Example for agent named **agent_foo**:
+
+.. code-block:: properties
+
+  agent_foo.sinkgroups = group1
+  agent_foo.sinkgroups.group1.sinks = sink1 sink2
+  agent_foo.sinkgroups.group1.processor.type = failover
+  agent_foo.sinkgroups.group1.processor.priority.sink1 = 5
+  agent_foo.sinkgroups.group1.processor.priority.sink2 = 10
+  agent_foo.sinkgroups.group1.processor.maxpenalty = 10000
+
+
+Load balancing Sink Processor
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The load balancing sink processor provides the ability to load-balance flow over
+multiple sinks. It maintains an indexed list of active sinks on which the
+load must be distributed. The implementation supports distributing load using
+either the ``ROUND_ROBIN`` or the ``RANDOM`` selection mechanism. The selection
+mechanism defaults to ``ROUND_ROBIN``, but can be overridden
+via configuration. Custom selection mechanisms are supported via custom
+classes that inherit from ``LoadBalancingSelector``.
+
+When invoked, this selector picks the next sink using its configured selection
+mechanism and invokes it. If the selected sink fails to deliver the event,
+the processor picks the next available sink via its configured selection mechanism.
+This implementation does not blacklist the failing sink and instead continues
+to optimistically attempt every available sink. If all sink invocations
+result in failure, the selector propagates the failure to the sink runner.
+
 Required properties are in **bold**.
 
-=============================  =======  ===================================================================================
-Property Name                  Default  Description
-=============================  =======  ===================================================================================
-processor.type                 default  The component type name, needs to be ``failover``
-processor.maxpenalty           30000    (in millis)
-processor.priority.<sinkName>           <sinkName> must be one of the sink instances associated with the current sink group
-=============================  =======  ===================================================================================
+=============================  ===============  ===============================================================
+Property Name                  Default          Description
+=============================  ===============  ===============================================================
+**processor.sinks**            --               Space separated list of sinks that are participating in the group
+**processor.type**             ``default``      The component type name, needs to be ``load_balance``
+processor.selector             ``ROUND_ROBIN``  Selection mechanism. Must be either ``ROUND_ROBIN``, ``RANDOM``
+                                                or the FQCN of a custom class that inherits from ``LoadBalancingSelector``
+=============================  ===============  ===============================================================
+
+Example for agent named **agent_foo**:
 
+.. code-block:: properties
+
+  agent_foo.sinkgroups = group1
+  agent_foo.sinkgroups.group1.sinks = sink1 sink2
+  agent_foo.sinkgroups.group1.processor.type = load_balance
+  agent_foo.sinkgroups.group1.processor.selector = random
 
 Custom Sink Processor
 ~~~~~~~~~~~~~~~~~~~~~
 
-Custom sink processors are not implemented at this time.
+Custom sink processors are not supported at the moment.
+
+Flume Interceptors
+------------------
+
+Flume has the capability to modify/drop events in-flight. This is done with the help of interceptors. Interceptors
+are classes that implement the ``org.apache.flume.interceptor.Interceptor`` interface. An interceptor can
+modify or even drop events based on any criteria chosen by the developer of the interceptor. Flume supports
+chaining of interceptors. This is made possible by specifying the list of interceptor builder class names
+in the configuration. Interceptors are specified as a whitespace-separated list in the source configuration.
+The order in which the interceptors are specified is the order in which they are invoked.
+The list of events returned by one interceptor is passed to the next interceptor in the chain. Interceptors
+can modify or drop events. If an interceptor needs to drop events, it simply does not return that event in
+the list that it returns. If it is to drop all events, then it returns an empty list. Interceptors
+are named components; here is an example of how they are created through configuration:
+
+.. code-block:: properties
+
+  agent_foo.sources = source_foo
+  agent_foo.channels = channel-1
+  agent_foo.sources.source_foo.interceptors = a b
+  agent_foo.sources.source_foo.interceptors.a.type = org.apache.flume.interceptor.HostInterceptor$Builder
+  agent_foo.sources.source_foo.interceptors.a.preserveExisting = false
+  agent_foo.sources.source_foo.interceptors.a.hostHeader = hostname
+  agent_foo.sources.source_foo.interceptors.b.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
+
+Note that the interceptor builders are passed to the type config parameter. The interceptors are themselves
+configurable and can be passed configuration values just like they are passed to any other configurable component.
+In the above example, events are passed to the HostInterceptor first and the events returned by the HostInterceptor
+are then passed along to the TimestampInterceptor.
+
+Timestamp Interceptor
+~~~~~~~~~~~~~~~~~~~~~
+
+This interceptor inserts into the event headers the time in millis at which it processes the event.
+It inserts a header with the key ``timestamp`` whose value is the relevant timestamp. This interceptor
+can preserve an existing timestamp if it is already present in the event headers.
+
+================  =======  ========================================================================
+Property Name     Default  Description
+================  =======  ========================================================================
+type              --       The component type name, has to be ``TIMESTAMP``
+preserveExisting  false    If the timestamp already exists, should it be preserved - true or false
+================  =======  ========================================================================
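+
+A minimal sketch using the ``TIMESTAMP`` type from the table above (the agent
+and source names are illustrative):
+
+.. code-block:: properties
+
+  agent_foo.sources.source_foo.interceptors = ts
+  agent_foo.sources.source_foo.interceptors.ts.type = TIMESTAMP
+  # keep a timestamp header if the event already carries one
+  agent_foo.sources.source_foo.interceptors.ts.preserveExisting = true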
+
+Host Interceptor
+~~~~~~~~~~~~~~~~
+
+This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header
+with the key ``host`` (or a configured key) whose value is the hostname or IP address of the host, based on configuration.
+
+================  =======  ========================================================================
+Property Name     Default  Description
+================  =======  ========================================================================
+type              --       The component type name, has to be ``HOST``
+preserveExisting  false    If the host header already exists, should it be preserved - true or false
+useIP             true     Use the IP Address if true, else use hostname.
+hostHeader        host     The header key to be used.
+================  =======  ========================================================================
+
+In the chained interceptor example above, the key used in the event headers is "hostname".
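+
+A minimal sketch using the ``HOST`` type from the table above (the agent and
+source names are illustrative):
+
+.. code-block:: properties
+
+  agent_foo.sources.source_foo.interceptors = h1
+  agent_foo.sources.source_foo.interceptors.h1.type = HOST
+  # record the hostname rather than the IP address, under the header key "hostname"
+  agent_foo.sources.source_foo.interceptors.h1.useIP = false
+  agent_foo.sources.source_foo.interceptors.h1.hostHeader = hostname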
+
 
 Flume Properties
 ----------------
@@ -1530,7 +1709,7 @@ Log4J Appender
 
 Appends Log4j events to a flume agent's avro source. A client using this
 appender must have the flume-ng-sdk in the classpath (eg,
-flume-ng-sdk-1.2.0-incubating-SNAPSHOT.jar).
+flume-ng-sdk-1.3.0-SNAPSHOT.jar).
 Required properties are in **bold**.
 
 =============  =======  ==========================================================================
@@ -1644,4 +1823,5 @@ org.apache.flume.ChannelSelector  MULTIP
 org.apache.flume.ChannelSelector  --                  org.example.MyChannelSelector
 org.apache.flume.SinkProcessor    DEFAULT             org.apache.flume.sink.DefaultSinkProcessor
 org.apache.flume.SinkProcessor    FAILOVER            org.apache.flume.sink.FailoverSinkProcessor
+org.apache.flume.SinkProcessor    LOAD_BALANCE        org.apache.flume.sink.LoadBalancingSinkProcessor
 ================================  ==================  ====================================================================