Posted to commits@flume.apache.org by hs...@apache.org on 2015/04/14 22:35:12 UTC
flume git commit: FLUME-2668. Document
SecureThriftRpcClient/SecureRpcClientFactory in Flume Developer Guide
Repository: flume
Updated Branches:
refs/heads/trunk be4ae294c -> a508d9531
FLUME-2668. Document SecureThriftRpcClient/SecureRpcClientFactory in Flume Developer Guide
(Johny Rufus via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a508d953
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a508d953
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a508d953
Branch: refs/heads/trunk
Commit: a508d953162a2b5e771ce5c3c8ee40e02c3fe3c7
Parents: be4ae29
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Apr 14 13:34:15 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Apr 14 13:35:09 2015 -0700
----------------------------------------------------------------------
flume-ng-doc/sphinx/FlumeDeveloperGuide.rst | 147 ++++++++++++++++++++---
1 file changed, 133 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/a508d953/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
index e3b60e6..f948778 100644
--- a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
@@ -277,6 +277,116 @@ properties:
request-timeout = 20000 # Must be >=1000 (default: 20000)
+Secure RPC client - Thrift
+''''''''''''''''''''''''''
+
+As of Flume 1.6.0, the Thrift source and sink support Kerberos-based authentication.
+The client must call the ``getThriftInstance`` method of ``SecureRpcClientFactory``
+to obtain a ``SecureThriftRpcClient``. ``SecureThriftRpcClient`` extends
+``ThriftRpcClient``, which implements the ``RpcClient`` interface. The Kerberos
+authentication code resides in the ``flume-ng-auth`` module, which must be on
+the classpath when using ``SecureRpcClientFactory``. Both the client
+principal and the client keytab should be passed in as parameters through the
+properties; they are the credentials the client uses to authenticate
+against the Kerberos KDC. In addition, the server principal of the destination
+Thrift source to which this client connects should also be provided.
+The following example shows how to use ``SecureRpcClientFactory``
+within a user's data-generating application:
+
+.. code-block:: java
+
+ import org.apache.flume.Event;
+ import org.apache.flume.EventDeliveryException;
+ import org.apache.flume.event.EventBuilder;
+ import org.apache.flume.api.SecureRpcClientFactory;
+ import org.apache.flume.api.RpcClientConfigurationConstants;
+ import org.apache.flume.api.RpcClient;
+ import java.nio.charset.Charset;
+ import java.util.Properties;
+
+ public class MyApp {
+   public static void main(String[] args) {
+     MySecureRpcClientFacade client = new MySecureRpcClientFacade();
+     // Initialize client with the remote Flume agent's host, port
+     Properties props = new Properties();
+     props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");
+     props.setProperty("hosts", "h1");
+     props.setProperty("hosts.h1", "client.example.org"+":"+ String.valueOf(41414));
+
+     // Initialize client with the Kerberos authentication related properties
+     props.setProperty("kerberos", "true");
+     props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
+     props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
+     props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");
+     client.init(props);
+
+     // Send 10 events to the remote Flume agent. That agent should be
+     // configured to listen with a ThriftSource.
+     String sampleData = "Hello Flume!";
+     for (int i = 0; i < 10; i++) {
+       client.sendDataToFlume(sampleData);
+     }
+
+     client.cleanUp();
+   }
+ }
+
+ class MySecureRpcClientFacade {
+   private RpcClient client;
+   private Properties properties;
+
+   public void init(Properties properties) {
+     // Setup the RPC connection
+     this.properties = properties;
+     // Create the SecureThriftRpcClient instance by using SecureRpcClientFactory
+     this.client = SecureRpcClientFactory.getThriftInstance(properties);
+   }
+
+   public void sendDataToFlume(String data) {
+     // Create a Flume Event object that encapsulates the sample data
+     Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
+
+     // Send the event
+     try {
+       client.append(event);
+     } catch (EventDeliveryException e) {
+       // clean up and recreate the client
+       client.close();
+       client = null;
+       client = SecureRpcClientFactory.getThriftInstance(properties);
+     }
+   }
+
+   public void cleanUp() {
+     // Close the RPC connection
+     client.close();
+   }
+ }
+
+The remote ``ThriftSource`` should be started in Kerberos mode.
+Below is an example Flume agent configuration that waits for a connection
+from MyApp:
+
+.. code-block:: properties
+
+ a1.channels = c1
+ a1.sources = r1
+ a1.sinks = k1
+
+ a1.channels.c1.type = memory
+
+ a1.sources.r1.channels = c1
+ a1.sources.r1.type = thrift
+ a1.sources.r1.bind = 0.0.0.0
+ a1.sources.r1.port = 41414
+ a1.sources.r1.kerberos = true
+ a1.sources.r1.agent-principal = flume/server.example.org@EXAMPLE.ORG
+ a1.sources.r1.agent-keytab = /tmp/flume.keytab
+
+
+ a1.sinks.k1.channel = c1
+ a1.sinks.k1.type = logger
+
Failover Client
'''''''''''''''
@@ -459,20 +569,29 @@ full Agent. The following is an exhaustive list of configration options:
Required properties are in **bold**.
-==================== ================ ==============================================
-Property Name Default Description
-==================== ================ ==============================================
-source.type embedded The only available source is the embedded source.
-**channel.type** -- Either ``memory`` or ``file`` which correspond to MemoryChannel and FileChannel respectively.
-channel.* -- Configuration options for the channel type requested, see MemoryChannel or FileChannel user guide for an exhaustive list.
-**sinks** -- List of sink names
-**sink.type** -- Property name must match a name in the list of sinks. Value must be ``avro``
-sink.* -- Configuration options for the sink. See AvroSink user guide for an exhaustive list, however note AvroSink requires at least hostname and port.
-**processor.type** -- Either ``failover`` or ``load_balance`` which correspond to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively.
-processor.* -- Configuration options for the sink processor selected. See FailoverSinksProcessor and LoadBalancingSinkProcessor user guide for an exhaustive list.
-source.interceptors -- Space-separated list of interceptors
-source.interceptors.* -- Configuration options for individual interceptors specified in the source.interceptors property
-==================== ================ ==============================================
+=====================  ================  ======================================================================
+Property Name          Default           Description
+=====================  ================  ======================================================================
+source.type            embedded          The only available source is the embedded source.
+**channel.type**       --                Either ``memory`` or ``file`` which correspond
+                                         to MemoryChannel and FileChannel respectively.
+channel.*              --                Configuration options for the channel type requested,
+                                         see MemoryChannel or FileChannel user guide for an exhaustive list.
+**sinks**              --                List of sink names
+**sink.type**          --                Property name must match a name in the list of sinks.
+                                         Value must be ``avro``
+sink.*                 --                Configuration options for the sink.
+                                         See AvroSink user guide for an exhaustive list,
+                                         however note AvroSink requires at least hostname and port.
+**processor.type**     --                Either ``failover`` or ``load_balance`` which correspond
+                                         to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively.
+processor.*            --                Configuration options for the sink processor selected.
+                                         See FailoverSinksProcessor and LoadBalancingSinkProcessor
+                                         user guide for an exhaustive list.
+source.interceptors    --                Space-separated list of interceptors
+source.interceptors.*  --                Configuration options for individual interceptors
+                                         specified in the source.interceptors property
+=====================  ================  ======================================================================
Below is an example of how to use the agent: