Posted to user@metron.apache.org by prakash r <rp...@gmail.com> on 2018/01/16 21:15:32 UTC

Metron Flux -- KafkaSpout configuration for broker url

Hello All,

How do I configure the Kafka broker URL for the Kafka spout (for
enrichment/indexing/parsers)?


In our cluster we have two Kafka listeners (PLAINTEXT and SASL_SSL). In
ZooKeeper, the PLAINTEXT port is registered as the broker's default port, as
shown below:



[zk: localhost:2181(CONNECTED) 0] get /brokers/ids/1001
{"jmx_port":-1,"timestamp":"1516102906311","endpoints":["SASL_SSL://cbro-test-kfk1.networks.in.xxxx.com:6667","PLAINTEXT://cbro-test-kfk1.networks.in.xxxx.com:6669"],"host":"cbro-test-kfk1.networks.in.xxxx.com","version":3,"port":6669,"rack":"/default-rack"}
cZxid = 0x144000411f3
ctime = Tue Jan 16 22:41:46 AEDT 2018
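
To make the mismatch easier to see, here is a minimal Python sketch that reads the broker registration shown above and picks out the endpoint for a given protocol. It assumes each entry in "endpoints" has the form PROTOCOL://host:port, as in that output. The function name endpoint_for is hypothetical, and the sketch only parses the znode text; it does not contact ZooKeeper and says nothing about how Metron itself chooses the broker address.

```python
import json

# Broker registration copied from the ZooKeeper output above.
ZNODE = (
    '{"jmx_port":-1,"timestamp":"1516102906311","endpoints":['
    '"SASL_SSL://cbro-test-kfk1.networks.in.xxxx.com:6667",'
    '"PLAINTEXT://cbro-test-kfk1.networks.in.xxxx.com:6669"],'
    '"host":"cbro-test-kfk1.networks.in.xxxx.com","version":3,'
    '"port":6669,"rack":"/default-rack"}'
)


def endpoint_for(znode_json, protocol):
    """Return (host, port) of the endpoint registered for `protocol`, or None.

    Hypothetical helper: assumes endpoints look like PROTOCOL://host:port.
    """
    info = json.loads(znode_json)
    for endpoint in info.get("endpoints", []):
        # Split by hand: "SASL_SSL" contains an underscore, which generic
        # URL parsers do not accept as part of a scheme.
        scheme, _, address = endpoint.partition("://")
        host, _, port = address.rpartition(":")
        if scheme.upper() == protocol.upper():
            return host, int(port)
    return None


if __name__ == "__main__":
    print("top-level port:", json.loads(ZNODE)["port"])
    print("SASL_SSL endpoint:", endpoint_for(ZNODE, "SASL_SSL"))
    print("PLAINTEXT endpoint:", endpoint_for(ZNODE, "PLAINTEXT"))
```

For this registration the top-level "host"/"port" pair points at the PLAINTEXT listener (6669), while the SASL_SSL listener is on 6667. A client that combines the top-level port with security.protocol=SASL_SSL would therefore be talking to the wrong listener.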


The Kafka spout logs show that it picks up the broker URL with port 6669
(the PLAINTEXT listener) but connects using the SASL_SSL protocol, so it is
unable to make the connection:


2018-01-17 06:53:10.169 o.a.s.k.s.KafkaSpout Thread-8-kafkaSpout-executor[8 8] [INFO] Kafka Spout opened with the following configuration:
KafkaSpoutConfig{kafkaProps={key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer,
security.protocol=SASL_SSL,
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer,
enable.auto.commit=false, group.id=enrichments,
bootstrap.servers=cbro-test-kfk2.networks.in.xxxx.com:6669,cbro-test-kfk1.networks.in.xxxx.com:6669},
key=org.apache.kafka.common.serialization.ByteArrayDeserializer@647c9ae3,
value=org.apache.kafka.common.serialization.ByteArrayDeserializer@298d392e,
pollTimeoutMs=200, offsetCommitPeriodMs=30000,
maxUncommittedOffsets=10000000,
firstPollOffsetStrategy=UNCOMMITTED_EARLIEST,
subscription=org.apache.storm.kafka.spout.NamedSubscription@1c447c96,
translator=org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder$SpoutRecordTranslator@78707eb5,
retryService=KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0,
timeUnit=SECONDS}, ratio=TimeInterval{length=2, timeUnit=MILLISECONDS},
maxRetries=2147483647, maxRetryDelay=TimeInterval{length=10,
timeUnit=SECONDS}}}


Is there any way to configure the broker URL in the Flux file? We want the
spout to use the SASL_SSL URL.

    # Any kafka props for the producer go here.
    -   id: "kafkaWriterProps"
        className: "java.util.HashMap"
        configMethods:
          -   name: "put"
              args:
                  - "security.protocol"
                  - "${kafka.security.protocol}"



Regards,
Prakash R