You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Kalpesh Jadhav <ka...@citiustech.com> on 2016/03/21 15:40:19 UTC

Kafka Java Producer with kerberos

Hi Team,

I have 3 node cluster on hdp 2.3, Kafka version : kafka 0.9.0
Previously I was able to send messages to kafka topic with older version of hdp i.e. 2.2, when kafka was not kerbersied.
But due to kerberisation I am not able to send message to kafka topic after upgradation from hdp 2.2 to 2.3 .

I am able to send messages through producer.sh through command line, but for that I have to do kinit manually.
Here also if I do kinit before running sample code, then also it works.
But in production I can't do kinit manually.

Is there any property I have to add in code or in jass file, which will automatically authorized to send messages to kafka topic.

I have followed this blog:
http://henning.kropponline.de/2015/11/15/kafka-security-with-kerberos/

Below is my sample code:

package com.ct.test.kafka;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {

                public static void main(String[] args) {
                                String principalName = "ctadmin";
                                String keyTabPath = "/etc/security/keytabs/ctadmin.keytab";
                                HscaleSecurityUtil.loginUserFromKeytab(principalName, keyTabPath);

                                System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
                                System.setProperty("java.security.auth.login.config", "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf");

                                System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
                                System.setProperty("sun.security.krb5.debug", "true");

                                String broker = "hscale-dev1-dn4:6667";

                                try {
                                                long events = Long.parseLong("3");
                                                Random rnd = new Random();

                                                Properties props = new Properties();
                                                System.out.println("After broker list- " + broker);

                                                props.put("metadata.broker.list", broker);
                                                props.put("serializer.class", "kafka.serializer.StringEncoder");
                                                props.put("request.required.acks", "1");
                                                props.put("security.protocol", "PLAINTEXTSASL");
                                                props.put("producer.type", "async");
                                                props.put("sasl.kerberos.service.name", "kafka");

                                                //props.put("partitioner.class", "com.ct.test.kafka.SimplePartitioner");


                                                System.out.println("After config prop -1");

                                                ProducerConfig config = new ProducerConfig(props);

                                                System.out.println("After config prop -2 config" + config);

                                                Producer<String, String> producer = new Producer<String, String>(config);

                                                System.out.println("After config prop -3");

                                                for (long nEvents = 0L; nEvents < events; nEvents += 1L) {
                                                                Date runtime = new Date();
                                                                String ip = "192.168.2" + rnd.nextInt(255);
                                                                String msg = runtime + " www.example.com, " + ip;
                                                                KeyedMessage data = new KeyedMessage("test_march4", ip, msg);

                                                                System.out.println("After config prop -1 data" + data);

                                                                producer.send(data);
                                                }
                                                producer.close();

                                } catch (Throwable th) {
                                                th.printStackTrace();

                                }
                }
}


I have posted same question on stack overflow as well.
http://stackoverflow.com/questions/35934578/kafka-java-producer-with-kerberos

Please let me know if any one need more details.

Thanks,
------------------------------------------------
Kalpesh Jadhav
Sr. Software Engineer | Development
CitiusTech Inc.
www.citiustech.com<http://citiustech.com/>