Posted to commits@flume.apache.org by hs...@apache.org on 2015/04/14 22:35:12 UTC

flume git commit: FLUME-2668. Document SecureThriftRpcClient/SecureRpcClientFactory in Flume Developer Guide

Repository: flume
Updated Branches:
  refs/heads/trunk be4ae294c -> a508d9531


FLUME-2668. Document SecureThriftRpcClient/SecureRpcClientFactory in Flume Developer Guide

(Johny Rufus via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a508d953
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a508d953
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a508d953

Branch: refs/heads/trunk
Commit: a508d953162a2b5e771ce5c3c8ee40e02c3fe3c7
Parents: be4ae29
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Apr 14 13:34:15 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Apr 14 13:35:09 2015 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeDeveloperGuide.rst | 147 ++++++++++++++++++++---
 1 file changed, 133 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/a508d953/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
index e3b60e6..f948778 100644
--- a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
@@ -277,6 +277,116 @@ properties:
 
   request-timeout = 20000              # Must be >=1000 (default: 20000)
 
+Secure RPC client - Thrift
+''''''''''''''''''''''''''
+
+As of Flume 1.6.0, the Thrift source and sink support Kerberos-based authentication.
+The client needs to use the ``getThriftInstance`` method of ``SecureRpcClientFactory``
+to obtain a ``SecureThriftRpcClient``. ``SecureThriftRpcClient`` extends
+``ThriftRpcClient``, which implements the ``RpcClient`` interface. The Kerberos
+authentication code resides in the flume-ng-auth module, which must be on the
+classpath when using ``SecureRpcClientFactory``. Both the client
+principal and the client keytab should be passed in as parameters through the
+properties; they are the credentials the client uses to authenticate
+against the Kerberos KDC. In addition, the server principal of the destination
+Thrift source to which this client connects should also be provided.
+The following example shows how to use the ``SecureRpcClientFactory``
+within a user's data-generating application:
+
+.. code-block:: java
+
+  import org.apache.flume.Event;
+  import org.apache.flume.EventDeliveryException;
+  import org.apache.flume.event.EventBuilder;
+  import org.apache.flume.api.SecureRpcClientFactory;
+  import org.apache.flume.api.RpcClientConfigurationConstants;
+  import org.apache.flume.api.RpcClient;
+  import java.nio.charset.Charset;
+  import java.util.Properties;
+
+  public class MyApp {
+    public static void main(String[] args) {
+      MySecureRpcClientFacade client = new MySecureRpcClientFacade();
+      // Initialize client with the remote Flume agent's host, port
+      Properties props = new Properties();
+      props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");
+      props.setProperty("hosts", "h1");
+      props.setProperty("hosts.h1", "server.example.org" + ":" + String.valueOf(41414));
+
+      // Initialize client with the Kerberos authentication properties
+      props.setProperty("kerberos", "true");
+      props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
+      props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
+      props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");
+      client.init(props);
+
+      // Send 10 events to the remote Flume agent. That agent should be
+      // configured to listen with a ThriftSource in Kerberos mode.
+      String sampleData = "Hello Flume!";
+      for (int i = 0; i < 10; i++) {
+        client.sendDataToFlume(sampleData);
+      }
+
+      client.cleanUp();
+    }
+  }
+
+  class MySecureRpcClientFacade {
+    private RpcClient client;
+    private Properties properties;
+
+    public void init(Properties properties) {
+      // Setup the RPC connection
+      this.properties = properties;
+      // Create the SecureThriftRpcClient instance by using SecureRpcClientFactory
+      this.client = SecureRpcClientFactory.getThriftInstance(properties);
+    }
+
+    public void sendDataToFlume(String data) {
+      // Create a Flume Event object that encapsulates the sample data
+      Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
+
+      // Send the event
+      try {
+        client.append(event);
+      } catch (EventDeliveryException e) {
+        // clean up and recreate the client
+        client.close();
+        client = null;
+        client = SecureRpcClientFactory.getThriftInstance(properties);
+      }
+    }
+
+    public void cleanUp() {
+      // Close the RPC connection
+      client.close();
+    }
+  }
+
+The remote ``ThriftSource`` should be started in Kerberos mode.
+Below is an example Flume agent configuration that's waiting for a connection
+from MyApp:
+
+.. code-block:: properties
+
+  a1.channels = c1
+  a1.sources = r1
+  a1.sinks = k1
+
+  a1.channels.c1.type = memory
+
+  a1.sources.r1.channels = c1
+  a1.sources.r1.type = thrift
+  a1.sources.r1.bind = 0.0.0.0
+  a1.sources.r1.port = 41414
+  a1.sources.r1.kerberos = true
+  a1.sources.r1.agent-principal = flume/server.example.org@EXAMPLE.ORG
+  a1.sources.r1.agent-keytab = /tmp/flume.keytab
+
+
+  a1.sinks.k1.channel = c1
+  a1.sinks.k1.type = logger
+
 Failover Client
 '''''''''''''''
 
@@ -459,20 +569,29 @@ full Agent. The following is an exhaustive list of configration options:
 
 Required properties are in **bold**.
 
-====================  ================  ==============================================
-Property Name         Default           Description
-====================  ================  ==============================================
-source.type           embedded          The only available source is the embedded source.
-**channel.type**      --                Either ``memory`` or ``file`` which correspond to MemoryChannel and FileChannel respectively.
-channel.*             --                Configuration options for the channel type requested, see MemoryChannel or FileChannel user guide for an exhaustive list.
-**sinks**             --                List of sink names
-**sink.type**         --                Property name must match a name in the list of sinks. Value must be ``avro``
-sink.*                --                Configuration options for the sink. See AvroSink user guide for an exhaustive list, however note AvroSink requires at least hostname and port.
-**processor.type**    --                Either ``failover`` or ``load_balance`` which correspond to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively.
-processor.*           --                Configuration options for the sink processor selected. See FailoverSinksProcessor and LoadBalancingSinkProcessor user guide for an exhaustive list.
-source.interceptors   --                Space-separated list of interceptors
-source.interceptors.* --                Configuration options for individual interceptors specified in the source.interceptors property
-====================  ================  ==============================================
+=====================  ================  ======================================================================
+Property Name          Default           Description
+=====================  ================  ======================================================================
+source.type            embedded          The only available source is the embedded source.
+**channel.type**       --                Either ``memory`` or ``file`` which correspond
+                                         to MemoryChannel and FileChannel respectively.
+channel.*              --                Configuration options for the channel type requested,
+                                         see MemoryChannel or FileChannel user guide for an exhaustive list.
+**sinks**              --                List of sink names
+**sink.type**          --                Property name must match a name in the list of sinks.
+                                         Value must be ``avro``
+sink.*                 --                Configuration options for the sink.
+                                         See AvroSink user guide for an exhaustive list,
+                                         however note AvroSink requires at least hostname and port.
+**processor.type**     --                Either ``failover`` or ``load_balance`` which correspond
+                                         to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively.
+processor.*            --                Configuration options for the sink processor selected.
+                                         See FailoverSinksProcessor and LoadBalancingSinkProcessor
+                                         user guide for an exhaustive list.
+source.interceptors    --                Space-separated list of interceptors
+source.interceptors.*  --                Configuration options for individual interceptors
+                                         specified in the source.interceptors property
+=====================  ================  ======================================================================
 
 Below is an example of how to use the agent: