Posted to dev@flume.apache.org by "Roshan Naik (JIRA)" <ji...@apache.org> on 2015/10/02 01:18:26 UTC

[jira] [Commented] (FLUME-2792) Flume Kafka Kerberos Support

    [ https://issues.apache.org/jira/browse/FLUME-2792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14940582#comment-14940582 ] 

Roshan Naik commented on FLUME-2792:
------------------------------------

Here are some notes that I gathered from talking to Kafka experts. Give it a shot...


It seems like this might be possible for the Kafka Sink (i.e. the Kafka producer side). It won't work for the Kafka Source (i.e. the Kafka consumer side).
Below are the steps we identified:

1)  *In Flume's Kafka Sink config set:*
   agentName.sinks.KafkaSinkName.kafka.security.protocol=PLAINTEXTSASL
The sink will forward this setting to the underlying Kafka Producer APIs, which tells the Producer APIs to use Kerberos.
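For reference, a minimal sketch of a complete sink definition with that setting in place. The channel name, broker list and topic are placeholders, not values from this ticket:
   agentName.sinks.KafkaSinkName.type = org.apache.flume.sink.kafka.KafkaSink
   agentName.sinks.KafkaSinkName.channel = channelName
   agentName.sinks.KafkaSinkName.brokerList = broker1.example.com:6667
   agentName.sinks.KafkaSinkName.topic = flume-events
   agentName.sinks.KafkaSinkName.kafka.security.protocol=PLAINTEXTSASL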

2) *Pass the following JVM args to Flume:*
        -Djava.security.auth.login.config=/path/jaas.conf
This points the Producer APIs at the file that holds the additional security settings.
JVM args for Flume can be set in flume-env.sh, which resides in the directory specified by the -c argument to the Flume startup command. If the agent is Ambari managed, Ambari also lets you edit flume-env.sh directly, as far as I recall.
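As a sketch (the agent name and file names here are just examples), the property can be appended to JAVA_OPTS in flume-env.sh, and the conf directory containing that file passed to the startup command via -c:
   # conf/flume-env.sh (lives in the directory passed via -c)
   export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/jaas.conf"

   # example startup
   bin/flume-ng agent -c conf -f conf/flume.conf -n agentName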

3) *The jaas.conf file's contents should look like this:*
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/etc/security/keytabs/flume_agent.keytab"
  storeKey=true
  useTicketCache=false
  principal="flume_agent/host_name@EXAMPLE.COM"
  serviceName="kafka";
};
You need to customize the keytab, principal, and service name for your environment.
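As an optional sanity check (using the same keytab and principal as the example above), you can verify the keytab outside of Flume with:
   kinit -kt /etc/security/keytabs/flume_agent.keytab flume_agent/host_name@EXAMPLE.COM
   klist
If kinit fails, the jaas.conf settings will not work either.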

4) *Ensure the right Kafka libraries are used by Flume:*
  Kerberos support is being added in the upcoming Kafka v0.9. Just ensure flume/lib does not have conflicting Kafka jar versions.
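A quick way to spot conflicts (assuming a standard flume/lib layout) is to list the Kafka jars that ship with the agent:
   ls flume/lib | grep -i kafka
Any older kafka_* or kafka-clients jars found there would need to be replaced with the 0.9 client jars.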



> Flume Kafka Kerberos Support
> ----------------------------
>
>                 Key: FLUME-2792
>                 URL: https://issues.apache.org/jira/browse/FLUME-2792
>             Project: Flume
>          Issue Type: Bug
>          Components: Configuration, Docs, Sinks+Sources
>    Affects Versions: v1.6.0, v1.5.2
>         Environment: HDP 2.3 fully kerberized including Kafka 0.8.2.2 + Flume 1.5.2 or Apache Flume 1.6 downloaded from apache.org
>            Reporter: Hari Sekhon
>            Priority: Blocker
>
> Following on from FLUME-2790 it appears as though Flume doesn't yet have support for Kafka + Kerberos, as there is no setting documented in the Flume 1.6.0 user guide under the Kafka source section to tell Flume to use plaintextsasl as the connection mechanism to Kafka, and Kafka rejects the unauthenticated plaintext mechanism:
> {code}15/09/10 16:51:22 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1441903874830] Added fetcher for partitions ArrayBuffer()
> 15/09/10 16:51:22 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441903874763-abdc98ec-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> kafka.common.BrokerEndPointNotAvailableException: End point PLAINTEXT not found for broker 0
>         at kafka.cluster.Broker.getBrokerEndPoint(Broker.scala:140)
>         at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:124)
>         at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:124)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at kafka.utils.ZkUtils$.getAllBrokerEndPointsForChannel(ZkUtils.scala:124)
>         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60){code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)