Posted to dev@spark.apache.org by Dino <di...@spam4.me> on 2017/01/02 11:49:27 UTC

Cannot pass broker list parameter from Scala to Kafka: Property bootstrap.servers is not valid

I have spent a lot of time trying to figure out the following problem. I need
to consume messages from a topic on a remote Kafka cluster using Scala and
Spark. On the remote machine, Kafka listens on port `7072` rather than the
default `9092`. The remote machine has the following versions installed:

 1. Kafka 0.10.1.0  
 2. Scala 2.11

This means that I have to pass the broker list (with port `7072`) from
Scala to the remote Kafka, because otherwise the client will try to use the
default port.

The problem is that, according to the logs, the parameter `bootstrap.servers`
is not recognized. I also tried renaming this parameter to
`metadata.broker.list`, `broker.list` and `listeners`, but the same kind of
error appears in the logs every time, e.g. `Property bootstrap.servers is
not valid`, and then the default port `9092` is used (so, obviously, no
messages are consumed).

Using telnet, I checked that I have access to the remote Kafka from the EMR
machine on Amazon Cloud. I also checked that the name of the Kafka topic is
correct (I can consume messages from a terminal using `curl` and the REST
API; it is only in Scala that everything fails).
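
For reference, the telnet check described above can also be run from the Scala side, so that it is performed from the same JVM environment as the job. This is only a minimal sketch: the object name `BrokerReachability`, the method `canConnect` and the 3-second default timeout are illustrative assumptions, not part of Spark or Kafka.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper (not part of Spark or Kafka): a telnet-style TCP check.
object BrokerReachability {

  /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean =
    Try {
      val socket = new Socket()
      try socket.connect(new InetSocketAddress(host, port), timeoutMs)
      finally socket.close()
    }.isSuccess
}
```

It would be called as `BrokerReachability.canConnect("XXX.XX.XXX.XX", 7072)`. Like telnet, this only shows that the TCP port accepts connections; it does not exercise the Kafka protocol itself.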

In the POM file I use the following dependencies for Kafka and Spark:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

I use Scala 2.10, not 2.11.

This is my Scala code for Kafka consumer (*it works absolutely fine if I use
my own Kafka installed in Amazon Cloud where I have EMR machines (there I
have the port `9092` used for Kafka)*):

    val testTopicMap = testTopic.split(",").map((_, kafkaNumThreads.toInt)).toMap

    val kafkaParams = Map[String, String](
      "broker.list" -> "XXX.XX.XXX.XX:7072",
      "zookeeper.connect" -> "XXX.XX.XXX.XX:2181",
      "group.id" -> "test",
      "zookeeper.connection.timeout.ms" -> "10000",
      "auto.offset.reset" -> "smallest")

    val testEvents: DStream[String] =
      KafkaUtils
        .createStream[String, String, StringDecoder, StringDecoder](
          ssc,
          kafkaParams,
          testTopicMap,
          StorageLevel.MEMORY_AND_DISK_SER_2
        ).map(_._2)

I was reading the documentation
(https://kafka.apache.org/documentation/#brokerconfigs), and it looks like
everything I did is correct. Should I use a different Kafka client API (a
different Maven dependency)?

*UPDATE #1:*

I also tried the direct stream (without ZooKeeper), but it fails with the
error shown after the code:

    val testTopicMap = testTopic.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072",
      "bootstrap.servers" -> "XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072",
      "auto.offset.reset" -> "smallest")
    val testEvents = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, testTopicMap)
      .map(_._2)

    testEvents.print()

    17/01/02 12:23:15 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
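
To rule out a malformed broker string as the cause, the list passed to the direct stream could be parsed and rebuilt before use. The following is a rough sketch only: `BrokerList`, `Broker`, `parse` and `directStreamParams` are hypothetical names introduced for illustration, and the sketch keeps just one of the two broker keys used above (`metadata.broker.list`). It assumes plain `host:port` entries and does not handle IPv6 addresses.

```scala
// Hypothetical helpers for illustration; none of these names belong to Spark or Kafka.
object BrokerList {

  /** One broker endpoint taken from a "host:port" entry. */
  final case class Broker(host: String, port: Int)

  /** Parses "host1:port1,host2:port2" and fails fast on malformed entries. */
  def parse(brokers: String): Seq[Broker] =
    brokers.split(",").map(_.trim).filter(_.nonEmpty).map { entry =>
      entry.split(":") match {
        case Array(host, port)
            if host.nonEmpty && port.nonEmpty && port.forall(_.isDigit) =>
          Broker(host, port.toInt)
        case _ =>
          throw new IllegalArgumentException(s"Expected host:port, got '$entry'")
      }
    }.toSeq

  /** Builds a parameter map with the same keys and values as the direct-stream attempt above. */
  def directStreamParams(brokers: Seq[Broker]): Map[String, String] =
    Map(
      "metadata.broker.list" -> brokers.map(b => s"${b.host}:${b.port}").mkString(","),
      "auto.offset.reset" -> "smallest"
    )
}
```

The resulting map could then be passed as `kafkaParams` to `createDirectStream`. This only confirms that port `7072` really is in the string handed to the consumer; it does not explain the `EOFException` by itself.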


*UPDATE #2:*

One of the suggested solutions was to set the property
`advertised.host.name`, as instructed by the comments in the Kafka
configuration (`config/server.properties`). Do I understand correctly that
`config/server.properties` should be changed on the remote machine where
Kafka is installed?



ANY SUGGESTION WILL BE REALLY HIGHLY APPRECIATED.



