Posted to commits@flume.apache.org by hs...@apache.org on 2015/06/01 22:49:12 UTC
svn commit: r953484 [1/4] - in
/websites/production/flume/content/releases/content/1.6.0:
FlumeDeveloperGuide.html FlumeUserGuide.html
Author: hshreedharan
Date: Mon Jun 1 20:49:11 2015
New Revision: 953484
Log:
Add dev and user guides to 1.6.0
Added:
websites/production/flume/content/releases/content/1.6.0/FlumeDeveloperGuide.html (with props)
websites/production/flume/content/releases/content/1.6.0/FlumeUserGuide.html (with props)
Added: websites/production/flume/content/releases/content/1.6.0/FlumeDeveloperGuide.html
==============================================================================
--- websites/production/flume/content/releases/content/1.6.0/FlumeDeveloperGuide.html (added)
+++ websites/production/flume/content/releases/content/1.6.0/FlumeDeveloperGuide.html Mon Jun 1 20:49:11 2015
@@ -0,0 +1,980 @@
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
+ "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+
+
+<html xmlns="http://www.w3.org/1999/xhtml">
+ <head>
+ <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
+
+ <title>Flume 1.6.0 Developer Guide — Apache Flume</title>
+
+ <link rel="stylesheet" href="_static/flume.css" type="text/css" />
+ <link rel="stylesheet" href="_static/pygments.css" type="text/css" />
+
+ <script type="text/javascript">
+ var DOCUMENTATION_OPTIONS = {
+ URL_ROOT: '',
+ VERSION: '',
+ COLLAPSE_INDEX: false,
+ FILE_SUFFIX: '.html',
+ HAS_SOURCE: true
+ };
+ </script>
+ <script type="text/javascript" src="_static/jquery.js"></script>
+ <script type="text/javascript" src="_static/underscore.js"></script>
+ <script type="text/javascript" src="_static/doctools.js"></script>
+ <link rel="top" title="Apache Flume" href="index.html" />
+ <link rel="up" title="Documentation" href="documentation.html" />
+ <link rel="next" title="Releases" href="releases/index.html" />
+ <link rel="prev" title="Flume 1.6.0 User Guide" href="FlumeUserGuide.html" />
+ </head>
+ <body>
+<div class="header">
+ <table width="100%" border="0">
+ <tr>
+ <td width="10%">
+ <div class="logo">
+ <a href="index.html">
+ <img class="logo" src="_static/flume-logo.png" alt="Logo"/></a>
+ </div>
+ </td>
+ <td width="2%">
+ <span class="trademark">™</span>
+ </td>
+ <td width="68%" align="center" class="pageTitle">Apache Flume<sup><span class="trademark">™</span></sup>
+ </td>
+ <td width="20%">
+ <a href="http://www.apache.org">
+ <img src="_static/feather-small.png" alt="Apache Software Foundation" height="70"/>
+ </a>
+ </td>
+ </tr>
+ </table>
+</div>
+
+
+ <div class="document">
+ <div class="documentwrapper">
+ <div class="bodywrapper">
+ <div class="body">
+
+ <div class="section" id="flume-1-6-0-developer-guide">
+<h1>Flume 1.6.0 Developer Guide<a class="headerlink" href="#flume-1-6-0-developer-guide" title="Permalink to this headline">¶</a></h1>
+<div class="section" id="introduction">
+<h2>Introduction<a class="headerlink" href="#introduction" title="Permalink to this headline">¶</a></h2>
+<div class="section" id="overview">
+<h3>Overview<a class="headerlink" href="#overview" title="Permalink to this headline">¶</a></h3>
+<p>Apache Flume is a distributed, reliable, and available system for
+efficiently collecting, aggregating and moving large amounts of log
+data from many different sources to a centralized data store.</p>
+<p>Apache Flume is a top-level project at the Apache Software Foundation.
+There are currently two release code lines available, versions 0.9.x and 1.x.
+This documentation applies to the 1.x codeline.
+For the 0.9.x codeline, please see the <a class="reference external" href="http://archive.cloudera.com/cdh/3/flume/DeveloperGuide/">Flume 0.9.x Developer Guide</a>.</p>
+</div>
+<div class="section" id="architecture">
+<h3>Architecture<a class="headerlink" href="#architecture" title="Permalink to this headline">¶</a></h3>
+<div class="section" id="data-flow-model">
+<h4>Data flow model<a class="headerlink" href="#data-flow-model" title="Permalink to this headline">¶</a></h4>
+<p>An <tt class="docutils literal"><span class="pre">Event</span></tt> is a unit of data that flows through a Flume agent. The <tt class="docutils literal"><span class="pre">Event</span></tt>
+flows from <tt class="docutils literal"><span class="pre">Source</span></tt> to <tt class="docutils literal"><span class="pre">Channel</span></tt> to <tt class="docutils literal"><span class="pre">Sink</span></tt>, and is represented by an
+implementation of the <tt class="docutils literal"><span class="pre">Event</span></tt> interface. An <tt class="docutils literal"><span class="pre">Event</span></tt> carries a payload (byte
+array) that is accompanied by an optional set of headers (string attributes).
+A Flume agent is a process (JVM) that hosts the components that allow
+<tt class="docutils literal"><span class="pre">Event</span></tt>s to flow from an external source to an external destination.</p>
+<div class="figure align-center">
+<img alt="Agent component diagram" src="_images/DevGuide_image00.png" />
+</div>
+<p>A <tt class="docutils literal"><span class="pre">Source</span></tt> consumes <tt class="docutils literal"><span class="pre">Event</span></tt>s having a specific format, and those
+<tt class="docutils literal"><span class="pre">Event</span></tt>s are delivered to the <tt class="docutils literal"><span class="pre">Source</span></tt> by an external source like a web
+server. For example, an <tt class="docutils literal"><span class="pre">AvroSource</span></tt> can be used to receive Avro <tt class="docutils literal"><span class="pre">Event</span></tt>s
+from clients or from other Flume agents in the flow. When a <tt class="docutils literal"><span class="pre">Source</span></tt> receives
+an <tt class="docutils literal"><span class="pre">Event</span></tt>, it stores it into one or more <tt class="docutils literal"><span class="pre">Channel</span></tt>s. The <tt class="docutils literal"><span class="pre">Channel</span></tt> is
+a passive store that holds the <tt class="docutils literal"><span class="pre">Event</span></tt> until that <tt class="docutils literal"><span class="pre">Event</span></tt> is consumed by a
+<tt class="docutils literal"><span class="pre">Sink</span></tt>. One type of <tt class="docutils literal"><span class="pre">Channel</span></tt> available in Flume is the <tt class="docutils literal"><span class="pre">FileChannel</span></tt>
+which uses the local filesystem as its backing store. A <tt class="docutils literal"><span class="pre">Sink</span></tt> is responsible
+for removing an <tt class="docutils literal"><span class="pre">Event</span></tt> from the <tt class="docutils literal"><span class="pre">Channel</span></tt> and putting it into an external
+repository like HDFS (in the case of an <tt class="docutils literal"><span class="pre">HDFSEventSink</span></tt>) or forwarding it to
+the <tt class="docutils literal"><span class="pre">Source</span></tt> at the next hop of the flow. The <tt class="docutils literal"><span class="pre">Source</span></tt> and <tt class="docutils literal"><span class="pre">Sink</span></tt> within
+the given agent run asynchronously with the <tt class="docutils literal"><span class="pre">Event</span></tt>s staged in the
+<tt class="docutils literal"><span class="pre">Channel</span></tt>.</p>
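+<p>The flow described above can be pictured with a minimal, self-contained sketch. The
+classes below (<tt class="docutils literal"><span class="pre">SketchEvent</span></tt>, <tt class="docutils literal"><span class="pre">SketchChannel</span></tt>, <tt class="docutils literal"><span class="pre">DataFlowSketch</span></tt>) are hypothetical
+stand-ins written for illustration and are not Flume interfaces. An in-memory
+queue plays the role of the channel, and the "host" header is an assumed example.</p>

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for an event: a byte-array payload plus optional string headers.
final class SketchEvent {
    final Map<String, String> headers;
    final byte[] body;

    SketchEvent(Map<String, String> headers, byte[] body) {
        this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        this.body = body.clone();
    }
}

// Hypothetical passive store: holds events until something takes them out.
final class SketchChannel {
    private final Queue<SketchEvent> queue = new ArrayDeque<>();

    void put(SketchEvent event) { queue.add(event); }

    // Returns the oldest stored event, or null when the channel is empty.
    SketchEvent take() { return queue.poll(); }

    int size() { return queue.size(); }
}

final class DataFlowSketch {
    // Plays the role of a source: wraps external data in an event and stores it.
    static void receive(SketchChannel channel, String line, String host) {
        Map<String, String> headers = new HashMap<>();
        headers.put("host", host);
        channel.put(new SketchEvent(headers, line.getBytes(StandardCharsets.UTF_8)));
    }

    // Plays the role of a sink: removes one event and returns its body as text,
    // or null when there is nothing to consume.
    static String drainOne(SketchChannel channel) {
        SketchEvent event = channel.take();
        return event == null ? null : new String(event.body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        SketchChannel channel = new SketchChannel();
        receive(channel, "GET /index.html", "web01");
        System.out.println(drainOne(channel)); // prints "GET /index.html"
    }
}
```

+<p>The sketch runs both roles on one thread for brevity. In an agent, the
+source and the sink run asynchronously and only share the channel.</p>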
+</div>
+<div class="section" id="reliability">
+<h4>Reliability<a class="headerlink" href="#reliability" title="Permalink to this headline">¶</a></h4>
+<p>An <tt class="docutils literal"><span class="pre">Event</span></tt> is staged in a Flume agent’s <tt class="docutils literal"><span class="pre">Channel</span></tt>. Then it’s the
+<tt class="docutils literal"><span class="pre">Sink</span></tt>’s responsibility to deliver the <tt class="docutils literal"><span class="pre">Event</span></tt> to the next agent or
+terminal repository (like HDFS) in the flow. The <tt class="docutils literal"><span class="pre">Sink</span></tt> removes an <tt class="docutils literal"><span class="pre">Event</span></tt>
+from the <tt class="docutils literal"><span class="pre">Channel</span></tt> only after the <tt class="docutils literal"><span class="pre">Event</span></tt> is stored into the <tt class="docutils literal"><span class="pre">Channel</span></tt> of
+the next agent or stored in the terminal repository. This is how the single-hop
+message delivery semantics in Flume provide end-to-end reliability of the flow.
+Flume uses a transactional approach to guarantee the reliable delivery of the
+<tt class="docutils literal"><span class="pre">Event</span></tt>s. The <tt class="docutils literal"><span class="pre">Source</span></tt>s and <tt class="docutils literal"><span class="pre">Sink</span></tt>s encapsulate the
+storage/retrieval of the <tt class="docutils literal"><span class="pre">Event</span></tt>s in a <tt class="docutils literal"><span class="pre">Transaction</span></tt> provided by the
+<tt class="docutils literal"><span class="pre">Channel</span></tt>. This ensures that the set of <tt class="docutils literal"><span class="pre">Event</span></tt>s is reliably passed from
+point to point in the flow. In the case of a multi-hop flow, the <tt class="docutils literal"><span class="pre">Sink</span></tt> from
+the previous hop and the <tt class="docutils literal"><span class="pre">Source</span></tt> of the next hop both have their
+<tt class="docutils literal"><span class="pre">Transaction</span></tt>s open to ensure that the <tt class="docutils literal"><span class="pre">Event</span></tt> data is safely stored in
+the <tt class="docutils literal"><span class="pre">Channel</span></tt> of the next hop.</p>
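+<p>The rule that an event leaves the channel only after the next hop has stored it
+can be sketched as follows. This is an illustration under stated assumptions,
+not Flume code: <tt class="docutils literal"><span class="pre">TxChannelSketch</span></tt>, <tt class="docutils literal"><span class="pre">NextHopSketch</span></tt> and <tt class="docutils literal"><span class="pre">SinkSketch</span></tt> are
+hypothetical names, events are plain strings, and only one transaction is open
+at a time.</p>

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical channel with a single open transaction; not Flume's Channel API.
final class TxChannelSketch {
    private final Deque<String> stored = new ArrayDeque<>();
    private final Deque<String> taken = new ArrayDeque<>();

    void put(String event) { stored.addLast(event); }

    // Takes the oldest event and remembers it so that a rollback can restore it.
    String take() {
        String event = stored.pollFirst();
        if (event != null) {
            taken.addLast(event);
        }
        return event;
    }

    // Commit: the taken events are gone for good.
    void commit() { taken.clear(); }

    // Rollback: put the taken events back at the head, preserving their order.
    void rollback() {
        while (!taken.isEmpty()) {
            stored.addFirst(taken.pollLast());
        }
    }

    int size() { return stored.size(); }
}

// Hypothetical next hop: the next agent's source or a terminal repository.
interface NextHopSketch {
    void deliver(String event) throws Exception;
}

final class SinkSketch {
    // Returns true when one event was delivered and removed from the channel.
    static boolean process(TxChannelSketch channel, NextHopSketch nextHop) {
        String event = channel.take();
        if (event == null) {
            channel.commit(); // nothing was taken, so there is nothing to undo
            return false;
        }
        try {
            nextHop.deliver(event);
            channel.commit();   // remove only after the next hop accepted it
            return true;
        } catch (Exception e) {
            channel.rollback(); // delivery failed: the event stays in the channel
            return false;
        }
    }
}
```

+<p>If delivery fails, the event stays staged and a later call can retry it, which
+is the per-hop guarantee the paragraph above describes.</p>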
+</div>
+</div>
+<div class="section" id="building-flume">
+<h3>Building Flume<a class="headerlink" href="#building-flume" title="Permalink to this headline">¶</a></h3>
+<div class="section" id="getting-the-source">
+<h4>Getting the source<a class="headerlink" href="#getting-the-source" title="Permalink to this headline">¶</a></h4>
+<p>Check out the code using Git. Click here for
+<a class="reference external" href="https://git-wip-us.apache.org/repos/asf/flume.git">the git repository root</a>.</p>
+<p>The Flume 1.x development happens under the branch “trunk” so this command line
+can be used:</p>
+<blockquote>
+<div>git clone <a class="reference external" href="https://git-wip-us.apache.org/repos/asf/flume.git">https://git-wip-us.apache.org/repos/asf/flume.git</a></div></blockquote>
+</div>
+<div class="section" id="compile-test-flume">
+<h4>Compile/test Flume<a class="headerlink" href="#compile-test-flume" title="Permalink to this headline">¶</a></h4>
+<p>The Flume build is mavenized. You can compile Flume using the standard Maven
+commands:</p>
+<ol class="arabic simple">
+<li>Compile only: <tt class="docutils literal"><span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">compile</span></tt></li>
+<li>Compile and run unit tests: <tt class="docutils literal"><span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">test</span></tt></li>
+<li>Run individual test(s): <tt class="docutils literal"><span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">test</span> <span class="pre">-Dtest=<Test1>,<Test2>,...</span> <span class="pre">-DfailIfNoTests=false</span></tt></li>
+<li>Create tarball package: <tt class="docutils literal"><span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">install</span></tt></li>
+<li>Create tarball package (skip unit tests): <tt class="docutils literal"><span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">install</span> <span class="pre">-DskipTests</span></tt></li>
+</ol>
+<p>Please note that building Flume requires that the Google Protocol Buffers compiler
+be in the path. You can download and install it by following the instructions
+<a class="reference external" href="https://developers.google.com/protocol-buffers/">here</a>.</p>
+</div>
+</div>
+<div class="section" id="developing-custom-components">
+<h3>Developing custom components<a class="headerlink" href="#developing-custom-components" title="Permalink to this headline">¶</a></h3>
+<div class="section" id="client">
+<h4>Client<a class="headerlink" href="#client" title="Permalink to this headline">¶</a></h4>
+<p>The client operates at the point of origin of events and delivers them to a
+Flume agent. Clients typically operate in the process space of the application
+they are consuming data from. Flume currently supports Avro, log4j, syslog,
+and HTTP POST (with a JSON body) as ways to transfer data from an external
+source. Additionally, there’s an <tt class="docutils literal"><span class="pre">ExecSource</span></tt> that can consume the output of a
+local process as input to Flume.</p>
+<p>It’s quite possible to have a use case where these existing options are not
+sufficient. In this case you can build a custom mechanism to send data to
+Flume. There are two ways of achieving this. The first option is to create a
+custom client that communicates with one of Flume’s existing <tt class="docutils literal"><span class="pre">Source</span></tt>s like
+<tt class="docutils literal"><span class="pre">AvroSource</span></tt> or <tt class="docutils literal"><span class="pre">SyslogTcpSource</span></tt>. Here the client should convert its data
+into messages understood by these Flume <tt class="docutils literal"><span class="pre">Source</span></tt>s. The other option is to
+write a custom Flume <tt class="docutils literal"><span class="pre">Source</span></tt> that directly talks with your existing client
+application using some IPC or RPC protocol, and then converts the client data
+into Flume <tt class="docutils literal"><span class="pre">Event</span></tt>s to be sent downstream. Note that all events stored
+within the <tt class="docutils literal"><span class="pre">Channel</span></tt> of a Flume agent must exist as Flume <tt class="docutils literal"><span class="pre">Event</span></tt>s.</p>
+<div class="section" id="client-sdk">
+<h5>Client SDK<a class="headerlink" href="#client-sdk" title="Permalink to this headline">¶</a></h5>
+<p>Though Flume contains a number of built-in mechanisms (i.e. <tt class="docutils literal"><span class="pre">Source</span></tt>s) to
+ingest data, often one wants the ability to communicate with Flume directly from
+a custom application. The Flume Client SDK is a library that enables
+applications to connect to Flume and send data into Flume’s data flow over RPC.</p>
+</div>
+<div class="section" id="rpc-client-interface">
+<h5>RPC client interface<a class="headerlink" href="#rpc-client-interface" title="Permalink to this headline">¶</a></h5>
+<p>An implementation of Flume’s RpcClient interface encapsulates the RPC mechanism
+supported by Flume. The user’s application can simply call the Flume Client
+SDK’s <tt class="docutils literal"><span class="pre">append(Event)</span></tt> or <tt class="docutils literal"><span class="pre">appendBatch(List<Event>)</span></tt> to send data and not
+worry about the underlying message exchange details. The user can provide the
+required <tt class="docutils literal"><span class="pre">Event</span></tt> arg by either directly implementing the <tt class="docutils literal"><span class="pre">Event</span></tt> interface,
+by using a convenience implementation such as the SimpleEvent class, or by using
+<tt class="docutils literal"><span class="pre">EventBuilder</span></tt>’s overloaded <tt class="docutils literal"><span class="pre">withBody()</span></tt> static helper methods.</p>
+</div>
+<div class="section" id="rpc-clients-avro-and-thrift">
+<h5>RPC clients - Avro and Thrift<a class="headerlink" href="#rpc-clients-avro-and-thrift" title="Permalink to this headline">¶</a></h5>
+<p>As of Flume 1.4.0, Avro is the default RPC protocol. The
+<tt class="docutils literal"><span class="pre">NettyAvroRpcClient</span></tt> and <tt class="docutils literal"><span class="pre">ThriftRpcClient</span></tt> implement the <tt class="docutils literal"><span class="pre">RpcClient</span></tt>
+interface. The client needs to create this object with the host and port of
+the target Flume agent, and can then use the <tt class="docutils literal"><span class="pre">RpcClient</span></tt> to send data into
+the agent. The following example shows how to use the Flume Client SDK API
+within a user’s data-generating application:</p>
+<div class="highlight-java"><div class="highlight"><pre><span class="kn">import</span> <span class="nn">org.apache.flume.Event</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.flume.EventDeliveryException</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.flume.api.RpcClient</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.flume.api.RpcClientFactory</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.flume.event.EventBuilder</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">java.nio.charset.Charset</span><span class="o">;</span>
+
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyApp</span> <span class="o">{</span>
+ <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="o">{</span>
+ <span class="n">MyRpcClientFacade</span> <span class="n">client</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MyRpcClientFacade</span><span class="o">();</span>
+ <span class="c1">// Initialize client with the remote Flume agent's host and port</span>
+ <span class="n">client</span><span class="o">.</span><span class="na">init</span><span class="o">(</span><span class="s">"host.example.org"</span><span class="o">,</span> <span class="mi">41414</span><span class="o">);</span>
+
+ <span class="c1">// Send 10 events to the remote Flume agent. That agent should be</span>
+ <span class="c1">// configured to listen with an AvroSource.</span>
+ <span class="n">String</span> <span class="n">sampleData</span> <span class="o">=</span> <span class="s">"Hello Flume!"</span><span class="o">;</span>
+ <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o"><</span> <span class="mi">10</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span>
+ <span class="n">client</span><span class="o">.</span><span class="na">sendDataToFlume</span><span class="o">(</span><span class="n">sampleData</span><span class="o">);</span>
+ <span class="o">}</span>
+
+ <span class="n">client</span><span class="o">.</span><span class="na">cleanUp</span><span class="o">();</span>
+ <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="kd">class</span> <span class="nc">MyRpcClientFacade</span> <span class="o">{</span>
+ <span class="kd">private</span> <span class="n">RpcClient</span> <span class="n">client</span><span class="o">;</span>
+ <span class="kd">private</span> <span class="n">String</span> <span class="n">hostname</span><span class="o">;</span>
+ <span class="kd">private</span> <span class="kt">int</span> <span class="n">port</span><span class="o">;</span>
+
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">String</span> <span class="n">hostname</span><span class="o">,</span> <span class="kt">int</span> <span class="n">port</span><span class="o">)</span> <span class="o">{</span>
+ <span class="c1">// Setup the RPC connection</span>
+ <span class="k">this</span><span class="o">.</span><span class="na">hostname</span> <span class="o">=</span> <span class="n">hostname</span><span class="o">;</span>
+ <span class="k">this</span><span class="o">.</span><span class="na">port</span> <span class="o">=</span> <span class="n">port</span><span class="o">;</span>
+ <span class="k">this</span><span class="o">.</span><span class="na">client</span> <span class="o">=</span> <span class="n">RpcClientFactory</span><span class="o">.</span><span class="na">getDefaultInstance</span><span class="o">(</span><span class="n">hostname</span><span class="o">,</span> <span class="n">port</span><span class="o">);</span>
+ <span class="c1">// Use the following method to create a thrift client (instead of the above line):</span>
+ <span class="c1">// this.client = RpcClientFactory.getThriftInstance(hostname, port);</span>
+ <span class="o">}</span>
+
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">sendDataToFlume</span><span class="o">(</span><span class="n">String</span> <span class="n">data</span><span class="o">)</span> <span class="o">{</span>
+ <span class="c1">// Create a Flume Event object that encapsulates the sample data</span>
+ <span class="n">Event</span> <span class="n">event</span> <span class="o">=</span> <span class="n">EventBuilder</span><span class="o">.</span><span class="na">withBody</span><span class="o">(</span><span class="n">data</span><span class="o">,</span> <span class="n">Charset</span><span class="o">.</span><span class="na">forName</span><span class="o">(</span><span class="s">"UTF-8"</span><span class="o">));</span>
+
+ <span class="c1">// Send the event</span>
+ <span class="k">try</span> <span class="o">{</span>
+ <span class="n">client</span><span class="o">.</span><span class="na">append</span><span class="o">(</span><span class="n">event</span><span class="o">);</span>
+ <span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">EventDeliveryException</span> <span class="n">e</span><span class="o">)</span> <span class="o">{</span>
+ <span class="c1">// clean up and recreate the client</span>
+ <span class="n">client</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
+ <span class="n">client</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
+ <span class="n">client</span> <span class="o">=</span> <span class="n">RpcClientFactory</span><span class="o">.</span><span class="na">getDefaultInstance</span><span class="o">(</span><span class="n">hostname</span><span class="o">,</span> <span class="n">port</span><span class="o">);</span>
+ <span class="c1">// Use the following method to create a thrift client (instead of the above line):</span>
+ <span class="c1">// this.client = RpcClientFactory.getThriftInstance(hostname, port);</span>
+ <span class="o">}</span>
+ <span class="o">}</span>
+
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">cleanUp</span><span class="o">()</span> <span class="o">{</span>
+ <span class="c1">// Close the RPC connection</span>
+ <span class="n">client</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
+ <span class="o">}</span>
+
+<span class="o">}</span>
+</pre></div>
+</div>
+<p>The remote Flume agent needs to have an <tt class="docutils literal"><span class="pre">AvroSource</span></tt> (or a
+<tt class="docutils literal"><span class="pre">ThriftSource</span></tt> if you are using a Thrift client) listening on some port.
+Below is an example Flume agent configuration that’s waiting for a connection
+from MyApp:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels</span> <span class="o">=</span> <span class="s">c1</span>
+<span class="na">a1.sources</span> <span class="o">=</span> <span class="s">r1</span>
+<span class="na">a1.sinks</span> <span class="o">=</span> <span class="s">k1</span>
+
+<span class="na">a1.channels.c1.type</span> <span class="o">=</span> <span class="s">memory</span>
+
+<span class="na">a1.sources.r1.channels</span> <span class="o">=</span> <span class="s">c1</span>
+<span class="na">a1.sources.r1.type</span> <span class="o">=</span> <span class="s">avro</span>
+<span class="c"># For using a thrift source set the following instead of the above line.</span>
+<span class="c"># a1.sources.r1.type = thrift</span>
+<span class="na">a1.sources.r1.bind</span> <span class="o">=</span> <span class="s">0.0.0.0</span>
+<span class="na">a1.sources.r1.port</span> <span class="o">=</span> <span class="s">41414</span>
+
+<span class="na">a1.sinks.k1.channel</span> <span class="o">=</span> <span class="s">c1</span>
+<span class="na">a1.sinks.k1.type</span> <span class="o">=</span> <span class="s">logger</span>
+</pre></div>
+</div>
+<p>For more flexibility, the default Flume client implementations
+(<tt class="docutils literal"><span class="pre">NettyAvroRpcClient</span></tt> and <tt class="docutils literal"><span class="pre">ThriftRpcClient</span></tt>) can be configured with these
+properties:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">client.type</span> <span class="o">=</span> <span class="s">default (for avro) or thrift (for thrift)</span>
+
+<span class="na">hosts</span> <span class="o">=</span> <span class="s">h1 # default client accepts only 1 host</span>
+ <span class="c"># (additional hosts will be ignored)</span>
+
+<span class="na">hosts.h1</span> <span class="o">=</span> <span class="s">host1.example.org:41414 # host and port must both be specified</span>
+ <span class="c"># (neither has a default)</span>
+
+<span class="na">batch-size</span> <span class="o">=</span> <span class="s">100 # Must be >=1 (default: 100)</span>
+
+<span class="na">connect-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span>
+
+<span class="na">request-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span>
+</pre></div>
+</div>
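+<p>As a rough sketch, the properties listed above could be assembled in code with
+<tt class="docutils literal"><span class="pre">java.util.Properties</span></tt> before being handed to the client SDK, for example to
+<tt class="docutils literal"><span class="pre">RpcClientFactory.getInstance(Properties)</span></tt>. The helper class and method names
+below are hypothetical. The Flume call itself is left out so that the snippet
+compiles without the SDK on the classpath.</p>

```java
import java.util.Properties;

// Hypothetical helper; only the property keys come from the listing above.
final class ClientPropsSketch {
    static Properties build(String host, int port, int batchSize,
                            long connectTimeout, long requestTimeout) {
        // Enforce the minimums stated in the listing.
        if (batchSize < 1) {
            throw new IllegalArgumentException("batch-size must be >= 1");
        }
        if (connectTimeout < 1000 || requestTimeout < 1000) {
            throw new IllegalArgumentException("timeouts must be >= 1000");
        }
        Properties props = new Properties();
        props.setProperty("client.type", "default"); // use "thrift" for a Thrift client
        props.setProperty("hosts", "h1");            // the default client accepts one host
        props.setProperty("hosts.h1", host + ":" + port); // host and port are both required
        props.setProperty("batch-size", String.valueOf(batchSize));
        props.setProperty("connect-timeout", String.valueOf(connectTimeout));
        props.setProperty("request-timeout", String.valueOf(requestTimeout));
        return props;
    }
}
```

+<p>Calling <tt class="docutils literal"><span class="pre">ClientPropsSketch.build("host1.example.org",</span> <span class="pre">41414,</span> <span class="pre">100,</span> <span class="pre">20000,</span> <span class="pre">20000)</span></tt>
+reproduces the default values shown in the listing.</p>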
+</div>
+<div class="section" id="secure-rpc-client-thrift">
+<h5>Secure RPC client - Thrift<a class="headerlink" href="#secure-rpc-client-thrift" title="Permalink to this headline">¶</a></h5>
+<p>As of Flume 1.6.0, the Thrift source and sink support Kerberos-based authentication.
+The client needs to use the getThriftInstance method of <tt class="docutils literal"><span class="pre">SecureRpcClientFactory</span></tt>
+to get hold of a <tt class="docutils literal"><span class="pre">SecureThriftRpcClient</span></tt>. <tt class="docutils literal"><span class="pre">SecureThriftRpcClient</span></tt> extends
+<tt class="docutils literal"><span class="pre">ThriftRpcClient</span></tt>, which implements the <tt class="docutils literal"><span class="pre">RpcClient</span></tt> interface. The Kerberos
+authentication code resides in the flume-ng-auth module, which must be
+on the classpath when using the <tt class="docutils literal"><span class="pre">SecureRpcClientFactory</span></tt>. Both the client
+principal and the client keytab should be passed in through the
+properties; they are the credentials the client uses to authenticate
+against the Kerberos KDC. In addition, the server principal of the destination
+Thrift source to which this client is connecting should also be provided.
+The following example shows how to use the <tt class="docutils literal"><span class="pre">SecureRpcClientFactory</span></tt>
+within a user’s data-generating application:</p>
+<div class="highlight-java"><div class="highlight"><pre><span class="kn">import</span> <span class="nn">org.apache.flume.Event</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.flume.EventDeliveryException</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.flume.event.EventBuilder</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.flume.api.SecureRpcClientFactory</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.flume.api.RpcClientConfigurationConstants</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.flume.api.RpcClient</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">java.nio.charset.Charset</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">java.util.Properties</span><span class="o">;</span>
+
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyApp</span> <span class="o">{</span>
+ <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="o">{</span>
+ <span class="n">MySecureRpcClientFacade</span> <span class="n">client</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MySecureRpcClientFacade</span><span class="o">();</span>
+ <span class="c1">// Initialize client with the remote Flume agent's host, port</span>
+ <span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+ <span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="n">RpcClientConfigurationConstants</span><span class="o">.</span><span class="na">CONFIG_CLIENT_TYPE</span><span class="o">,</span> <span class="s">"thrift"</span><span class="o">);</span>
+ <span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"hosts"</span><span class="o">,</span> <span class="s">"h1"</span><span class="o">);</span>
+ <span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"hosts.h1"</span><span class="o">,</span> <span class="s">"client.example.org"</span><span class="o">+</span><span class="s">":"</span><span class="o">+</span> <span class="n">String</span><span class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span class="mi">41414</span><span class="o">));</span>
+
+ <span class="c1">// Initialize client with the kerberos authentication related properties</span>
+ <span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"kerberos"</span><span class="o">,</span> <span class="s">"true"</span><span class="o">);</span>
+ <span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"client-principal"</span><span class="o">,</span> <span class="s">"flumeclient/client.example.org@EXAMPLE.ORG"</span><span class="o">);</span>
+ <span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"client-keytab"</span><span class="o">,</span> <span class="s">"/tmp/flumeclient.keytab"</span><span class="o">);</span>
+ <span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"server-principal"</span><span class="o">,</span> <span class="s">"flume/server.example.org@EXAMPLE.ORG"</span><span class="o">);</span>
+ <span class="n">client</span><span class="o">.</span><span class="na">init</span><span class="o">(</span><span class="n">props</span><span class="o">);</span>
+
+ <span class="c1">// Send 10 events to the remote Flume agent. That agent should be</span>
+ <span class="c1">// configured to listen with a ThriftSource.</span>
+ <span class="n">String</span> <span class="n">sampleData</span> <span class="o">=</span> <span class="s">"Hello Flume!"</span><span class="o">;</span>
+ <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o"><</span> <span class="mi">10</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span>
+ <span class="n">client</span><span class="o">.</span><span class="na">sendDataToFlume</span><span class="o">(</span><span class="n">sampleData</span><span class="o">);</span>
+ <span class="o">}</span>
+
+ <span class="n">client</span><span class="o">.</span><span class="na">cleanUp</span><span class="o">();</span>
+ <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="kd">class</span> <span class="nc">MySecureRpcClientFacade</span> <span class="o">{</span>
+ <span class="kd">private</span> <span class="n">RpcClient</span> <span class="n">client</span><span class="o">;</span>
+ <span class="kd">private</span> <span class="n">Properties</span> <span class="n">properties</span><span class="o">;</span>
+
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">Properties</span> <span class="n">properties</span><span class="o">)</span> <span class="o">{</span>
+ <span class="c1">// Setup the RPC connection</span>
+ <span class="k">this</span><span class="o">.</span><span class="na">properties</span> <span class="o">=</span> <span class="n">properties</span><span class="o">;</span>
+ <span class="c1">// Create the ThriftSecureRpcClient instance by using SecureRpcClientFactory</span>
+ <span class="k">this</span><span class="o">.</span><span class="na">client</span> <span class="o">=</span> <span class="n">SecureRpcClientFactory</span><span class="o">.</span><span class="na">getThriftInstance</span><span class="o">(</span><span class="n">properties</span><span class="o">);</span>
+ <span class="o">}</span>
+
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">sendDataToFlume</span><span class="o">(</span><span class="n">String</span> <span class="n">data</span><span class="o">)</span> <span class="o">{</span>
+ <span class="c1">// Create a Flume Event object that encapsulates the sample data</span>
+ <span class="n">Event</span> <span class="n">event</span> <span class="o">=</span> <span class="n">EventBuilder</span><span class="o">.</span><span class="na">withBody</span><span class="o">(</span><span class="n">data</span><span class="o">,</span> <span class="n">Charset</span><span class="o">.</span><span class="na">forName</span><span class="o">(</span><span class="s">"UTF-8"</span><span class="o">));</span>
+
+ <span class="c1">// Send the event</span>
+ <span class="k">try</span> <span class="o">{</span>
+ <span class="n">client</span><span class="o">.</span><span class="na">append</span><span class="o">(</span><span class="n">event</span><span class="o">);</span>
+ <span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">EventDeliveryException</span> <span class="n">e</span><span class="o">)</span> <span class="o">{</span>
+ <span class="c1">// clean up and recreate the client</span>
+ <span class="n">client</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
+ <span class="n">client</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
+ <span class="n">client</span> <span class="o">=</span> <span class="n">SecureRpcClientFactory</span><span class="o">.</span><span class="na">getThriftInstance</span><span class="o">(</span><span class="n">properties</span><span class="o">);</span>
+ <span class="o">}</span>
+ <span class="o">}</span>
+
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">cleanUp</span><span class="o">()</span> <span class="o">{</span>
+ <span class="c1">// Close the RPC connection</span>
+ <span class="n">client</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
+ <span class="o">}</span>
+<span class="o">}</span>
+</pre></div>
+</div>
+<p>The remote <tt class="docutils literal"><span class="pre">ThriftSource</span></tt> should be started in kerberos mode.
+Below is an example Flume agent configuration that’s waiting for a connection
+from MyApp:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels</span> <span class="o">=</span> <span class="s">c1</span>
+<span class="na">a1.sources</span> <span class="o">=</span> <span class="s">r1</span>
+<span class="na">a1.sinks</span> <span class="o">=</span> <span class="s">k1</span>
+
+<span class="na">a1.channels.c1.type</span> <span class="o">=</span> <span class="s">memory</span>
+
+<span class="na">a1.sources.r1.channels</span> <span class="o">=</span> <span class="s">c1</span>
+<span class="na">a1.sources.r1.type</span> <span class="o">=</span> <span class="s">thrift</span>
+<span class="na">a1.sources.r1.bind</span> <span class="o">=</span> <span class="s">0.0.0.0</span>
+<span class="na">a1.sources.r1.port</span> <span class="o">=</span> <span class="s">41414</span>
+<span class="na">a1.sources.r1.kerberos</span> <span class="o">=</span> <span class="s">true</span>
+<span class="na">a1.sources.r1.agent-principal</span> <span class="o">=</span> <span class="s">flume/server.example.org@EXAMPLE.ORG</span>
+<span class="na">a1.sources.r1.agent-keytab</span> <span class="o">=</span> <span class="s">/tmp/flume.keytab</span>
+
+
+<span class="na">a1.sinks.k1.channel</span> <span class="o">=</span> <span class="s">c1</span>
+<span class="na">a1.sinks.k1.type</span> <span class="o">=</span> <span class="s">logger</span>
+</pre></div>
+</div>
+</div>
+<div class="section" id="failover-client">
+<h5>Failover Client<a class="headerlink" href="#failover-client" title="Permalink to this headline">¶</a></h5>
+<p>This class wraps the default Avro RPC client to provide failover handling
+capability to clients. It takes a whitespace-separated list of <host>:<port>
+entries representing the Flume agents that make up a failover group. The Failover RPC
+Client currently does not support Thrift. If there’s a
+communication error with the currently selected host (i.e. agent),
+then the failover client automatically fails over to the next host in the list.
+For example:</p>
+<div class="highlight-java"><div class="highlight"><pre><span class="c1">// Setup properties for the failover</span>
+<span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"client.type"</span><span class="o">,</span> <span class="s">"default_failover"</span><span class="o">);</span>
+
+<span class="c1">// List of hosts (space-separated list of user-chosen host aliases)</span>
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts"</span><span class="o">,</span> <span class="s">"h1 h2 h3"</span><span class="o">);</span>
+
+<span class="c1">// host/port pair for each host alias</span>
+<span class="n">String</span> <span class="n">host1</span> <span class="o">=</span> <span class="s">"host1.example.org:41414"</span><span class="o">;</span>
+<span class="n">String</span> <span class="n">host2</span> <span class="o">=</span> <span class="s">"host2.example.org:41414"</span><span class="o">;</span>
+<span class="n">String</span> <span class="n">host3</span> <span class="o">=</span> <span class="s">"host3.example.org:41414"</span><span class="o">;</span>
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h1"</span><span class="o">,</span> <span class="n">host1</span><span class="o">);</span>
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h2"</span><span class="o">,</span> <span class="n">host2</span><span class="o">);</span>
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h3"</span><span class="o">,</span> <span class="n">host3</span><span class="o">);</span>
+
+<span class="c1">// create the client with failover properties</span>
+<span class="n">RpcClient</span> <span class="n">client</span> <span class="o">=</span> <span class="n">RpcClientFactory</span><span class="o">.</span><span class="na">getInstance</span><span class="o">(</span><span class="n">props</span><span class="o">);</span>
+</pre></div>
+</div>
+<p>For more flexibility, the failover Flume client implementation
+(<tt class="docutils literal"><span class="pre">FailoverRpcClient</span></tt>) can be configured with these properties:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">client.type</span> <span class="o">=</span> <span class="s">default_failover</span>
+
+<span class="na">hosts</span> <span class="o">=</span> <span class="s">h1 h2 h3 # at least one is required, but 2 or</span>
+ <span class="c"># more makes better sense</span>
+
+<span class="na">hosts.h1</span> <span class="o">=</span> <span class="s">host1.example.org:41414</span>
+
+<span class="na">hosts.h2</span> <span class="o">=</span> <span class="s">host2.example.org:41414</span>
+
+<span class="na">hosts.h3</span> <span class="o">=</span> <span class="s">host3.example.org:41414</span>
+
+<span class="na">max-attempts</span> <span class="o">=</span> <span class="s">3 # Must be >=0 (default: number of hosts</span>
+ <span class="c"># specified, 3 in this case). A '0'</span>
+ <span class="c"># value doesn't make much sense because</span>
+ <span class="c"># it will just cause an append call to</span>
+ <span class="c"># immediately fail. A '1' value means</span>
+ <span class="c"># that the failover client will try only</span>
+ <span class="c"># once to send the Event, and if it</span>
+ <span class="c"># fails then there will be no failover</span>
+ <span class="c"># to a second client, so this value</span>
+ <span class="c"># causes the failover client to</span>
+ <span class="c"># degenerate into just a default client.</span>
+ <span class="c"># It makes sense to set this value to at</span>
+ <span class="c"># least the number of hosts that you</span>
+ <span class="c"># specified.</span>
+
+<span class="na">batch-size</span> <span class="o">=</span> <span class="s">100 # Must be >=1 (default: 100)</span>
+
+<span class="na">connect-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span>
+
+<span class="na">request-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span>
+</pre></div>
+</div>
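+<p>As a minimal sketch, the properties listed above could be assembled from a plain
+list of host:port strings. The <tt class="docutils literal"><span class="pre">FailoverProps</span></tt> class and its
+<tt class="docutils literal"><span class="pre">build</span></tt> method are hypothetical helpers, not part of the Flume SDK, and the
+<tt class="docutils literal"><span class="pre">h1</span></tt>, <tt class="docutils literal"><span class="pre">h2</span></tt>, ... alias scheme is only an assumption for illustration:</p>

```java
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of the Flume SDK): builds the Properties
// described above for the failover client from "host:port" strings.
class FailoverProps {
    static Properties build(List<String> hostPorts) {
        if (hostPorts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        Properties props = new Properties();
        props.setProperty("client.type", "default_failover");

        // Generate aliases h1, h2, ... and map each alias to its host:port.
        StringBuilder aliases = new StringBuilder();
        for (int i = 0; i < hostPorts.size(); i++) {
            String alias = "h" + (i + 1);
            if (i > 0) {
                aliases.append(' ');
            }
            aliases.append(alias);
            props.setProperty("hosts." + alias, hostPorts.get(i));
        }
        props.setProperty("hosts", aliases.toString());

        // Follow the guidance above: allow one attempt per configured host.
        props.setProperty("max-attempts", String.valueOf(hostPorts.size()));
        return props;
    }
}
```

+<p>The resulting <tt class="docutils literal"><span class="pre">Properties</span></tt> object would then be passed to
+<tt class="docutils literal"><span class="pre">RpcClientFactory.getInstance(props)</span></tt>, as in the example earlier in this section.</p>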
+</div>
+<div class="section" id="loadbalancing-rpc-client">
+<h5>LoadBalancing RPC client<a class="headerlink" href="#loadbalancing-rpc-client" title="Permalink to this headline">¶</a></h5>
+<p>The Flume Client SDK also supports an RpcClient which load-balances among
+multiple hosts. This type of client takes a whitespace-separated list of
+<host>:<port> entries representing the Flume agents that make up a load-balancing group.
+This client can be configured with a load balancing strategy that either
+randomly selects one of the configured hosts, or selects a host in a round-robin
+fashion. You can also specify your own custom class that implements the
+<tt class="docutils literal"><span class="pre">LoadBalancingRpcClient$HostSelector</span></tt> interface so that a custom selection
+order is used. In that case, the FQCN of the custom class needs to be specified
+as the value of the <tt class="docutils literal"><span class="pre">host-selector</span></tt> property. The LoadBalancing RPC Client
+currently does not support Thrift.</p>
+<p>If <tt class="docutils literal"><span class="pre">backoff</span></tt> is enabled then the client will temporarily blacklist
+hosts that fail, causing them to be excluded from being selected as a failover
+host until a given timeout. When the timeout elapses, if the host is still
+unresponsive then this is considered a sequential failure, and the timeout is
+increased exponentially to avoid potentially getting stuck in long waits on
+unresponsive hosts.</p>
+<p>The maximum backoff time can be configured by setting <tt class="docutils literal"><span class="pre">maxBackoff</span></tt> (in
+milliseconds). The maxBackoff default is 30 seconds (specified in the
+<tt class="docutils literal"><span class="pre">OrderSelector</span></tt> class that’s the superclass of both load balancing
+strategies). The backoff timeout will increase exponentially with each
+sequential failure up to the maximum possible backoff timeout.
+The maximum possible backoff is limited to 65536 seconds (about 18.2 hours).
+For example:</p>
+<div class="highlight-java"><div class="highlight"><pre><span class="c1">// Setup properties for the load balancing</span>
+<span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"client.type"</span><span class="o">,</span> <span class="s">"default_loadbalance"</span><span class="o">);</span>
+
+<span class="c1">// List of hosts (space-separated list of user-chosen host aliases)</span>
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts"</span><span class="o">,</span> <span class="s">"h1 h2 h3"</span><span class="o">);</span>
+
+<span class="c1">// host/port pair for each host alias</span>
+<span class="n">String</span> <span class="n">host1</span> <span class="o">=</span> <span class="s">"host1.example.org:41414"</span><span class="o">;</span>
+<span class="n">String</span> <span class="n">host2</span> <span class="o">=</span> <span class="s">"host2.example.org:41414"</span><span class="o">;</span>
+<span class="n">String</span> <span class="n">host3</span> <span class="o">=</span> <span class="s">"host3.example.org:41414"</span><span class="o">;</span>
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h1"</span><span class="o">,</span> <span class="n">host1</span><span class="o">);</span>
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h2"</span><span class="o">,</span> <span class="n">host2</span><span class="o">);</span>
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h3"</span><span class="o">,</span> <span class="n">host3</span><span class="o">);</span>
+
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"host-selector"</span><span class="o">,</span> <span class="s">"random"</span><span class="o">);</span> <span class="c1">// For random host selection</span>
+<span class="c1">// props.put("host-selector", "round_robin"); // For round-robin host</span>
+<span class="c1">// // selection</span>
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"backoff"</span><span class="o">,</span> <span class="s">"true"</span><span class="o">);</span> <span class="c1">// Disabled by default.</span>
+
+<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"maxBackoff"</span><span class="o">,</span> <span class="s">"10000"</span><span class="o">);</span> <span class="c1">// Defaults to 0, which effectively</span>
+ <span class="c1">// becomes 30000 ms</span>
+
+<span class="c1">// Create the client with load balancing properties</span>
+<span class="n">RpcClient</span> <span class="n">client</span> <span class="o">=</span> <span class="n">RpcClientFactory</span><span class="o">.</span><span class="na">getInstance</span><span class="o">(</span><span class="n">props</span><span class="o">);</span>
+</pre></div>
+</div>
+<p>For more flexibility, the load-balancing Flume client implementation
+(<tt class="docutils literal"><span class="pre">LoadBalancingRpcClient</span></tt>) can be configured with these properties:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">client.type</span> <span class="o">=</span> <span class="s">default_loadbalance</span>
+
+<span class="na">hosts</span> <span class="o">=</span> <span class="s">h1 h2 h3 # At least 2 hosts are required</span>
+
+<span class="na">hosts.h1</span> <span class="o">=</span> <span class="s">host1.example.org:41414</span>
+
+<span class="na">hosts.h2</span> <span class="o">=</span> <span class="s">host2.example.org:41414</span>
+
+<span class="na">hosts.h3</span> <span class="o">=</span> <span class="s">host3.example.org:41414</span>
+
+<span class="na">backoff</span> <span class="o">=</span> <span class="s">false # Specifies whether the client should</span>
+ <span class="c"># back-off from (i.e. temporarily</span>
+ <span class="c"># blacklist) a failed host</span>
+ <span class="c"># (default: false).</span>
+
+<span class="na">maxBackoff</span> <span class="o">=</span> <span class="s">0 # Max timeout in millis that a host will</span>
+ <span class="c"># remain inactive due to a previous</span>
+ <span class="c"># failure with that host (default: 0,</span>
+ <span class="c"># which effectively becomes 30000)</span>
+
+<span class="na">host-selector</span> <span class="o">=</span> <span class="s">round_robin # The host selection strategy used</span>
+ <span class="c"># when load-balancing among hosts</span>
+ <span class="c"># (default: round_robin).</span>
+ <span class="c"># Other values include "random"</span>
+ <span class="c"># or the FQCN of a custom class</span>
+ <span class="c"># that implements</span>
+ <span class="c"># LoadBalancingRpcClient$HostSelector</span>
+
+<span class="na">batch-size</span> <span class="o">=</span> <span class="s">100 # Must be >=1 (default: 100)</span>
+
+<span class="na">connect-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span>
+
+<span class="na">request-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span>
+</pre></div>
+</div>
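+<p>The capped exponential backoff described above can be sketched as follows. This is
+an illustration under stated assumptions, not Flume's implementation: the
+<tt class="docutils literal"><span class="pre">BackoffSketch</span></tt> class is hypothetical, the 1 second base delay that doubles per
+sequential failure is assumed, and the exponent limit of 16 is inferred from the
+65536 second ceiling mentioned in the text:</p>

```java
// Hypothetical sketch of capped exponential backoff; names and the
// 1-second base delay are assumptions, not taken from the Flume source.
class BackoffSketch {
    // 2^16 seconds = 65536 seconds, the ceiling quoted in the text.
    static final int MAX_EXPONENT = 16;
    // A maxBackoff of 0 is described as effectively becoming 30000 ms.
    static final long DEFAULT_MAX_BACKOFF_MS = 30000L;

    static long backoffMillis(int sequentialFailures, long maxBackoffMs) {
        long cap = maxBackoffMs > 0 ? maxBackoffMs : DEFAULT_MAX_BACKOFF_MS;
        // Clamp the exponent so the shift can never overflow.
        int exponent = Math.max(0, Math.min(sequentialFailures, MAX_EXPONENT));
        long delay = 1000L << exponent; // doubles with each sequential failure
        return Math.min(delay, cap);
    }
}
```

+<p>With <tt class="docutils literal"><span class="pre">maxBackoff</span></tt> left at 0, the delay in this sketch stops growing once it
+reaches 30000 ms; a larger <tt class="docutils literal"><span class="pre">maxBackoff</span></tt> lets it grow until the exponent limit applies.</p>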
+</div>
+</div>
+<div class="section" id="embedded-agent">
+<h4>Embedded agent<a class="headerlink" href="#embedded-agent" title="Permalink to this headline">¶</a></h4>
+<p>Flume has an embedded agent API which allows users to embed an agent in their
+application. This agent is meant to be lightweight, and as such not all
+sources, sinks, and channels are allowed. Specifically, the source used
+is a special embedded source, and events should be sent to the source
+via the put and putAll methods on the EmbeddedAgent object. Only File Channel
+and Memory Channel are allowed as channels, while Avro Sink is the only
+supported sink. Interceptors are also supported by the embedded agent.</p>
+<p>Note: The embedded agent has a dependency on hadoop-core.jar.</p>
+<p>Configuration of an Embedded Agent is similar to configuration of a
+full Agent. The following is an exhaustive list of configuration options:</p>
+<p>Required properties are in <strong>bold</strong>.</p>
+<table border="1" class="docutils">
+<colgroup>
+<col width="20%" />
+<col width="15%" />
+<col width="65%" />
+</colgroup>
+<thead valign="bottom">
+<tr class="row-odd"><th class="head">Property Name</th>
+<th class="head">Default</th>
+<th class="head">Description</th>
+</tr>
+</thead>
+<tbody valign="top">
+<tr class="row-even"><td>source.type</td>
+<td>embedded</td>
+<td>The only available source is the embedded source.</td>
+</tr>
+<tr class="row-odd"><td><strong>channel.type</strong></td>
+<td>–</td>
+<td>Either <tt class="docutils literal"><span class="pre">memory</span></tt> or <tt class="docutils literal"><span class="pre">file</span></tt> which correspond
+to MemoryChannel and FileChannel respectively.</td>
+</tr>
+<tr class="row-even"><td>channel.*</td>
+<td>–</td>
+<td>Configuration options for the channel type requested,
+see MemoryChannel or FileChannel user guide for an exhaustive list.</td>
+</tr>
+<tr class="row-odd"><td><strong>sinks</strong></td>
+<td>–</td>
+<td>List of sink names</td>
+</tr>
+<tr class="row-even"><td><strong>sink.type</strong></td>
+<td>–</td>
+<td>Property name must match a name in the list of sinks.
+Value must be <tt class="docutils literal"><span class="pre">avro</span></tt></td>
+</tr>
+<tr class="row-odd"><td>sink.*</td>
+<td>–</td>
+<td>Configuration options for the sink.
+See AvroSink user guide for an exhaustive list,
+however note AvroSink requires at least hostname and port.</td>
+</tr>
+<tr class="row-even"><td><strong>processor.type</strong></td>
+<td>–</td>
+<td>Either <tt class="docutils literal"><span class="pre">failover</span></tt> or <tt class="docutils literal"><span class="pre">load_balance</span></tt> which correspond
+to FailoverSinkProcessor and LoadBalancingSinkProcessor respectively.</td>
+</tr>
+<tr class="row-odd"><td>processor.*</td>
+<td>–</td>
+<td>Configuration options for the sink processor selected.
+See FailoverSinkProcessor and LoadBalancingSinkProcessor
+user guide for an exhaustive list.</td>
+</tr>
+<tr class="row-even"><td>source.interceptors</td>
+<td>–</td>
+<td>Space-separated list of interceptors</td>
+</tr>
+<tr class="row-odd"><td>source.interceptors.*</td>
+<td>–</td>
+<td>Configuration options for individual interceptors
+specified in the source.interceptors property</td>
+</tr>
+</tbody>
+</table>
+<p>Below is an example of how to use the agent:</p>
+<div class="highlight-java"><div class="highlight"><pre><span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">properties</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>();</span>
+<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"channel.type"</span><span class="o">,</span> <span class="s">"memory"</span><span class="o">);</span>
+<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"channel.capacity"</span><span class="o">,</span> <span class="s">"200"</span><span class="o">);</span>
+<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sinks"</span><span class="o">,</span> <span class="s">"sink1 sink2"</span><span class="o">);</span>
+<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink1.type"</span><span class="o">,</span> <span class="s">"avro"</span><span class="o">);</span>
+<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink2.type"</span><span class="o">,</span> <span class="s">"avro"</span><span class="o">);</span>
+<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink1.hostname"</span><span class="o">,</span> <span class="s">"collector1.apache.org"</span><span class="o">);</span>
+<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink1.port"</span><span class="o">,</span> <span class="s">"5564"</span><span class="o">);</span>
+<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink2.hostname"</span><span class="o">,</span> <span class="s">"collector2.apache.org"</span><span class="o">);</span>
+<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink2.port"</span><span class="o">,</span> <span class="s">"5565"</span><span class="o">);</span>
+<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"processor.type"</span><span class="o">,</span> <span class="s">"load_balance"</span><span class="o">);</span>
+<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"source.interceptors"</span><span class="o">,</span> <span class="s">"i1"</span><span class="o">);</span>
+<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"source.interceptors.i1.type"</span><span class="o">,</span> <span class="s">"static"</span><span class="o">);</span>
+<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"source.interceptors.i1.key"</span><span class="o">,</span> <span class="s">"key1"</span><span class="o">);</span>
+<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"source.interceptors.i1.value"</span><span class="o">,</span> <span class="s">"value1"</span><span class="o">);</span>
+
+<span class="n">EmbeddedAgent</span> <span class="n">agent</span> <span class="o">=</span> <span class="k">new</span> <span class="n">EmbeddedAgent</span><span class="o">(</span><span class="s">"myagent"</span><span class="o">);</span>
+
+<span class="n">agent</span><span class="o">.</span><span class="na">configure</span><span class="o">(</span><span class="n">properties</span><span class="o">);</span>
+<span class="n">agent</span><span class="o">.</span><span class="na">start</span><span class="o">();</span>
+
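+<span class="n">List</span><span class="o"><</span><span class="n">Event</span><span class="o">></span> <span class="n">events</span> <span class="o">=</span> <span class="n">Lists</span><span class="o">.</span><span class="na">newArrayList</span><span class="o">();</span>
+
+<span class="c1">// Build a sample event; the same event is added four times below</span>
+<span class="n">Event</span> <span class="n">event</span> <span class="o">=</span> <span class="n">EventBuilder</span><span class="o">.</span><span class="na">withBody</span><span class="o">(</span><span class="s">"Hello Flume!"</span><span class="o">,</span> <span class="n">Charset</span><span class="o">.</span><span class="na">forName</span><span class="o">(</span><span class="s">"UTF-8"</span><span class="o">));</span>
+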
+<span class="n">events</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">event</span><span class="o">);</span>
+<span class="n">events</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">event</span><span class="o">);</span>
+<span class="n">events</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">event</span><span class="o">);</span>
+<span class="n">events</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">event</span><span class="o">);</span>
+
+<span class="n">agent</span><span class="o">.</span><span class="na">putAll</span><span class="o">(</span><span class="n">events</span><span class="o">);</span>
+
+<span class="o">...</span>
+
+<span class="n">agent</span><span class="o">.</span><span class="na">stop</span><span class="o">();</span>
+</pre></div>
+</div>
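+<p>Before handing a configuration map to the agent, it can help to check it against the
+table above. The following is a rough sketch of such a check; the
+<tt class="docutils literal"><span class="pre">EmbeddedConfigCheck</span></tt> class is hypothetical and only encodes the rules stated in
+the table (required properties, allowed values, and the AvroSink hostname and port),
+not the validation Flume itself performs:</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check for an embedded agent configuration.
// It mirrors the property table above; it is not Flume's own validation.
class EmbeddedConfigCheck {
    static List<String> problems(Map<String, String> props) {
        List<String> problems = new ArrayList<>();

        String channel = props.get("channel.type");
        if (!"memory".equals(channel) && !"file".equals(channel)) {
            problems.add("channel.type must be memory or file");
        }

        String processor = props.get("processor.type");
        if (!"failover".equals(processor) && !"load_balance".equals(processor)) {
            problems.add("processor.type must be failover or load_balance");
        }

        String sinks = props.get("sinks");
        if (sinks == null || sinks.trim().isEmpty()) {
            problems.add("sinks must list at least one sink name");
            return problems;
        }
        // Each listed sink must be an Avro sink with a hostname and port.
        for (String sink : sinks.trim().split("\\s+")) {
            if (!"avro".equals(props.get(sink + ".type"))) {
                problems.add(sink + ".type must be avro");
            }
            if (props.get(sink + ".hostname") == null
                    || props.get(sink + ".port") == null) {
                problems.add(sink + " needs hostname and port");
            }
        }
        return problems;
    }
}
```

+<p>An empty list means the map satisfies the rules in the table; it says nothing about
+whether the collectors are reachable.</p>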
+</div>
+<div class="section" id="transaction-interface">
+<h4>Transaction interface<a class="headerlink" href="#transaction-interface" title="Permalink to this headline">¶</a></h4>
+<p>The <tt class="docutils literal"><span class="pre">Transaction</span></tt> interface is the basis of reliability for Flume. All the
+major components (i.e. <tt class="docutils literal"><span class="pre">Source</span></tt>s, <tt class="docutils literal"><span class="pre">Sink</span></tt>s and <tt class="docutils literal"><span class="pre">Channel</span></tt>s) must use a
+Flume <tt class="docutils literal"><span class="pre">Transaction</span></tt>.</p>
+<div class="figure align-center">
+<img alt="Transaction sequence diagram" src="_images/DevGuide_image01.png" />
+</div>
+<p>A <tt class="docutils literal"><span class="pre">Transaction</span></tt> is implemented within a <tt class="docutils literal"><span class="pre">Channel</span></tt> implementation. Each
+<tt class="docutils literal"><span class="pre">Source</span></tt> and <tt class="docutils literal"><span class="pre">Sink</span></tt> that is connected to a <tt class="docutils literal"><span class="pre">Channel</span></tt> must obtain a
+<tt class="docutils literal"><span class="pre">Transaction</span></tt> object. The <tt class="docutils literal"><span class="pre">Source</span></tt>s actually use a <tt class="docutils literal"><span class="pre">ChannelSelector</span></tt>
+interface to encapsulate the <tt class="docutils literal"><span class="pre">Transaction</span></tt>. The operation to stage an
+<tt class="docutils literal"><span class="pre">Event</span></tt> (put it into a <tt class="docutils literal"><span class="pre">Channel</span></tt>) or extract an <tt class="docutils literal"><span class="pre">Event</span></tt> (take it out of a
+<tt class="docutils literal"><span class="pre">Channel</span></tt>) is done inside an active <tt class="docutils literal"><span class="pre">Transaction</span></tt>. For example:</p>
+<div class="highlight-java"><div class="highlight"><pre><span class="n">Channel</span> <span class="n">ch</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MemoryChannel</span><span class="o">();</span>
+<span class="n">Transaction</span> <span class="n">txn</span> <span class="o">=</span> <span class="n">ch</span><span class="o">.</span><span class="na">getTransaction</span><span class="o">();</span>
+<span class="n">txn</span><span class="o">.</span><span class="na">begin</span><span class="o">();</span>
+<span class="k">try</span> <span class="o">{</span>
+ <span class="c1">// This try clause includes whatever Channel operations you want to do</span>
+
+ <span class="n">Event</span> <span class="n">eventToStage</span> <span class="o">=</span> <span class="n">EventBuilder</span><span class="o">.</span><span class="na">withBody</span><span class="o">(</span><span class="s">"Hello Flume!"</span><span class="o">,</span>
+ <span class="n">Charset</span><span class="o">.</span><span class="na">forName</span><span class="o">(</span><span class="s">"UTF-8"</span><span class="o">));</span>
+ <span class="n">ch</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">eventToStage</span><span class="o">);</span>
+ <span class="c1">// Event takenEvent = ch.take();</span>
+ <span class="c1">// ...</span>
+ <span class="n">txn</span><span class="o">.</span><span class="na">commit</span><span class="o">();</span>
+<span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">Throwable</span> <span class="n">t</span><span class="o">)</span> <span class="o">{</span>
+ <span class="n">txn</span><span class="o">.</span><span class="na">rollback</span><span class="o">();</span>
+
+ <span class="c1">// Log exception, handle individual exceptions as needed</span>
+
+ <span class="c1">// re-throw all Errors</span>
+ <span class="k">if</span> <span class="o">(</span><span class="n">t</span> <span class="k">instanceof</span> <span class="n">Error</span><span class="o">)</span> <span class="o">{</span>
+ <span class="k">throw</span> <span class="o">(</span><span class="n">Error</span><span class="o">)</span><span class="n">t</span><span class="o">;</span>
+ <span class="o">}</span>
+<span class="o">}</span> <span class="k">finally</span> <span class="o">{</span>
+ <span class="n">txn</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
+<span class="o">}</span>
+</pre></div>
+</div>
+<p>Here we get hold of a <tt class="docutils literal"><span class="pre">Transaction</span></tt> from a <tt class="docutils literal"><span class="pre">Channel</span></tt>. After <tt class="docutils literal"><span class="pre">begin()</span></tt>
+returns, the <tt class="docutils literal"><span class="pre">Transaction</span></tt> is now active/open and the <tt class="docutils literal"><span class="pre">Event</span></tt> is then put
+into the <tt class="docutils literal"><span class="pre">Channel</span></tt>. If the put is successful, then the <tt class="docutils literal"><span class="pre">Transaction</span></tt> is
+committed and closed.</p>
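+<p>The commit and rollback behaviour described above can be tried out without a
+running agent. The following is a minimal sketch, not Flume code:
+<tt class="docutils literal"><span class="pre">ToyChannel</span></tt> and <tt class="docutils literal"><span class="pre">ToyTransactionDemo</span></tt> are hypothetical stand-ins that
+only mimic the idea of staging puts until the transaction is committed.</p>

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a channel; it is NOT the Flume Channel API.
// Puts are staged and only become visible after commit().
class ToyChannel {
  private final Deque<String> committed = new ArrayDeque<>();
  private final List<String> staged = new ArrayList<>();
  private boolean open = false;

  void begin() {
    if (open) {
      throw new IllegalStateException("transaction already open");
    }
    open = true;
  }

  void put(String event) {
    if (!open) {
      throw new IllegalStateException("put() outside a transaction");
    }
    staged.add(event);
  }

  // Make every staged event visible.
  void commit() {
    committed.addAll(staged);
    staged.clear();
  }

  // Discard every staged event.
  void rollback() {
    staged.clear();
  }

  // Always called from a finally block, mirroring txn.close().
  void close() {
    staged.clear();
    open = false;
  }

  int size() {
    return committed.size();
  }
}

class ToyTransactionDemo {
  // Puts the given events in one transaction; returns true if it committed.
  static boolean putAll(ToyChannel ch, List<String> events, boolean failMidway) {
    ch.begin();
    try {
      for (String e : events) {
        ch.put(e);
      }
      if (failMidway) {
        throw new RuntimeException("simulated failure before commit");
      }
      ch.commit();
      return true;
    } catch (RuntimeException ex) {
      ch.rollback();
      return false;
    } finally {
      ch.close();
    }
  }
}
```

+<p>In this model, a failure before <tt class="docutils literal"><span class="pre">commit()</span></tt> leaves the committed contents
+untouched, which is the property the try/catch/finally pattern is meant to protect.</p>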
+</div>
+<div class="section" id="sink">
+<h4>Sink<a class="headerlink" href="#sink" title="Permalink to this headline">¶</a></h4>
+<p>The purpose of a <tt class="docutils literal"><span class="pre">Sink</span></tt> is to extract <tt class="docutils literal"><span class="pre">Event</span></tt>s from the <tt class="docutils literal"><span class="pre">Channel</span></tt> and
+forward them to the next Flume Agent in the flow or store them in an external
+repository. A <tt class="docutils literal"><span class="pre">Sink</span></tt> is associated with exactly one <tt class="docutils literal"><span class="pre">Channel</span></tt>, as
+configured in the Flume properties file. There’s one <tt class="docutils literal"><span class="pre">SinkRunner</span></tt> instance
+associated with every configured <tt class="docutils literal"><span class="pre">Sink</span></tt>, and when the Flume framework calls
+<tt class="docutils literal"><span class="pre">SinkRunner.start()</span></tt>, a new thread is created to drive the <tt class="docutils literal"><span class="pre">Sink</span></tt> (using
+<tt class="docutils literal"><span class="pre">SinkRunner.PollingRunner</span></tt> as the thread’s <tt class="docutils literal"><span class="pre">Runnable</span></tt>). This thread manages
+the <tt class="docutils literal"><span class="pre">Sink</span></tt>’s lifecycle. The <tt class="docutils literal"><span class="pre">Sink</span></tt> needs to implement the <tt class="docutils literal"><span class="pre">start()</span></tt> and
+<tt class="docutils literal"><span class="pre">stop()</span></tt> methods that are part of the <tt class="docutils literal"><span class="pre">LifecycleAware</span></tt> interface. The
+<tt class="docutils literal"><span class="pre">Sink.start()</span></tt> method should initialize the <tt class="docutils literal"><span class="pre">Sink</span></tt> and bring it to a state
+where it can forward the <tt class="docutils literal"><span class="pre">Event</span></tt>s to its next destination. The
+<tt class="docutils literal"><span class="pre">Sink.process()</span></tt> method should do the core processing of extracting the
+<tt class="docutils literal"><span class="pre">Event</span></tt> from the <tt class="docutils literal"><span class="pre">Channel</span></tt> and forwarding it. The <tt class="docutils literal"><span class="pre">Sink.stop()</span></tt> method
+should do the necessary cleanup (e.g. releasing resources). The <tt class="docutils literal"><span class="pre">Sink</span></tt>
+implementation also needs to implement the <tt class="docutils literal"><span class="pre">Configurable</span></tt> interface for
+processing its own configuration settings. For example:</p>
+<div class="highlight-java"><div class="highlight"><pre><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MySink</span> <span class="kd">extends</span> <span class="n">AbstractSink</span> <span class="kd">implements</span> <span class="n">Configurable</span> <span class="o">{</span>
+ <span class="kd">private</span> <span class="n">String</span> <span class="n">myProp</span><span class="o">;</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">configure</span><span class="o">(</span><span class="n">Context</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span>
+ <span class="n">String</span> <span class="n">myProp</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="s">"myProp"</span><span class="o">,</span> <span class="s">"defaultValue"</span><span class="o">);</span>
+
+ <span class="c1">// Process the myProp value (e.g. validation)</span>
+
+ <span class="c1">// Store myProp for later retrieval by process() method</span>
+ <span class="k">this</span><span class="o">.</span><span class="na">myProp</span> <span class="o">=</span> <span class="n">myProp</span><span class="o">;</span>
+ <span class="o">}</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">start</span><span class="o">()</span> <span class="o">{</span>
+ <span class="c1">// Initialize the connection to the external repository (e.g. HDFS) that</span>
+ <span class="c1">// this Sink will forward Events to ..</span>
+ <span class="o">}</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">stop</span><span class="o">()</span> <span class="o">{</span>
+ <span class="c1">// Disconnect from the external repository and do any</span>
+ <span class="c1">// additional cleanup (e.g. releasing resources or nulling-out</span>
+ <span class="c1">// field values) ..</span>
+ <span class="o">}</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="n">Status</span> <span class="nf">process</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">EventDeliveryException</span> <span class="o">{</span>
+ <span class="n">Status</span> <span class="n">status</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
+
+ <span class="c1">// Start transaction</span>
+ <span class="n">Channel</span> <span class="n">ch</span> <span class="o">=</span> <span class="n">getChannel</span><span class="o">();</span>
+ <span class="n">Transaction</span> <span class="n">txn</span> <span class="o">=</span> <span class="n">ch</span><span class="o">.</span><span class="na">getTransaction</span><span class="o">();</span>
+ <span class="n">txn</span><span class="o">.</span><span class="na">begin</span><span class="o">();</span>
+ <span class="k">try</span> <span class="o">{</span>
+ <span class="c1">// This try clause includes whatever Channel operations you want to do</span>
+
+ <span class="n">Event</span> <span class="n">event</span> <span class="o">=</span> <span class="n">ch</span><span class="o">.</span><span class="na">take</span><span class="o">();</span>
+
+ <span class="c1">// Send the Event to the external repository.</span>
+ <span class="c1">// storeSomeData(event);</span>
+
+ <span class="n">txn</span><span class="o">.</span><span class="na">commit</span><span class="o">();</span>
+ <span class="n">status</span> <span class="o">=</span> <span class="n">Status</span><span class="o">.</span><span class="na">READY</span><span class="o">;</span>
+ <span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">Throwable</span> <span class="n">t</span><span class="o">)</span> <span class="o">{</span>
+ <span class="n">txn</span><span class="o">.</span><span class="na">rollback</span><span class="o">();</span>
+
+ <span class="c1">// Log exception, handle individual exceptions as needed</span>
+
+ <span class="n">status</span> <span class="o">=</span> <span class="n">Status</span><span class="o">.</span><span class="na">BACKOFF</span><span class="o">;</span>
+
+ <span class="c1">// re-throw all Errors</span>
+ <span class="k">if</span> <span class="o">(</span><span class="n">t</span> <span class="k">instanceof</span> <span class="n">Error</span><span class="o">)</span> <span class="o">{</span>
+ <span class="k">throw</span> <span class="o">(</span><span class="n">Error</span><span class="o">)</span><span class="n">t</span><span class="o">;</span>
+ <span class="o">}</span>
+ <span class="o">}</span> <span class="k">finally</span> <span class="o">{</span>
+ <span class="n">txn</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
+ <span class="o">}</span>
+ <span class="k">return</span> <span class="n">status</span><span class="o">;</span>
+ <span class="o">}</span>
+<span class="o">}</span>
+</pre></div>
+</div>
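+<p>How a runner thread might react to the <tt class="docutils literal"><span class="pre">Status</span></tt> returned by <tt class="docutils literal"><span class="pre">process()</span></tt> can
+be sketched in isolation. The snippet below is a simplified model, not the
+<tt class="docutils literal"><span class="pre">SinkRunner</span></tt> source: <tt class="docutils literal"><span class="pre">PollStatus</span></tt>, <tt class="docutils literal"><span class="pre">PollTarget</span></tt>, <tt class="docutils literal"><span class="pre">BackoffLoop</span></tt> and the
+1000 ms and 5000 ms values are assumptions chosen for illustration. Delays are returned
+rather than slept so that the logic is easy to inspect.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins; these are NOT Flume types.
enum PollStatus { READY, BACKOFF }

interface PollTarget {
  PollStatus process();
}

class BackoffLoop {
  // Assumed values, for illustration only.
  static final long SLEEP_INCREMENT_MS = 1000;
  static final long MAX_SLEEP_MS = 5000;

  // Calls process() the given number of times and returns the delay the
  // loop would wait after each call (0 means "poll again immediately").
  static List<Long> run(PollTarget target, int iterations) {
    List<Long> delays = new ArrayList<>();
    long consecutiveBackoffs = 0;
    for (int i = 0; i < iterations; i++) {
      if (target.process() == PollStatus.BACKOFF) {
        consecutiveBackoffs++;
        // Wait longer after each consecutive BACKOFF, up to a ceiling.
        delays.add(Math.min(consecutiveBackoffs * SLEEP_INCREMENT_MS, MAX_SLEEP_MS));
      } else {
        // A READY result resets the backoff counter.
        consecutiveBackoffs = 0;
        delays.add(0L);
      }
    }
    return delays;
  }
}
```

+<p>Returning <tt class="docutils literal"><span class="pre">BACKOFF</span></tt> when the <tt class="docutils literal"><span class="pre">Channel</span></tt> is empty or the destination is unavailable
+lets a loop of this kind slow down instead of spinning.</p>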
+</div>
+<div class="section" id="source">
+<h4>Source<a class="headerlink" href="#source" title="Permalink to this headline">¶</a></h4>
+<p>The purpose of a <tt class="docutils literal"><span class="pre">Source</span></tt> is to receive data from an external client and store
+it into the <tt class="docutils literal"><span class="pre">Channel</span></tt>. A <tt class="docutils literal"><span class="pre">Source</span></tt> can get an instance of its own
+<tt class="docutils literal"><span class="pre">ChannelProcessor</span></tt> to process an <tt class="docutils literal"><span class="pre">Event</span></tt>. The <tt class="docutils literal"><span class="pre">ChannelProcessor</span></tt> in turn
+can get an instance of its own <tt class="docutils literal"><span class="pre">ChannelSelector</span></tt> that’s used to get the
+<tt class="docutils literal"><span class="pre">Channel</span></tt>s associated with the <tt class="docutils literal"><span class="pre">Source</span></tt>, as configured in the Flume
+properties file. A <tt class="docutils literal"><span class="pre">Transaction</span></tt> can then be retrieved from each associated
+<tt class="docutils literal"><span class="pre">Channel</span></tt> so that the <tt class="docutils literal"><span class="pre">Source</span></tt> can place <tt class="docutils literal"><span class="pre">Event</span></tt>s into the <tt class="docutils literal"><span class="pre">Channel</span></tt>
+reliably, within a <tt class="docutils literal"><span class="pre">Transaction</span></tt>.</p>
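+<p>The fan-out from one <tt class="docutils literal"><span class="pre">Source</span></tt> to several <tt class="docutils literal"><span class="pre">Channel</span></tt>s can be pictured with a
+small, self-contained model. It is only a sketch under stated assumptions:
+<tt class="docutils literal"><span class="pre">ToyQueueChannel</span></tt>, <tt class="docutils literal"><span class="pre">ToyProcessor</span></tt> and the copy-to-every-channel policy are
+hypothetical and far simpler than Flume’s <tt class="docutils literal"><span class="pre">ChannelProcessor</span></tt> and <tt class="docutils literal"><span class="pre">ChannelSelector</span></tt>.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory channel; NOT the Flume Channel interface.
class ToyQueueChannel {
  final String name;
  final List<String> events = new ArrayList<>();

  ToyQueueChannel(String name) {
    this.name = name;
  }
}

// Hypothetical processor that copies each event to every configured channel.
class ToyProcessor {
  private final List<ToyQueueChannel> channels;

  ToyProcessor(List<ToyQueueChannel> channels) {
    this.channels = channels;
  }

  void processEvent(String event) {
    for (ToyQueueChannel ch : channels) {
      // A real implementation would wrap this put in a per-channel
      // transaction (begin, put, commit, rollback on failure, close).
      ch.events.add(event);
    }
  }
}
```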
+<p>Similar to the <tt class="docutils literal"><span class="pre">SinkRunner.PollingRunner</span></tt> <tt class="docutils literal"><span class="pre">Runnable</span></tt>, there’s
+a <tt class="docutils literal"><span class="pre">PollingRunner</span></tt> <tt class="docutils literal"><span class="pre">Runnable</span></tt> that executes on a thread created when the
+Flume framework calls <tt class="docutils literal"><span class="pre">PollableSourceRunner.start()</span></tt>. Each configured
+<tt class="docutils literal"><span class="pre">PollableSource</span></tt> is associated with its own thread that runs a
+<tt class="docutils literal"><span class="pre">PollingRunner</span></tt>. This thread manages the <tt class="docutils literal"><span class="pre">PollableSource</span></tt>’s lifecycle,
+such as starting and stopping. A <tt class="docutils literal"><span class="pre">PollableSource</span></tt> implementation must
+implement the <tt class="docutils literal"><span class="pre">start()</span></tt> and <tt class="docutils literal"><span class="pre">stop()</span></tt> methods that are declared in the
+<tt class="docutils literal"><span class="pre">LifecycleAware</span></tt> interface. The runner of a <tt class="docutils literal"><span class="pre">PollableSource</span></tt> invokes that
+<tt class="docutils literal"><span class="pre">Source</span></tt>’s <tt class="docutils literal"><span class="pre">process()</span></tt> method. The <tt class="docutils literal"><span class="pre">process()</span></tt> method should check for
+new data and store it into the <tt class="docutils literal"><span class="pre">Channel</span></tt> as Flume <tt class="docutils literal"><span class="pre">Event</span></tt>s.</p>
+<p>Note that there are actually two types of <tt class="docutils literal"><span class="pre">Source</span></tt>s. The <tt class="docutils literal"><span class="pre">PollableSource</span></tt>
+was already mentioned. The other is the <tt class="docutils literal"><span class="pre">EventDrivenSource</span></tt>. The
+<tt class="docutils literal"><span class="pre">EventDrivenSource</span></tt>, unlike the <tt class="docutils literal"><span class="pre">PollableSource</span></tt>, must have its own callback
+mechanism that captures the new data and stores it into the <tt class="docutils literal"><span class="pre">Channel</span></tt>. The
+<tt class="docutils literal"><span class="pre">EventDrivenSource</span></tt>s are not each driven by their own thread like the
+<tt class="docutils literal"><span class="pre">PollableSource</span></tt>s are. Below is an example of a custom <tt class="docutils literal"><span class="pre">PollableSource</span></tt>:</p>
+<div class="highlight-java"><div class="highlight"><pre><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MySource</span> <span class="kd">extends</span> <span class="n">AbstractSource</span> <span class="kd">implements</span> <span class="n">Configurable</span><span class="o">,</span> <span class="n">PollableSource</span> <span class="o">{</span>
+ <span class="kd">private</span> <span class="n">String</span> <span class="n">myProp</span><span class="o">;</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">configure</span><span class="o">(</span><span class="n">Context</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span>
+ <span class="n">String</span> <span class="n">myProp</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="s">"myProp"</span><span class="o">,</span> <span class="s">"defaultValue"</span><span class="o">);</span>
+
+ <span class="c1">// Process the myProp value (e.g. validation, convert to another type, ...)</span>
+
+ <span class="c1">// Store myProp for later retrieval by process() method</span>
+ <span class="k">this</span><span class="o">.</span><span class="na">myProp</span> <span class="o">=</span> <span class="n">myProp</span><span class="o">;</span>
+ <span class="o">}</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">start</span><span class="o">()</span> <span class="o">{</span>
+ <span class="c1">// Initialize the connection to the external client</span>
+ <span class="o">}</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">stop</span><span class="o">()</span> <span class="o">{</span>
+ <span class="c1">// Disconnect from external client and do any additional cleanup</span>
+ <span class="c1">// (e.g. releasing resources or nulling-out field values) ..</span>
+ <span class="o">}</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="n">Status</span> <span class="nf">process</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">EventDeliveryException</span> <span class="o">{</span>
+ <span class="n">Status</span> <span class="n">status</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
+
+ <span class="k">try</span> <span class="o">{</span>
+ <span class="c1">// This try clause includes whatever Channel/Event operations you want to do</span>
+
+ <span class="c1">// Receive new data</span>
+ <span class="n">Event</span> <span class="n">e</span> <span class="o">=</span> <span class="n">getSomeData</span><span class="o">();</span>
+
+ <span class="c1">// Store the Event into this Source's associated Channel(s).</span>
+ <span class="c1">// The ChannelProcessor handles the Channel Transactions itself;</span>
+ <span class="c1">// AbstractSource has no getChannel() method.</span>
+ <span class="n">getChannelProcessor</span><span class="o">().</span><span class="na">processEvent</span><span class="o">(</span><span class="n">e</span><span class="o">);</span>
+
+ <span class="n">status</span> <span class="o">=</span> <span class="n">Status</span><span class="o">.</span><span class="na">READY</span><span class="o">;</span>
+ <span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">Throwable</span> <span class="n">t</span><span class="o">)</span> <span class="o">{</span>
+ <span class="c1">// Log exception, handle individual exceptions as needed</span>
+
+ <span class="n">status</span> <span class="o">=</span> <span class="n">Status</span><span class="o">.</span><span class="na">BACKOFF</span><span class="o">;</span>
+
+ <span class="c1">// re-throw all Errors</span>
+ <span class="k">if</span> <span class="o">(</span><span class="n">t</span> <span class="k">instanceof</span> <span class="n">Error</span><span class="o">)</span> <span class="o">{</span>
+ <span class="k">throw</span> <span class="o">(</span><span class="n">Error</span><span class="o">)</span><span class="n">t</span><span class="o">;</span>
+ <span class="o">}</span>
+ <span class="o">}</span>
+ <span class="k">return</span> <span class="n">status</span><span class="o">;</span>
+ <span class="o">}</span>
+<span class="o">}</span>
+</pre></div>
+</div>
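+<p>For contrast, the callback style of an <tt class="docutils literal"><span class="pre">EventDrivenSource</span></tt> can be sketched as
+follows. This is an illustrative model only: <tt class="docutils literal"><span class="pre">ToyListener</span></tt>, <tt class="docutils literal"><span class="pre">ToyFeed</span></tt> and
+<tt class="docutils literal"><span class="pre">ToyEventDrivenSource</span></tt> are hypothetical names, and a plain list stands in for the
+<tt class="docutils literal"><span class="pre">Channel</span></tt>.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface that an external client library might expose.
interface ToyListener {
  void onData(String payload);
}

// Hypothetical external feed that pushes data to a registered listener.
class ToyFeed {
  private ToyListener listener;

  void register(ToyListener l) {
    listener = l;
  }

  void unregister() {
    listener = null;
  }

  // Simulates the client delivering data; it is dropped if nobody listens.
  void emit(String payload) {
    if (listener != null) {
      listener.onData(payload);
    }
  }
}

// Hypothetical event-driven source: there is no process() loop, only a
// callback registered in start() and removed in stop().
class ToyEventDrivenSource {
  private final ToyFeed feed;
  final List<String> channel = new ArrayList<>(); // stand-in for the Channel

  ToyEventDrivenSource(ToyFeed feed) {
    this.feed = feed;
  }

  void start() {
    feed.register(payload -> channel.add(payload));
  }

  void stop() {
    feed.unregister();
  }
}
```

+<p>In this model, data arrives whenever the feed calls back, so nothing needs to poll;
+data emitted before <tt class="docutils literal"><span class="pre">start()</span></tt> or after <tt class="docutils literal"><span class="pre">stop()</span></tt> never reaches the stand-in channel.</p>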
+</div>
+<div class="section" id="channel">
+<h4>Channel<a class="headerlink" href="#channel" title="Permalink to this headline">¶</a></h4>
+<p>TBD</p>
+</div>
+</div>
+</div>
+</div>
+
+
+ </div>
+ </div>
+ </div>
+ <div class="sphinxsidebar">
+ <div class="sphinxsidebarwrapper"><h3><a href="index.html">Apache Flume</a></h3>
+<ul>
+<li class="toctree-l1"><a class="reference internal" href="getinvolved.html">How to Get Involved</a></li>
+<li class="toctree-l1"><a class="reference internal" href="download.html">Download</a></li>
+<li class="toctree-l1"><a class="reference internal" href="documentation.html">Documentation</a></li>
+<li class="toctree-l1"><a class="reference internal" href="releases/index.html">Releases</a></li>
+<li class="toctree-l1"><a class="reference internal" href="mailinglists.html">Mailing lists</a></li>
+<li class="toctree-l1"><a class="reference internal" href="team.html">Team</a></li>
+<li class="toctree-l1"><a class="reference internal" href="source.html">Source Repository</a></li>
+<li class="toctree-l1"><a class="reference internal" href="license.html">Apache License</a></li>
+</ul>
+
+<h3>Resources</h3>
+
+<ul class="this-page-menu">
+ <li><a href="https://issues.apache.org/jira/browse/FLUME">Flume Issue Tracking (Jira)</a></li>
+ <li><a href="http://cwiki.apache.org/confluence/display/FLUME">Flume Wiki</a></li>
+ <li><a href="http://cwiki.apache.org/confluence/display/FLUME/Getting+Started">Getting Started Guide</a></li>
+ <li><a href="https://builds.apache.org/job/flume-trunk/">Jenkins Continuous Integration Server</a></li>
+ <li><a href="https://analysis.apache.org/">Sonar Code Quality Reports</a></li>
+</ul>
+
+<h3>Apache</h3>
+
+<ul class="this-page-menu">
+ <li><a href="http://www.apache.org">Home</a></li>
+ <li><a href="http://www.apache.org/foundation/sponsorship.html">Sponsorship</a></li>
+ <li><a href="http://www.apache.org/licenses">Licenses</a> </li>
+ <li><a href="http://www.apache.org/foundation/thanks.html">Thanks</a></li>
+ <li><a href="http://www.apachecon.com">Conferences</a></li>
+ <li><a href="http://www.apache.org/security/">Security</a></li>
+</ul>
+
+
+<h3><a href="index.html">This Page</a></h3>
+<ul>
+<li><a class="reference internal" href="#">Flume 1.6.0 Developer Guide</a><ul>
+<li><a class="reference internal" href="#introduction">Introduction</a><ul>
+<li><a class="reference internal" href="#overview">Overview</a></li>
+<li><a class="reference internal" href="#architecture">Architecture</a><ul>
+<li><a class="reference internal" href="#data-flow-model">Data flow model</a></li>
+<li><a class="reference internal" href="#reliability">Reliability</a></li>
+</ul>
+</li>
+<li><a class="reference internal" href="#building-flume">Building Flume</a><ul>
+<li><a class="reference internal" href="#getting-the-source">Getting the source</a></li>
+<li><a class="reference internal" href="#compile-test-flume">Compile/test Flume</a></li>
+</ul>
+</li>
+<li><a class="reference internal" href="#developing-custom-components">Developing custom components</a><ul>
+<li><a class="reference internal" href="#client">Client</a><ul>
+<li><a class="reference internal" href="#client-sdk">Client SDK</a></li>
+<li><a class="reference internal" href="#rpc-client-interface">RPC client interface</a></li>
+<li><a class="reference internal" href="#rpc-clients-avro-and-thrift">RPC clients - Avro and Thrift</a></li>
+<li><a class="reference internal" href="#secure-rpc-client-thrift">Secure RPC client - Thrift</a></li>
+<li><a class="reference internal" href="#failover-client">Failover Client</a></li>
+<li><a class="reference internal" href="#loadbalancing-rpc-client">LoadBalancing RPC client</a></li>
+</ul>
+</li>
+<li><a class="reference internal" href="#embedded-agent">Embedded agent</a></li>
+<li><a class="reference internal" href="#transaction-interface">Transaction interface</a></li>
+<li><a class="reference internal" href="#sink">Sink</a></li>
+<li><a class="reference internal" href="#source">Source</a></li>
+<li><a class="reference internal" href="#channel">Channel</a></li>
+</ul>
+</li>
+</ul>
+</li>
+</ul>
+</li>
+</ul>
+
+ </div>
+ </div>
+ <div class="clearer"></div>
+ </div>
+<div class="footer">
+ © Copyright 2009-2012 The Apache Software Foundation. Apache Flume, Flume, Apache, the Apache feather logo, and the Apache Flume project logo are trademarks of The Apache Software Foundation.
+</div>
+ </body>
+</html>
\ No newline at end of file