Posted to commits@flume.apache.org by rg...@apache.org on 2012/07/13 18:46:24 UTC
svn commit: r1361294 - /flume/site/trunk/content/sphinx/FlumeUserGuide.rst
Author: rgoers
Date: Fri Jul 13 16:46:24 2012
New Revision: 1361294
URL: http://svn.apache.org/viewvc?rev=1361294&view=rev
Log:
Bring the user guide up to date
Modified:
flume/site/trunk/content/sphinx/FlumeUserGuide.rst
Modified: flume/site/trunk/content/sphinx/FlumeUserGuide.rst
URL: http://svn.apache.org/viewvc/flume/site/trunk/content/sphinx/FlumeUserGuide.rst?rev=1361294&r1=1361293&r2=1361294&view=diff
==============================================================================
--- flume/site/trunk/content/sphinx/FlumeUserGuide.rst (original)
+++ flume/site/trunk/content/sphinx/FlumeUserGuide.rst Fri Jul 13 16:46:24 2012
@@ -14,9 +14,9 @@
limitations under the License.
-====================
-Flume 1.x User Guide
-====================
+======================================
+Flume 1.3.0-SNAPSHOT User Guide
+======================================
Introduction
============
@@ -28,10 +28,11 @@ Apache Flume is a distributed, reliable,
collecting, aggregating and moving large amounts of log data from many
different sources to a centralized data store.
-At the moment Flume is an incubating Apache project. There are currently two
-release code lines available, version 0.9.x and 1.x.x. This guide is specific
-to 1.x (more specifically 1.1.0 release). Please click here for `the Flume
-0.9.x User Guide <http://archive.cloudera.com/cdh/3/flume/UserGuide/>`_.
+Apache Flume is a top level project at the Apache Software Foundation.
+There are currently two release code lines available, versions 0.9.x and 1.x.
+This documentation applies to the 1.x codeline.
+Please click here for
+`the Flume 0.9.x User Guide <http://archive.cloudera.com/cdh/3/flume/UserGuide/>`_.
System Requirements
-------------------
@@ -578,15 +579,17 @@ When paired with the built-in AvroSink o
it can create tiered collection topologies.
Required properties are in **bold**.
-============= =========== ===================================================
-Property Name Default Description
-============= =========== ===================================================
-**channels** --
-**type** -- The component type name, needs to be ``avro``
-**bind** -- hostname or IP address to listen on
-**port** -- Port # to bind to
-threads -- Maximum number of worker threads to spawn
-============= =========== ===================================================
+============== =========== ===================================================
+Property Name Default Description
+============== =========== ===================================================
+**channels** --
+**type** -- The component type name, needs to be ``avro``
+**bind** -- hostname or IP address to listen on
+**port** -- Port # to bind to
+threads -- Maximum number of worker threads to spawn
+interceptors -- Space separated list of interceptors
+interceptors.*
+============== =========== ===================================================
Example for agent named **agent_foo**:
@@ -621,8 +624,11 @@ Property Name Default Descriptio
restartThrottle 10000 Amount of time (in millis) to wait before attempting a restart
restart false Whether the executed cmd should be restarted if it dies
logStdErr false Whether the command's stderr should be logged
+batchSize 20 The max number of lines to read and send to the channel at a time
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
+interceptors -- Space separated list of interceptors
+interceptors.*
=============== =========== ==============================================================
@@ -677,6 +683,8 @@ Property Name Default Descriptio
max-line-length 512 Max line length per event body (in bytes)
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
+interceptors -- Space separated list of interceptors
+interceptors.*
=============== =========== ===========================================
Example for agent named **agent_foo**:
@@ -697,14 +705,16 @@ A simple sequence generator that continu
that starts from 0 and increments by 1. Useful mainly for testing.
Required properties are in **bold**.
-============= =========== ========================================
-Property Name Default Description
-============= =========== ========================================
-**channels** --
-**type** -- The component type name, needs to be ``seq``
-selector.type replicating or multiplexing
-selector.* replicating Depends on the selector.type value
-============= =========== ========================================
+============== =========== ========================================
+Property Name Default Description
+============== =========== ========================================
+**channels** --
+**type** -- The component type name, needs to be ``seq``
+selector.type replicating or multiplexing
+selector.* replicating Depends on the selector.type value
+interceptors -- Space separated list of interceptors
+interceptors.*
+============== =========== ========================================
Example for agent named **agent_foo**:
@@ -727,17 +737,19 @@ Required properties are in **bold**.
Syslog TCP Source
'''''''''''''''''
-============= =========== ==============================================
-Property Name Default Description
-============= =========== ==============================================
-**channels** --
-**type** -- The component type name, needs to be ``syslogtcp``
-**host** -- Host name or IP address to bind to
-**port** -- Port # to bind to
-eventSize 2500
-selector.type replicating or multiplexing
-selector.* replicating Depends on the selector.type value
-============= =========== ==============================================
+============== =========== ==============================================
+Property Name Default Description
+============== =========== ==============================================
+**channels** --
+**type** -- The component type name, needs to be ``syslogtcp``
+**host** -- Host name or IP address to bind to
+**port** -- Port # to bind to
+eventSize 2500
+selector.type replicating or multiplexing
+selector.* replicating Depends on the selector.type value
+interceptors -- Space separated list of interceptors
+interceptors.*
+============== =========== ==============================================
For example, a syslog TCP source for agent named **agent_foo**:
@@ -754,16 +766,18 @@ For example, a syslog TCP source for age
Syslog UDP Source
'''''''''''''''''
-============= =========== ==============================================
-Property Name Default Description
-============= =========== ==============================================
-**channels** --
-**type** -- The component type name, needs to be ``syslogudp``
-**host** -- Host name or IP address to bind to
-**port** -- Port # to bind to
-selector.type replicating or multiplexing
-selector.* replicating Depends on the selector.type value
-============= =========== ==============================================
+============== =========== ==============================================
+Property Name Default Description
+============== =========== ==============================================
+**channels** --
+**type** -- The component type name, needs to be ``syslogudp``
+**host** -- Host name or IP address to bind to
+**port** -- Port # to bind to
+selector.type replicating or multiplexing
+selector.* replicating Depends on the selector.type value
+interceptors -- Space separated list of interceptors
+interceptors.*
+============== =========== ==============================================
For example, a syslog UDP source for agent named **agent_foo**:
@@ -803,16 +817,18 @@ Required properties are in **bold**.
Avro Legacy Source
''''''''''''''''''
-============= =========== ========================================================================================
-Property Name Default Description
-============= =========== ========================================================================================
-**channels** --
-**type** -- The component type name, needs to be ``org.apache.flume.source.avroLegacy.AvroLegacySource``
-**host** -- The hostname or IP address to bind to
-**port** -- The port # to listen on
-selector.type replicating or multiplexing
-selector.* replicating Depends on the selector.type value
-============= =========== ========================================================================================
+============== =========== ========================================================================================
+Property Name Default Description
+============== =========== ========================================================================================
+**channels** --
+**type** -- The component type name, needs to be ``org.apache.flume.source.avroLegacy.AvroLegacySource``
+**host** -- The hostname or IP address to bind to
+**port** -- The port # to listen on
+selector.type replicating or multiplexing
+selector.* replicating Depends on the selector.type value
+interceptors -- Space separated list of interceptors
+interceptors.*
+============== =========== ========================================================================================
Example for agent named **agent_foo**:
@@ -828,16 +844,18 @@ Example for agent named **agent_foo**:
Thrift Legacy Source
''''''''''''''''''''
-============= =========== ======================================================================================
-Property Name Default Description
-============= =========== ======================================================================================
-**channels** --
-**type** -- The component type name, needs to be ``org.apache.source.thriftLegacy.ThriftLegacySource``
-**host** -- The hostname or IP address to bind to
-**port** -- The port # to listen on
-selector.type replicating or multiplexing
-selector.* replicating Depends on the selector.type value
-============= =========== ======================================================================================
+============== =========== ======================================================================================
+Property Name Default Description
+============== =========== ======================================================================================
+**channels** --
+**type** -- The component type name, needs to be ``org.apache.source.thriftLegacy.ThriftLegacySource``
+**host** -- The hostname or IP address to bind to
+**port** -- The port # to listen on
+selector.type replicating or multiplexing
+selector.* replicating Depends on the selector.type value
+interceptors -- Space separated list of interceptors
+interceptors.*
+============== =========== ======================================================================================
Example for agent named **agent_foo**:
@@ -857,14 +875,16 @@ A custom source is your own implementati
source's class and its dependencies must be included in the agent's classpath
when starting the Flume agent. The type of the custom source is its FQCN.
-============= =========== ==============================================
-Property Name Default Description
-============= =========== ==============================================
-**channels** --
-**type** -- The component type name, needs to be your FQCN
-selector.type replicating or multiplexing
-selector.* replicating Depends on the selector.type value
-============= =========== ==============================================
+============== =========== ==============================================
+Property Name Default Description
+============== =========== ==============================================
+**channels** --
+**type** -- The component type name, needs to be your FQCN
+selector.type replicating or multiplexing
+selector.* replicating Depends on the selector.type value
+interceptors -- Space separated list of interceptors
+interceptors.*
+============== =========== ==============================================
Example for agent named **agent_foo**:
@@ -881,23 +901,24 @@ Flume Sinks
HDFS Sink
~~~~~~~~~
-This sink writes the event into the Hadoop Distributed File System (HDFS). It
+This sink writes events into the Hadoop Distributed File System (HDFS). It
currently supports creating text and sequence files. It supports compression in
both file types. The files can be rolled (close current file and create a new
one) periodically based on the elapsed time or size of data or number of events.
-It also bucketing/partitioning data by attributes like timestamp or machine
+It also buckets/partitions data by attributes like timestamp or machine
where the event originated. The HDFS directory path may contain formatting
escape sequences that will be replaced by the HDFS sink to generate a
directory/file name to store the events. Using this sink requires hadoop to be
installed so that Flume can use the Hadoop jars to communicate with the HDFS
-cluster.
+cluster. Note that a version of Hadoop that supports the sync() call is
+required.
The following are the escape sequences supported:
========= =================================================
Alias Description
========= =================================================
-%{host} host name stored in event header
+%{host} Substitute value of event header named "host". Arbitrary header names are supported.
%t Unix time in milliseconds
%a locale's short weekday name (Mon, Tue, ...)
%A locale's full weekday name (Monday, Tuesday, ...)
@@ -926,6 +947,10 @@ the file is closed, this extension is re
complete files in the directory.
Required properties are in **bold**.
+.. note:: For all of the time related escape sequences, a header with the key
+ "timestamp" must exist among the headers of the event. One way to add
+ this automatically is to use the TimestampInterceptor.
+
====================== ============ ======================================================================
Name Default Description
====================== ============ ======================================================================
@@ -952,6 +977,9 @@ hdfs.threadsPoolSize 10 Nu
hdfs.rollTimerPoolSize 1 Number of threads per HDFS sink for scheduling timed file rolling
hdfs.kerberosPrincipal -- Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab -- Kerberos keytab for accessing secure HDFS
+hdfs.round false Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
+hdfs.roundValue 1 Rounded down to the highest multiple of this (in the unit configured using ``hdfs.roundUnit``), less than current time.
+hdfs.roundUnit second The unit of the round down value - ``second``, ``minute`` or ``hour``.
serializer ``TEXT`` Other possible options include ``AVRO_EVENT`` or the
fully-qualified class name of an implementation of the
``EventSerializer.Builder`` interface.
@@ -966,8 +994,13 @@ Example for agent named **agent_foo**:
agent_foo.sinks = hdfsSink-1
agent_foo.sinks.hdfsSink-1.type = hdfs
agent_foo.sinks.hdfsSink-1.channel = memoryChannel-1
- agent_foo.sinks.hdfsSink-1.hdfs.path = /flume/events/%y-%m-%d
- agent_foo.sinks.hdfsSink-1.hdfs.filePrevix = events-
+ agent_foo.sinks.hdfsSink-1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
+ agent_foo.sinks.hdfsSink-1.hdfs.filePrefix = events-
+ agent_foo.sinks.hdfsSink-1.hdfs.round = true
+ agent_foo.sinks.hdfsSink-1.hdfs.roundValue = 10
+ agent_foo.sinks.hdfsSink-1.hdfs.roundUnit = minute
+
+The above configuration will round down the timestamp to the nearest lower multiple of 10 minutes. For example, an event with timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become ``/flume/events/12-06-12/1150/00`` (``%y`` expands to the last two digits of the year).
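+
+As a further sketch of the rounding properties, the fragment below rounds to
+30-minute buckets instead. It reuses the ``hdfsSink-1`` sink from the example
+above; the path layout is an assumption chosen for illustration.
+
+.. code-block:: properties
+
+  # Assumed variant: bucket events into half-hour directories
+  agent_foo.sinks.hdfsSink-1.hdfs.path = /flume/events/%y-%m-%d/%H%M
+  agent_foo.sinks.hdfsSink-1.hdfs.round = true
+  agent_foo.sinks.hdfsSink-1.hdfs.roundValue = 30
+  agent_foo.sinks.hdfsSink-1.hdfs.roundUnit = minute
+
+With these settings, an event with timestamp 11:54:34 AM would be expected to
+land in a directory whose time component is ``1130``.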
Logger Sink
@@ -1335,6 +1368,10 @@ write-timeout 3
directories and cause the other channel initialization to fail.
It is therefore necessary that you provide explicit paths to
all the configured channels, preferably on different disks.
+ Furthermore, as file channel will sync to disk after every commit,
+ coupling it with a sink/source that batches events together may
+ be necessary to provide good performance where multiple disks are
+ not available for checkpoint and data directories.
Example for agent named **agent_foo**:
@@ -1460,37 +1497,179 @@ Example for agent named **agent_foo** an
Flume Sink Processors
---------------------
-Default Sink Processor
-~~~~~~~~~~~~~~~~~~~~~~
+Sink groups allow users to group multiple sinks into one entity.
+Sink processors can be used to provide load balancing capabilities over all
+sinks inside the group or to achieve failover from one sink to another in
+case of temporary failure.
-Accepts only a single sink.
Required properties are in **bold**.
-============== ======= ============================================
-Property Name Default Description
-============== ======= ============================================
-processor.type default The component type name, needs to be default
-============== ======= ============================================
+=================== =========== =================================================================================
+Property Name       Default     Description
+=================== =========== =================================================================================
+**processor.sinks** --          Space separated list of sinks that are participating in the group
+**processor.type**  ``default`` The component type name, needs to be ``default``, ``failover`` or ``load_balance``
+=================== =========== =================================================================================
+Example for agent named **agent_foo**:
+
+.. code-block:: properties
+
+ agent_foo.sinkgroups = group1
+ agent_foo.sinkgroups.group1.sinks = sink1 sink2
+ agent_foo.sinkgroups.group1.processor.type = load_balance
+
+Default Sink Processor
+~~~~~~~~~~~~~~~~~~~~~~
+
+The default sink processor accepts only a single sink. Users are not required
+to create a processor (sink group) for a single sink. Instead, they can follow
+the source - channel - sink pattern that was explained above in this user
+guide.
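+
+A minimal sketch of that pattern, with no sink group defined, might look as
+follows. The component names are hypothetical and the ``type`` settings of
+each component are omitted for brevity.
+
+.. code-block:: properties
+
+  agent_foo.sources = source_foo
+  agent_foo.channels = channel-1
+  agent_foo.sinks = sink1
+  # The source writes to the channel and the single sink drains it directly
+  agent_foo.sources.source_foo.channels = channel-1
+  agent_foo.sinks.sink1.channel = channel-1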
+
Failover Sink Processor
~~~~~~~~~~~~~~~~~~~~~~~
+Failover Sink Processor maintains a prioritized list of sinks, guaranteeing
+that so long as one is available events will be processed (delivered).
+
+The fail over mechanism works by relegating failed sinks to a pool where
+they are assigned a cool down period, increasing with sequential failures
+before they are retried. Once a sink successfully sends an event it is
+restored to the live pool.
+
+To configure, set a sink group's processor to ``failover`` and set
+priorities for all individual sinks. All specified priorities must
+be unique. Furthermore, an upper limit on the failover time can be set
+(in milliseconds) using the ``maxpenalty`` property.
+
+Required properties are in **bold**.
+
+================================= =========== ===================================================================================
+Property Name                     Default     Description
+================================= =========== ===================================================================================
+**processor.sinks**               --          Space separated list of sinks that are participating in the group
+**processor.type**                ``default`` The component type name, needs to be ``failover``
+**processor.priority.<sinkName>** --          <sinkName> must be one of the sink instances associated with the current sink group
+processor.maxpenalty              30000       Upper limit on the cool down period of a failed sink (in millis)
+================================= =========== ===================================================================================
+
+Example for agent named **agent_foo**:
+
+.. code-block:: properties
+
+ agent_foo.sinkgroups = group1
+ agent_foo.sinkgroups.group1.sinks = sink1 sink2
+ agent_foo.sinkgroups.group1.processor.type = failover
+ agent_foo.sinkgroups.group1.processor.priority.sink1 = 5
+ agent_foo.sinkgroups.group1.processor.priority.sink2 = 10
+ agent_foo.sinkgroups.group1.processor.maxpenalty = 10000
+
+
+Load balancing Sink Processor
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The load balancing sink processor provides the ability to load-balance flow over
+multiple sinks. It maintains an indexed list of active sinks over which the
+load must be distributed. The implementation supports distributing load using
+either a ``ROUND_ROBIN`` or a ``RANDOM`` selection mechanism. The choice
+of selection mechanism defaults to ``ROUND_ROBIN``, but can be overridden
+via configuration. Custom selection mechanisms are supported via custom
+classes that inherit from ``LoadBalancingSelector``.
+
+When invoked, this selector picks the next sink using its configured selection
+mechanism and invokes it. In case the selected sink fails to deliver the event,
+the processor picks the next available sink via its configured selection mechanism.
+This implementation does not blacklist the failing sink and instead continues
+to optimistically attempt every available sink. If all sink invocations
+result in failure, the selector propagates the failure to the sink runner.
+
Required properties are in **bold**.
-============================= ======= ===================================================================================
-Property Name Default Description
-============================= ======= ===================================================================================
-processor.type default The component type name, needs to be ``failover``
-processor.maxpenalty 30000 (in millis)
-processor.priority.<sinkName> <sinkName> must be one of the sink instances associated with the current sink group
-============================= ======= ===================================================================================
+============================= =============== ===============================================================
+Property Name                 Default         Description
+============================= =============== ===============================================================
+**processor.sinks**           --              Space separated list of sinks that are participating in the group
+**processor.type**            ``default``     The component type name, needs to be ``load_balance``
+processor.selector            ``ROUND_ROBIN`` Selection mechanism. Must be either ``ROUND_ROBIN``, ``RANDOM``
+                                              or the FQCN of a custom class that inherits from ``LoadBalancingSelector``
+============================= =============== ===============================================================
+
+Example for agent named **agent_foo**:
+.. code-block:: properties
+
+ agent_foo.sinkgroups = group1
+ agent_foo.sinkgroups.group1.sinks = sink1 sink2
+ agent_foo.sinkgroups.group1.processor.type = load_balance
+ agent_foo.sinkgroups.group1.processor.selector = random
Custom Sink Processor
~~~~~~~~~~~~~~~~~~~~~
-Custom sink processors are not implemented at this time.
+Custom sink processors are not supported at the moment.
+
+Flume Interceptors
+------------------
+
+Flume has the capability to modify/drop events in-flight. This is done with the help of interceptors. Interceptors
+are classes that implement ``org.apache.flume.interceptor.Interceptor`` interface. An interceptor can
+modify or even drop events based on any criteria chosen by the developer of the interceptor. Flume supports
+chaining of interceptors. This is made possible by specifying the list of interceptor builder class names
+in the configuration. Interceptors are specified as a whitespace separated list in the source configuration.
+The order in which the interceptors are specified is the order in which they are invoked.
+The list of events returned by one interceptor is passed to the next interceptor in the chain. Interceptors
+can modify or drop events. If an interceptor needs to drop events, it just does not return that event in
+the list that it returns. If it is to drop all events, then it simply returns an empty list. Interceptors
+are named components. Here is an example of how they are created through configuration:
+
+.. code-block:: properties
+
+ agent_foo.sources = source_foo
+ agent_foo.channels = channel-1
+ agent_foo.sources.source_foo.interceptors = a b
+ agent_foo.sources.source_foo.interceptors.a.type = org.apache.flume.interceptor.HostInterceptor$Builder
+ agent_foo.sources.source_foo.interceptors.a.preserveExisting = false
+ agent_foo.sources.source_foo.interceptors.a.hostHeader = hostname
+ agent_foo.sources.source_foo.interceptors.b.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
+
+Note that the interceptor builders are passed to the type config parameter. The interceptors are themselves
+configurable and can be passed configuration values just like they are passed to any other configurable component.
+In the above example, events are passed to the HostInterceptor first and the events returned by the HostInterceptor
+are then passed along to the TimestampInterceptor.
+
+Timestamp Interceptor
+~~~~~~~~~~~~~~~~~~~~~
+
+This interceptor inserts into the event headers the time in millis at which it processes the event. It
+inserts a header with key ``timestamp`` whose value is the relevant timestamp. This interceptor
+can preserve an existing timestamp if one is already present and ``preserveExisting`` is set to true.
+
+================ ======= ========================================================================
+Property Name    Default Description
+================ ======= ========================================================================
+type             --      The component type name, has to be ``TIMESTAMP``
+preserveExisting false   If the timestamp already exists, should it be preserved - true or false
+================ ======= ========================================================================
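+
+As an illustration only, a source could be configured with this interceptor
+as sketched below. The source name ``source_foo`` and the interceptor name
+``ts`` are assumptions; the builder class name is the one used in the
+interceptor example earlier in this section.
+
+.. code-block:: properties
+
+  agent_foo.sources.source_foo.interceptors = ts
+  agent_foo.sources.source_foo.interceptors.ts.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
+  # Keep a timestamp header that an upstream agent or client has already set
+  agent_foo.sources.source_foo.interceptors.ts.preserveExisting = true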
+
+Host Interceptor
+~~~~~~~~~~~~~~~~
+
+This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header
+with key ``host`` or a configured key whose value is the hostname or IP address of the host, based on configuration.
+
+================ ======= ========================================================================
+Property Name    Default Description
+================ ======= ========================================================================
+type             --      The component type name, has to be ``HOST``
+preserveExisting false   If the host header already exists, should it be preserved - true or false
+useIP            true    Use the IP Address if true, else use hostname.
+hostHeader       host    The header key to be used.
+================ ======= ========================================================================
+
+In the example above, the key used in the event headers is "hostname".
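+
+A variant that keeps the default header key might look like the sketch
+below, where the resulting ``host`` header is then picked up by the
+``%{host}`` escape sequence of the HDFS sink. The names ``source_foo``, ``h``
+and ``hdfsSink-1`` and the path layout are illustrative assumptions.
+
+.. code-block:: properties
+
+  agent_foo.sources.source_foo.interceptors = h
+  agent_foo.sources.source_foo.interceptors.h.type = org.apache.flume.interceptor.HostInterceptor$Builder
+  # Record the hostname rather than the IP address; hostHeader stays at its default, "host"
+  agent_foo.sources.source_foo.interceptors.h.useIP = false
+  # The HDFS sink can then bucket events by the value of the "host" header
+  agent_foo.sinks.hdfsSink-1.hdfs.path = /flume/events/%{host}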
+
Flume Properties
----------------
@@ -1530,7 +1709,7 @@ Log4J Appender
Appends Log4j events to a flume agent's avro source. A client using this
appender must have the flume-ng-sdk in the classpath (eg,
-flume-ng-sdk-1.2.0-incubating-SNAPSHOT.jar).
+flume-ng-sdk-1.3.0-SNAPSHOT.jar).
Required properties are in **bold**.
============= ======= ==========================================================================
@@ -1644,4 +1823,5 @@ org.apache.flume.ChannelSelector MULTIP
org.apache.flume.ChannelSelector -- org.example.MyChannelSelector
org.apache.flume.SinkProcessor DEFAULT org.apache.flume.sink.DefaultSinkProcessor
org.apache.flume.SinkProcessor FAILOVER org.apache.flume.sink.FailoverSinkProcessor
+org.apache.flume.SinkProcessor LOAD_BALANCE org.apache.flume.sink.LoadBalancingSinkProcessor
================================ ================== ====================================================================