Posted to user@spark.apache.org by Dominik Safaric <do...@gmail.com> on 2016/06/07 09:06:09 UTC

Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

As I am trying to integrate Kafka into Spark, the following exception occurs:

org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([<topicName>,0])
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
	at scala.util.Either.fold(Either.scala:97)
	at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
	at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
	at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
	at org.mediasoft.spark.Driver$.main(Driver.scala:42)
	at .<init>(<console>:11)
	at .<clinit>(<console>)
	at .<init>(<console>:7)
	at .<clinit>(<console>)
	at $print(<console>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
	at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
	at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
	at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
	at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
	at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
	at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
	at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
	at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
	at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
	at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
	at org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

As for the Spark configuration:

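    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf: SparkConf = new SparkConf().setAppName("AppName").setMaster("local[2]")

    val confParams: Map[String, String] = Map(
      "metadata.broker.list" -> "<IP_ADDRESS>:9092",
      "auto.offset.reset" -> "largest"
    )

    val topics: Set[String] = Set("<topic_name>")

    val context: StreamingContext = new StreamingContext(conf, Seconds(1))

    // Explicit key/value and decoder types; String keys and values are an assumption.
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      context, confParams, topics)

    // Print every record of each batch on the driver (for debugging only).
    kafkaStream.foreachRDD(rdd => {
      rdd.collect().foreach(println)
    })

    // Start the streaming job first, then block until it terminates.
    context.start()
    context.awaitTermination()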

The Kafka topic does exist, Kafka server is up and running and I am able to
produce messages to that particular topic using the Confluent REST API. 

What might the problem actually be? 




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Kafka-Integration-org-apache-spark-SparkException-Couldn-t-find-leader-offsets-for-Set-tp27103.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: RESOLVED - Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

Posted by Mich Talebzadeh <mi...@gmail.com>.
OK, so this was a Kafka issue?

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 7 June 2016 at 16:55, Dominik Safaric <do...@gmail.com> wrote:

> Dear all,
>
> I managed to resolve the issue. Since I kept getting the exception
> "org.apache.spark.SparkException: java.nio.channels.ClosedChannelException",
> a reasonable direction was to check the advertised.host.name key, which,
> as I've read in the docs, sets the host name that the broker advertises
> to consumers and producers.
>
> By setting this property, I instantly started receiving Kafka log messages.
>
> Nevertheless, thank you all for your help, I appreciate it!
>
> On 07 Jun 2016, at 17:44, Dominik Safaric <do...@gmail.com>
> wrote:
>
> Dear Todd,
>
> Running
>
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic <topic_name> --broker-list localhost:9092 --time -1
>
> returns the following current offset for <topic_name>:
>
> <topic_name>:0:1760
>
> But I guess this does not provide much information.
>
> To answer your other question about how I track the offsets: implicitly
> via Spark Streaming, i.e. using the default checkpoints.
>
> On 07 Jun 2016, at 15:46, Todd Nist <ts...@gmail.com> wrote:
>
> Hi Dominik,
>
> Right, and Spark 1.6.x uses Kafka v0.8.2.x as I recall.  However, it
> appears as though the v0.8 consumer is compatible with the Kafka v0.9.x
> broker, but not the other way around; sorry for the confusion there.
>
> With the direct stream (simple consumer), offsets are tracked by Spark
> Streaming within its checkpoints by default.  You can also manage them
> yourself if desired.  How are you dealing with offsets?
>
> Can you verify the offsets on the broker:
>
> kafka-run-class.sh kafka.tools.GetOffsetShell --topic <TOPIC>
> --broker-list <BROKER-IP:PORT> --time -1
>
> -Todd
>
> On Tue, Jun 7, 2016 at 8:17 AM, Dominik Safaric <do...@gmail.com>
> wrote:
>
>> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0"
>> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0"
>> libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1"
>>
>> Please take a look at the SBT copy.
>>
>> I would rather think that the problem is related to the Zookeeper/Kafka
>> consumers.
>>
>> [2016-06-07 11:24:52,484] WARN Either no config or no quorum defined in
>> config, running  in standalone mode
>> (org.apache.zookeeper.server.quorum.QuorumPeerMain)
>>
>> Any indication as to why the channel connection might be closed? Would it
>> be Kafka or Zookeeper related?
>>
>> On 07 Jun 2016, at 14:07, Todd Nist <ts...@gmail.com> wrote:
>>
>> What version of Spark are you using?  I do not believe that 1.6.x is
>> compatible with 0.9.0.1 due to changes in the Kafka clients between 0.8.2.2
>> and 0.9.0.x.  See this for more information:
>>
>> https://issues.apache.org/jira/browse/SPARK-12177
>>
>> -Todd
>>
>> On Tue, Jun 7, 2016 at 7:35 AM, Dominik Safaric <dominiksafaric@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Correct, I am using the 0.9.0.1 version.
>>>
>>> As already described, the topic contains messages. Those messages are
>>> produced using the Confluent REST API.
>>>
>>> However, what I've observed is that the problem is not in the Spark
>>> configuration, but is rather Zookeeper or Kafka related.
>>>
>>> Take a look at the top of the exception's stack trace:
>>>
>>> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
>>> org.apache.spark.SparkException: Couldn't find leader offsets for Set([<topicname>,0])
>>>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>     at scala.util.Either.fold(Either.scala:97)
>>>     at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>>     at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>>>     at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>>     at org.mediasoft.spark.Driver$.main(Driver.scala:22)
>>>     at .<init>(<console>:11)
>>>     at .<clinit>(<console>)
>>>     at .<init>(<console>:7)
>>>
>>> By listing all active connections using netstat, I've also observed that
>>> both Zookeeper and Kafka are running: Zookeeper on port 2181 and Kafka
>>> on port 9092.
>>>
>>> Furthermore, I am also able to retrieve all log messages using the
>>> console consumer.
>>>
>>> Any clue what might be going wrong?
>>>
>>> On 07 Jun 2016, at 13:13, Jacek Laskowski <ja...@japila.pl> wrote:
>>>
>>> Hi,
>>>
>>> What's the version of Spark? You're using Kafka 0.9.0.1, ain't you?
>>> What's the topic name?
>>>
>>> Jacek

RESOLVED - Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

Posted by Dominik Safaric <do...@gmail.com>.
Dear all,

I managed to resolve the issue. Since I kept getting the exception "org.apache.spark.SparkException: java.nio.channels.ClosedChannelException", a reasonable direction was to check the advertised.host.name key, which, as I've read in the docs, sets the host name that the broker advertises to consumers and producers.

By setting this property, I instantly started receiving Kafka log messages.

Nevertheless, thank you all for your help, I appreciate it! 
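
A note on why this fix plausibly helps: as far as I understand, the client uses metadata.broker.list only to bootstrap, and then connects to whatever host and port the broker advertises in its metadata. If that advertised address cannot be reached from the machine running the Spark driver, the connection fails and the leader offsets cannot be fetched. Below is a minimal Scala sketch for checking reachability from the driver machine. The names BrokerCheck, parseBrokerList and canConnect are hypothetical helpers written for this illustration; they are not part of Spark or Kafka.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper, not a Spark or Kafka API.
object BrokerCheck {

  // Split a "host1:port1,host2:port2" string, the format used by metadata.broker.list.
  def parseBrokerList(brokers: String): Seq[(String, Int)] =
    brokers.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { entry =>
      val idx = entry.lastIndexOf(':')
      require(idx > 0, s"expected host:port but got '$entry'")
      (entry.substring(0, idx), entry.substring(idx + 1).toInt)
    }

  // Attempt a plain TCP connection; true if the address accepts it within the timeout.
  def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean =
    Try {
      val socket = new Socket()
      try socket.connect(new InetSocketAddress(host, port), timeoutMs)
      finally socket.close()
    }.isSuccess
}
```

Running canConnect against both the address given in metadata.broker.list and the host name the broker advertises should show whether the two differ in reachability. A successful TCP connection does not prove the broker is healthy, only that the address is reachable.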


Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

Posted by Dominik Safaric <do...@gmail.com>.
Dear Todd,

Running

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic <topic_name> --broker-list localhost:9092 --time -1

returns the following current offset for <topic_name>:

<topic_name>:0:1760

But I guess this does not provide much information.

To answer your other question about how I track the offsets: implicitly via Spark Streaming, i.e. using the default checkpoints.
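
The GetOffsetShell output does carry some information. Each line has the shape topic:partition:offset, and with --time -1 the offset is the latest one, so the broker reached via localhost:9092 answered for partition 0 with a latest offset of 1760. Note that this check went through localhost, while the Spark configuration uses <IP_ADDRESS>. A minimal Scala sketch for reading such a line follows; OffsetShellOutput, PartitionOffset and parseLine are hypothetical names for this illustration, and "events" in the usage note is a made-up topic name.

```scala
import scala.util.Try

// Hypothetical helper for reading GetOffsetShell output; not a Kafka API.
object OffsetShellOutput {

  // One output line in the form "<topic>:<partition>:<offset>".
  case class PartitionOffset(topic: String, partition: Int, offset: Long)

  // Parse a single line; returns None if the line has a different shape.
  def parseLine(line: String): Option[PartitionOffset] =
    line.trim.split(":") match {
      case Array(topic, partition, offset) =>
        Try(PartitionOffset(topic, partition.toInt, offset.toLong)).toOption
      case _ => None
    }
}
```

For instance, OffsetShellOutput.parseLine("events:0:1760") yields a PartitionOffset for topic "events", partition 0, offset 1760, while a line that does not match the three-field shape yields None.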

> On 07 Jun 2016, at 15:46, Todd Nist <ts...@gmail.com> wrote:
> 
> Hi Dominik,
> 
> Right, and spark 1.6.x uses Kafka v0.8.2.x as I recall.  However, it appears as though the v.0.8 consumer is compatible with the Kafka v0.9.x broker, but not the other way around; sorry for the confusion there.
> 
> With the direct stream, simple consumer, offsets are tracked by Spark Streaming within its checkpoints by default.  You can also manage them yourself if desired.  How are you dealing with offsets ?
> 
> Can you verify the offsets on the broker:
> 
> kafka-run-class.sh kafka.tools.GetOffsetShell --topic <TOPIC> --broker-list <BROKER-IP:PORT> --time -1
> 
> -Todd
> 
> On Tue, Jun 7, 2016 at 8:17 AM, Dominik Safaric <dominiksafaric@gmail.com <ma...@gmail.com>> wrote:
> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0"
> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0"
> libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1"
> Please take a look at the SBT copy. 
> 
> I would rather think that the problem is related to the Zookeeper/Kafka consumers. 
> 
> [2016-06-07 11:24:52,484] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
> 
> Any indication onto why the channel connection might be closed? Would it be Kafka or Zookeeper related? 
> 
>> On 07 Jun 2016, at 14:07, Todd Nist <tsindotg@gmail.com <ma...@gmail.com>> wrote:
>> 
>> What version of Spark are you using?  I do not believe that 1.6.x is compatible with 0.9.0.1 due to changes in the kafka clients between 0.8.2.2 and 0.9.0.x.  See this for more information:
>> 
>> https://issues.apache.org/jira/browse/SPARK-12177 <https://issues.apache.org/jira/browse/SPARK-12177>
>> 
>> -Todd
>> 
>> On Tue, Jun 7, 2016 at 7:35 AM, Dominik Safaric <dominiksafaric@gmail.com <ma...@gmail.com>> wrote:
>> Hi,
>> 
>> Correct, I am using the 0.9.0.1 version. 
>> 
>> As already described, the topic contains messages. Those messages are produced using the Confluence REST API.
>> 
>> However, what I’ve observed is that the problem is not in the Spark configuration, but rather Zookeeper or Kafka related. 
>> 
>> Take a look at the exception’s stack top item:
>> 
>> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
>> org.apache.spark.SparkException: Couldn't find leader offsets for Set([<topicname>,0])
>> 	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>> 	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>> 	at scala.util.Either.fold(Either.scala:97)
>> 	at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>> 	at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>> 	at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>> 	at org.mediasoft.spark.Driver$.main(Driver.scala:22)
>> 	at .<init>(<console>:11)
>> 	at .<clinit>(<console>)
>> 	at .<init>(<console>:7)
>> 
>> By listing all active connections using netstat, I’ve also observed that both Zookeper and Kafka are running. Zookeeper on port 2181, while Kafka 9092. 
>> 
>> Furthermore, I am also able to retrieve all log messages using the console consumer.
>> 
>> Any clue what might be going wrong?
>> 
>>> On 07 Jun 2016, at 13:13, Jacek Laskowski <jacek@japila.pl <ma...@japila.pl>> wrote:
>>> 
>>> Hi,
>>> 
>>> What's the version of Spark? You're using Kafka 0.9.0.1, ain't you? What's the topic name?
>>> 
>>> Jacek
>>> 
>>> On 7 Jun 2016 11:06 a.m., "Dominik Safaric" <dominiksafaric@gmail.com <ma...@gmail.com>> wrote:
>>> As I am trying to integrate Kafka into Spark, the following exception occurs:
>>> 
>>> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([*<topicName>*,0])
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>         at scala.util.Either.fold(Either.scala:97)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>>         at org.mediasoft.spark.Driver$.main(Driver.scala:42)
>>>         at .<init>(<console>:11)
>>>         at .<clinit>(<console>)
>>>         at .<init>(<console>:7)
>>>         at .<clinit>(<console>)
>>>         at $print(<console>)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>         at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:483)
>>>         at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>>>         at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>>>         at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>>>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>>>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>>>         at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>>>         at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>>>         at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>>>         at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>>>         at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>>>         at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>>>         at
>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>>>         at
>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>>         at
>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>>         at
>>> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>>>         at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>>>         at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
>>>         at
>>> org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>         at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:483)
>>>         at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>>> 
>>> As for the Spark configuration:
>>> 
>>>    val conf: SparkConf = new
>>> SparkConf().setAppName("AppName").setMaster("local[2]")
>>> 
>>>     val confParams: Map[String, String] = Map(
>>>       "metadata.broker.list" -> "<IP_ADDRESS>:9092",
>>>       "auto.offset.reset" -> "largest"
>>>     )
>>> 
>>>     val topics: Set[String] = Set("<topic_name>")
>>> 
>>>     val context: StreamingContext = new StreamingContext(conf, Seconds(1))
>>>     val kafkaStream = KafkaUtils.createDirectStream(context,confParams,
>>> topics)
>>> 
>>>     kafkaStream.foreachRDD(rdd => {
>>>       rdd.collect().foreach(println)
>>>     })
>>> 
>>>     context.awaitTermination()
>>>     context.start()
>>> 
>>> The Kafka topic does exist, Kafka server is up and running and I am able to
>>> produce messages to that particular topic using the Confluent REST API.
>>> 
>>> What might the problem actually be?


Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

Posted by Todd Nist <ts...@gmail.com>.
Hi Dominik,

Right, and Spark 1.6.x uses Kafka v0.8.2.x as I recall.  However, it
appears as though the v0.8 consumer is compatible with the Kafka v0.9.x
broker, but not the other way around; sorry for the confusion there.

With the direct stream, which uses the simple consumer, offsets are
tracked by Spark Streaming within its checkpoints by default.  You can
also manage them yourself if desired.  How are you dealing with offsets?

Can you verify the offsets on the broker:

kafka-run-class.sh kafka.tools.GetOffsetShell --topic <TOPIC> --broker-list <BROKER-IP:PORT> --time -1
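
For comparison, the offsets Spark itself sees can be printed per batch.
What follows is only a minimal sketch, assuming the
spark-streaming-kafka 1.6.x artifact (the Kafka 0.8 direct API) is on
the classpath. <BROKER-IP:PORT> and <TOPIC> are placeholders, the object
name is made up for illustration, and String keys and values are an
assumption about the topic's contents. It prints the offset range of
each batch and does not persist anything.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Hypothetical object name, for illustration only.
object OffsetInspection {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("OffsetInspection").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Placeholders: substitute the real broker address and topic name.
    val kafkaParams = Map(
      "metadata.broker.list" -> "<BROKER-IP:PORT>",
      "auto.offset.reset" -> "largest"
    )
    val topics = Set("<TOPIC>")

    // Assumption: keys and values are plain strings.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      // The cast only works on the RDD handed out directly by the direct stream.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { r =>
        println(s"${r.topic} partition ${r.partition}: ${r.fromOffset} until ${r.untilOffset}")
      }
    }

    // start() has to come before awaitTermination(), which blocks the caller.
    ssc.start()
    ssc.awaitTermination()
  }
}
```

If the printed ranges never advance while GetOffsetShell reports growing
offsets, that would point at the consuming side rather than the broker.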

-Todd

On Tue, Jun 7, 2016 at 8:17 AM, Dominik Safaric <do...@gmail.com>
wrote:

> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0"
> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0"
> libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1"
>
> Please take a look at the SBT copy.
>
> I would rather think that the problem is related to the Zookeeper/Kafka
> consumers.
>
> [2016-06-07 11:24:52,484] WARN Either no config or no quorum defined in
> config, running  in standalone mode
> (org.apache.zookeeper.server.quorum.QuorumPeerMain)
>
> Any indication as to why the channel connection might be closed? Would it
> be Kafka or Zookeeper related?

Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

Posted by Dominik Safaric <do...@gmail.com>.
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1"
Please take a look at the SBT dependencies above.
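
Note that the three artifacts above are not all on the same release
(1.6.0 for core and streaming, 1.6.1 for the Kafka integration). Whether
that has anything to do with the exception is not established in this
thread, but it is cheap to rule out. A sketch of a build.sbt fragment
that pins all of them to one value; the name sparkVersion and the choice
of 1.6.1 are assumptions made for illustration:

```scala
// build.sbt (sketch): one version value shared by all Spark artifacts
val sparkVersion = "1.6.1" // assumption: any single 1.6.x release would do

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % sparkVersion,
  "org.apache.spark" % "spark-streaming_2.10" % sparkVersion,
  "org.apache.spark" % "spark-streaming-kafka_2.10" % sparkVersion
)
```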

I would rather think that the problem is related to the Zookeeper/Kafka consumers. 

[2016-06-07 11:24:52,484] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)

Any indication as to why the channel connection might be closed? Would it be Kafka or Zookeeper related?

> On 07 Jun 2016, at 14:07, Todd Nist <ts...@gmail.com> wrote:
> 
> What version of Spark are you using?  I do not believe that 1.6.x is compatible with 0.9.0.1 due to changes in the kafka clients between 0.8.2.2 and 0.9.0.x.  See this for more information:
> 
> https://issues.apache.org/jira/browse/SPARK-12177
> 
> -Todd


Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

Posted by Todd Nist <ts...@gmail.com>.
What version of Spark are you using?  I do not believe that Spark 1.6.x
is compatible with Kafka 0.9.0.1 due to changes in the Kafka clients
between 0.8.2.2 and 0.9.0.x.  See this for more information:

https://issues.apache.org/jira/browse/SPARK-12177

-Todd

On Tue, Jun 7, 2016 at 7:35 AM, Dominik Safaric <do...@gmail.com>
wrote:

> Hi,
>
> Correct, I am using the 0.9.0.1 version.
>
> As already described, the topic contains messages. Those messages are
> produced using the Confluent REST API.
>
> However, what I’ve observed is that the problem is not in the Spark
> configuration, but rather Zookeeper or Kafka related.
>
> Take a look at the exception’s stack top item:
>
> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([<topicname>,0])
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at scala.util.Either.fold(Either.scala:97)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
> at org.mediasoft.spark.Driver$.main(Driver.scala:22)
> at .<init>(<console>:11)
> at .<clinit>(<console>)
> at .<init>(<console>:7)
>
> By listing all active connections using netstat, I’ve also observed that
> both Zookeeper and Kafka are running: Zookeeper on port 2181 and Kafka on
> port 9092.
>
> Furthermore, I am also able to retrieve all log messages using the console
> consumer.
>
> Any clue what might be going wrong?

Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

Posted by Dominik Safaric <do...@gmail.com>.
Hi,

Correct, I am using the 0.9.0.1 version. 

As already described, the topic contains messages. Those messages are produced using the Confluent REST API.

However, from what I’ve observed, the problem seems to lie not in the Spark configuration but rather in Zookeeper or Kafka.

Take a look at the top of the exception’s stack trace:

org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([<topicname>,0])
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
	at scala.util.Either.fold(Either.scala:97)
	at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
	at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
	at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
	at org.mediasoft.spark.Driver$.main(Driver.scala:22)
	at .<init>(<console>:11)
	at .<clinit>(<console>)
	at .<init>(<console>:7)

By listing all active connections using netstat, I’ve also confirmed that both Zookeeper and Kafka are running: Zookeeper on port 2181 and Kafka on port 9092.

Furthermore, I am also able to retrieve all log messages using the console consumer.
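
A listening port on the broker host does not by itself show that the machine running the Spark driver can open a connection to it. The following is a minimal sketch of such a check in plain Scala, to be run from the driver machine. The object and method names are made up for illustration, and the host and port are taken from the command line. A successful connect only demonstrates TCP reachability; Kafka 0.8 clients subsequently connect to whatever host and port the broker returns in its metadata, which this sketch does not inspect.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper, for illustration only.
object BrokerReachability {

  /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
  def canConnect(host: String, port: Int, timeoutMs: Int): Boolean = {
    val socket = new Socket()
    try {
      // Any failure (refused, timed out, unknown host) is reported as false.
      Try(socket.connect(new InetSocketAddress(host, port), timeoutMs)).isSuccess
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Expected arguments: the broker host and port from metadata.broker.list.
    val host = args(0)
    val port = args(1).toInt
    println(s"$host:$port reachable: ${canConnect(host, port, 2000)}")
  }
}
```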

Any clue what might be going wrong?

> On 07 Jun 2016, at 13:13, Jacek Laskowski <ja...@japila.pl> wrote:
> 
> Hi,
> 
> What's the version of Spark? You're using Kafka 0.9.0.1, ain't you? What's the topic name?
> 
> Jacek


Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi,

Which version of Spark are you using? You're on Kafka 0.9.0.1, aren't you?
And what's the topic name?

Jacek

Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

Posted by Mich Talebzadeh <mi...@gmail.com>.
For now you can set Spark aside and look at the Kafka publishing side
first.

Also check that ZooKeeper is running:

jps
*17102* QuorumPeerMain

It runs on the default port 2181:

netstat -plten|grep 2181
tcp        0      0 :::2181                     :::*                        LISTEN      1005       8765628    *17102*/java
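
A similar check can be run from the machine that hosts the Spark driver,
since a port that is listening locally may still be unreachable from there.
What follows is only a minimal sketch, assuming plain TCP connectivity is
all we want to test. PortCheck, parseHostPorts and isReachable are names
made up for this illustration; they are not part of Spark or Kafka.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper, not part of Spark or Kafka.
object PortCheck {

  // Split a "host1:port1,host2:port2" string (the format used by
  // metadata.broker.list) into (host, port) pairs.
  def parseHostPorts(list: String): Seq[(String, Int)] =
    list.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { entry =>
      val idx = entry.lastIndexOf(':')
      require(idx > 0, s"expected host:port, got '$entry'")
      (entry.substring(0, idx), entry.substring(idx + 1).toInt)
    }

  // True if a TCP connection to host:port can be opened within timeoutMs.
  // Unresolvable hosts, refused connections and timeouts all yield false.
  def isReachable(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false
    } finally {
      socket.close()
    }
  }
}
```

It could be called with the same string that is passed as
metadata.broker.list, for instance
PortCheck.parseHostPorts("<IP_ADDRESS>:9092").foreach { case (h, p) =>
println(s"$h:$p reachable: ${PortCheck.isReachable(h, p)}") }.
A successful connection only shows that the port is open; it says nothing
about the hostname the broker advertises to its clients.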

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 7 June 2016 at 11:59, Dominik Safaric <do...@gmail.com> wrote:

> Sounds like the issue is with the Kafka channel: it is closing.
>
>
> I came to the same conclusion. I have even tried refining the
> configuration files further:
>
> Zookeeper properties:
>
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #    http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> # the directory where the snapshot is stored.
> dataDir=/tmp/zookeeper
> # the port at which the clients will connect
> clientPort=2181
> # disable the per-ip limit on the number of connections since this is a non-production config
> maxClientCnxns=20
>
> Kafka server properties:
>
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #    http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> # see kafka.server.KafkaConfig for additional details and defaults
>
> ############################# Server Basics #############################
>
> # The id of the broker. This must be set to a unique integer for each broker.
> broker.id=1
>
> ############################# Socket Server Settings #############################
>
> listeners=PLAINTEXT://:9092
>
> # The port the socket server listens on
> #port=9092
>
> # Hostname the broker will bind to. If not set, the server will bind to all interfaces
> host.name=0.0.0.0
>
> # Hostname the broker will advertise to producers and consumers. If not set, it uses the
> # value for "host.name" if configured.  Otherwise, it will use the value returned from
> # java.net.InetAddress.getCanonicalHostName().
> #advertised.host.name=<hostname routable by clients>
>
> # The port to publish to ZooKeeper for clients to use. If this is not set,
> # it will publish the same port that the broker binds to.
> #advertised.port=<port accessible by clients>
>
> # The number of threads handling network requests
> num.network.threads=3
>
> # The number of threads doing disk I/O
> num.io.threads=8
>
> # The send buffer (SO_SNDBUF) used by the socket server
> socket.send.buffer.bytes=102400
>
> # The receive buffer (SO_RCVBUF) used by the socket server
> socket.receive.buffer.bytes=102400
>
> # The maximum size of a request that the socket server will accept (protection against OOM)
> socket.request.max.bytes=104857600
>
>
> ############################# Log Basics #############################
>
> # A comma separated list of directories under which to store log files
> log.dirs=/tmp/kafka-logs
>
> # The default number of log partitions per topic. More partitions allow greater
> # parallelism for consumption, but this will also result in more files across
> # the brokers.
> num.partitions=1
>
> # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
> # This value is recommended to be increased for installations with data dirs located in RAID array.
> num.recovery.threads.per.data.dir=1
>
> ############################# Log Flush Policy #############################
>
> # Messages are immediately written to the filesystem but by default we only fsync() to sync
> # the OS cache lazily. The following configurations control the flush of data to disk.
> # There are a few important trade-offs here:
> #    1. Durability: Unflushed data may be lost if you are not using replication.
> #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
> #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
> # The settings below allow one to configure the flush policy to flush data after a period of time or
> # every N messages (or both). This can be done globally and overridden on a per-topic basis.
>
> # The number of messages to accept before forcing a flush of data to disk
> #log.flush.interval.messages=10000
>
> # The maximum amount of time a message can sit in a log before we force a flush
> #log.flush.interval.ms=1000
>
> ############################# Log Retention Policy #############################
>
> # The following configurations control the disposal of log segments. The policy can
> # be set to delete segments after a period of time, or after a given size has accumulated.
> # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
> # from the end of the log.
>
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=168
>
> # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
> # segments don't drop below log.retention.bytes.
> #log.retention.bytes=1073741824
>
> # The maximum size of a log segment file. When this size is reached a new log segment will be created.
> log.segment.bytes=1073741824
>
> # The interval at which log segments are checked to see if they can be deleted according
> # to the retention policies
> log.retention.check.interval.ms=300000
>
> ############################# Zookeeper #############################
>
> # Zookeeper connection string (see zookeeper docs for details).
> # This is a comma separated host:port pairs, each corresponding to a zk
> # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
> # You can also append an optional chroot string to the urls to specify the
> # root directory for all kafka znodes.
> zookeeper.connect=localhost:2181
>
> # Timeout in ms for connecting to zookeeper
> zookeeper.connection.timeout.ms=6000
>
>
> Kafka consumer properties:
>
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #    http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> # see kafka.consumer.ConsumerConfig for more details
>
> # Zookeeper connection string
> # comma separated host:port pairs, each corresponding to a zk
> # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
> zookeeper.connect=127.0.0.1:2181
>
> # timeout in ms for connecting to zookeeper
> zookeeper.connection.timeout.ms=6000
>
> #consumer group id
> group.id=test-consumer-group
>
> #consumer timeout
> #consumer.timeout.ms=5000
>
>
> For proof-of-concept purposes, this basic configuration should be enough
> to consume messages from Kafka. Or perhaps not?
>
> On 07 Jun 2016, at 12:44, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
> Sounds like the issue is with the Kafka channel: it is closing.
>
>  Reconnect due to socket error: java.nio.channels.ClosedChannelException
>
> Can you relax the batch interval, for example:
>
> val ssc = new StreamingContext(sparkConf, Seconds(20))
>
> Also, how are you getting your source data? You can run both Spark and the
> console consumer below at the same time to see the exact cause:
>
> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
> --from-beginning --topic newtopic
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 7 June 2016 at 11:32, Dominik Safaric <do...@gmail.com> wrote:
>
>> Unfortunately, even with this Spark configuration and Kafka parameters,
>> the same exception keeps occurring:
>>
>> 16/06/07 12:26:11 INFO SimpleConsumer: Reconnect due to socket error:
>> java.nio.channels.ClosedChannelException
>> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([<topicname>,0])
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>> at scala.util.Either.fold(Either.scala:97)
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>>
>> If it helps for troubleshooting, here are the logs of the Kafka server:
>>
>> [2016-06-07 10:24:58,349] INFO Initiating client connection,
>> connectString=localhost:2181 sessionTimeout=6000
>> watcher=org.I0Itec.zkclient.ZkClient@4e05faa7
>> (org.apache.zookeeper.ZooKeeper)
>> [2016-06-07 10:24:58,365] INFO Opening socket connection to server
>> localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL
>> (unknown error) (org.apache.zookeeper.ClientCnxn)
>> [2016-06-07 10:24:58,365] INFO Waiting for keeper state SyncConnected
>> (org.I0Itec.zkclient.ZkClient)
>> [2016-06-07 10:24:58,375] INFO Socket connection established to localhost/
>> 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
>> [2016-06-07 10:24:58,405] INFO Session establishment complete on server
>> localhost/127.0.0.1:2181, sessionid = 0x1552a64a9a80000, negotiated
>> timeout = 6000 (org.apache.zookeeper.ClientCnxn)
>> [2016-06-07 10:24:58,408] INFO zookeeper state changed (SyncConnected)
>> (org.I0Itec.zkclient.ZkClient)
>> [2016-06-07 10:24:58,562] INFO Loading logs. (kafka.log.LogManager)
>> [2016-06-07 10:24:58,608] INFO Completed load of log <topic_name>-0 with
>> log end offset 15 (kafka.log.Log)
>> [2016-06-07 10:24:58,614] INFO Completed load of log _schemas-0 with log
>> end offset 1 (kafka.log.Log)
>> [2016-06-07 10:24:58,617] INFO Completed load of log <topic_name>-0 with
>> log end offset 5 (kafka.log.Log)
>> [2016-06-07 10:24:58,620] INFO Completed load of log <topic_name>-0 with
>> log end offset 2 (kafka.log.Log)
>> [2016-06-07 10:24:58,629] INFO Completed load of log <topic_name>-0 with
>> log end offset 1759 (kafka.log.Log)
>> [2016-06-07 10:24:58,635] INFO Logs loading complete.
>> (kafka.log.LogManager)
>> [2016-06-07 10:24:58,737] INFO Starting log cleanup with a period of
>> 300000 ms. (kafka.log.LogManager)
>> [2016-06-07 10:24:58,739] INFO Starting log flusher with a default period
>> of 9223372036854775807 ms. (kafka.log.LogManager)
>> [2016-06-07 10:24:58,798] INFO Awaiting socket connections on
>> 0.0.0.0:9092. (kafka.network.Acceptor)
>> [2016-06-07 10:24:58,809] INFO [Socket Server on Broker 1], Started 1
>> acceptor threads (kafka.network.SocketServer)
>> [2016-06-07 10:24:58,849] INFO [ExpirationReaper-1], Starting
>> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
>> [2016-06-07 10:24:58,850] INFO [ExpirationReaper-1], Starting
>> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
>> [2016-06-07 10:24:58,953] INFO Creating /controller (is it secure? false)
>> (kafka.utils.ZKCheckedEphemeral)
>> [2016-06-07 10:24:58,973] INFO Result of znode creation is: OK
>> (kafka.utils.ZKCheckedEphemeral)
>> [2016-06-07 10:24:58,974] INFO 1 successfully elected as leader
>> (kafka.server.ZookeeperLeaderElector)
>> [2016-06-07 10:24:59,180] INFO [GroupCoordinator 1]: Starting up.
>> (kafka.coordinator.GroupCoordinator)
>> [2016-06-07 10:24:59,191] INFO [ExpirationReaper-1], Starting
>> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
>> [2016-06-07 10:24:59,194] INFO New leader is 1
>> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
>> [2016-06-07 10:24:59,198] INFO [Group Metadata Manager on Broker 1]:
>> Removed 0 expired offsets in 16 milliseconds.
>> (kafka.coordinator.GroupMetadataManager)
>> [2016-06-07 10:24:59,195] INFO [ExpirationReaper-1], Starting
>> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
>> [2016-06-07 10:24:59,195] INFO [GroupCoordinator 1]: Startup complete.
>> (kafka.coordinator.GroupCoordinator)
>> [2016-06-07 10:24:59,215] INFO [ThrottledRequestReaper-Produce],
>> Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
>> [2016-06-07 10:24:59,217] INFO [ThrottledRequestReaper-Fetch], Starting
>> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
>> [2016-06-07 10:24:59,220] INFO Will not load MX4J, mx4j-tools.jar is not
>> in the classpath (kafka.utils.Mx4jLoader$)
>> [2016-06-07 10:24:59,230] INFO Creating /brokers/ids/1 (is it secure?
>> false) (kafka.utils.ZKCheckedEphemeral)
>> [2016-06-07 10:24:59,244] INFO Result of znode creation is: OK
>> (kafka.utils.ZKCheckedEphemeral)
>> [2016-06-07 10:24:59,245] INFO Registered broker 1 at path /brokers/ids/1
>> with addresses: PLAINTEXT -> EndPoint(<public_DNS>,9092,PLAINTEXT)
>> (kafka.utils.ZkUtils)
>> [2016-06-07 10:24:59,257] INFO Kafka version : 0.9.0.1
>> (org.apache.kafka.common.utils.AppInfoParser)
>> [2016-06-07 10:24:59,257] INFO Kafka commitId : 23c69d62a0cabf06
>> (org.apache.kafka.common.utils.AppInfoParser)
>> [2016-06-07 10:24:59,258] INFO [Kafka Server 1], started
>> (kafka.server.KafkaServer)
>> [2016-06-07 10:24:59,648] INFO [ReplicaFetcherManager on broker 1]
>> Removed fetcher for partitions [<topic_name>,0]
>> (kafka.server.ReplicaFetcherManager)
>> [2016-06-07 10:24:59,682] INFO [ReplicaFetcherManager on broker 1]
>> Removed fetcher for partitions [<topic_name>,0]
>> (kafka.server.ReplicaFetcherManager)
>>
>> Whereas Zookeeper produced the following logs:
>>
>> [2016-06-07 10:24:47,935] INFO Server
>> environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
>> (org.apache.zookeeper.server.ZooKeeperServer)
>> [2016-06-07 10:24:47,935] INFO Server environment:java.io.tmpdir=/tmp
>> (org.apache.zookeeper.server.ZooKeeperServer)
>> [2016-06-07 10:24:47,935] INFO Server environment:java.compiler=<NA>
>> (org.apache.zookeeper.server.ZooKeeperServer)
>> [2016-06-07 10:24:47,936] INFO Server environment:os.name=Linux
>> (org.apache.zookeeper.server.ZooKeeperServer)
>> [2016-06-07 10:24:47,936] INFO Server environment:os.arch=amd64
>> (org.apache.zookeeper.server.ZooKeeperServer)
>> [2016-06-07 10:24:47,939] INFO Server
>> environment:os.version=4.4.11-23.53.amzn1.x86_64
>> (org.apache.zookeeper.server.ZooKeeperServer)
>> [2016-06-07 10:24:47,939] INFO Server environment:user.name=ec2-user
>> (org.apache.zookeeper.server.ZooKeeperServer)
>> [2016-06-07 10:24:47,939] INFO Server
>> environment:user.home=/home/ec2-user
>> (org.apache.zookeeper.server.ZooKeeperServer)
>> [2016-06-07 10:24:47,939] INFO Server
>> environment:user.dir=/home/ec2-user/kafka_2.11-0.9.0.1
>> (org.apache.zookeeper.server.ZooKeeperServer)
>> [2016-06-07 10:24:47,946] INFO tickTime set to 3000
>> (org.apache.zookeeper.server.ZooKeeperServer)
>> [2016-06-07 10:24:47,946] INFO minSessionTimeout set to -1
>> (org.apache.zookeeper.server.ZooKeeperServer)
>> [2016-06-07 10:24:47,946] INFO maxSessionTimeout set to -1
>> (org.apache.zookeeper.server.ZooKeeperServer)
>> [2016-06-07 10:24:47,955] INFO binding to port 0.0.0.0/0.0.0.0:2181
>> (org.apache.zookeeper.server.NIOServerCnxnFactory)
>> [2016-06-07 10:24:58,370] INFO Accepted socket connection from /
>> 127.0.0.1:41368 (org.apache.zookeeper.server.NIOServerCnxnFactory)
>> [2016-06-07 10:24:58,384] INFO Client attempting to establish new session
>> at /127.0.0.1:41368 (org.apache.zookeeper.server.ZooKeeperServer)
>> [2016-06-07 10:24:58,389] INFO Creating new log file: log.3eb
>> (org.apache.zookeeper.server.persistence.FileTxnLog)
>> [2016-06-07 10:24:58,400] INFO Established session 0x1552a64a9a80000 with
>> negotiated timeout 6000 for client /127.0.0.1:41368
>> (org.apache.zookeeper.server.ZooKeeperServer)
>> [2016-06-07 10:24:59,154] INFO Got user-level KeeperException when
>> processing sessionid:0x1552a64a9a80000 type:delete cxid:0x26 zxid:0x3ee
>> txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election
>> Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
>> (org.apache.zookeeper.server.PrepRequestProcessor)
>> [2016-06-07 10:24:59,231] INFO Got user-level KeeperException when
>> processing sessionid:0x1552a64a9a80000 type:create cxid:0x2d zxid:0x3ef
>> txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode =
>> NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
>> [2016-06-07 10:24:59,232] INFO Got user-level KeeperException when
>> processing sessionid:0x1552a64a9a80000 type:create cxid:0x2e zxid:0x3f0
>> txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode =
>> NodeExists for /brokers/ids
>> (org.apache.zookeeper.server.PrepRequestProcessor)
>>
>> Interestingly, I am able both to retrieve the messages from the specified
>> topic using the console consumer and to produce messages using the REST API.
>>
>> As for Kafka/Zookeeper accessibility, since this is a proof-of-concept,
>> all connections to the ports have been allowed.
>>
>> On 07 Jun 2016, at 12:14, Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>> OK, that is good.
>>
>> Yours is basically a simple streaming setup, with Kafka publishing the
>> topic and Spark Streaming consuming it. Use the following as a blueprint:
>>
>> // Create a local StreamingContext with two worker threads and a batch
>> // interval of 2 seconds.
>> val sparkConf = new SparkConf().
>>              setAppName("CEP_streaming").
>>              setMaster("local[2]").
>>              set("spark.executor.memory", "4G").
>>              set("spark.cores.max", "2").
>>              set("spark.streaming.concurrentJobs", "2").
>>              set("spark.driver.allowMultipleContexts", "true").
>>              set("spark.hadoop.validateOutputSpecs", "false")
>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>> ssc.checkpoint("checkpoint")
>> val kafkaParams = Map[String, String]("bootstrap.servers" ->
>> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
>> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "CEP_streaming" )
>> val topics = Set("newtopic")
>> val dstream = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>> dstream.cache()
>>
>> val lines = dstream.map(_._2)
>> val price = lines.map(_.split(',').view(2)).map(_.toFloat)
>> // window length - the duration of the window; it must be a multiple of the
>> // batch interval n in StreamingContext(sparkConf, Seconds(n))
>> val windowLength = 4
>> // sliding interval - the interval at which the window operation is performed,
>> // in other words data is collected within this "previous interval".
>> // Keep this the same as the batch interval for continuous streaming; you are
>> // aggregating data that you are collecting over the batch window.
>> val slidingInterval = 2
>> val countByValueAndWindow = price.filter(_ >
>> 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
>> countByValueAndWindow.print()
>> //
>> ssc.start()
>> ssc.awaitTermination()
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 7 June 2016 at 10:58, Dominik Safaric <do...@gmail.com>
>> wrote:
>>
>>> Dear Mich,
>>>
>>> Thank you for the reply.
>>>
>>> By running the following command in the command line:
>>>
>>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic
>>> <topic_name> --from-beginning
>>>
>>> I do indeed retrieve all messages of a topic.
>>>
>>> Any indication of what might cause the issue?
>>>
>>> An important note: I’m using the default configuration of both
>>> Kafka and ZooKeeper.
>>>
>>> On 07 Jun 2016, at 11:39, Mich Talebzadeh <mi...@gmail.com>
>>> wrote:
>>>
>>> I assume your ZooKeeper is up and running.
>>>
>>> Can you confirm that you can read the topic from Kafka independently, for
>>> example on the command line:
>>>
>>> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
>>> --from-beginning --topic newtopic
>>>
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 7 June 2016 at 10:06, Dominik Safaric <do...@gmail.com>
>>> wrote:
>>>
>>>> As I am trying to integrate Kafka into Spark, the following exception
>>>> occurs:
>>>>
>>>> org.apache.spark.SparkException:
>>>> java.nio.channels.ClosedChannelException
>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>> Set([*<topicName>*,0])
>>>>         at
>>>>
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>>         at
>>>>
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>>         at scala.util.Either.fold(Either.scala:97)
>>>>         at
>>>>
>>>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>>>         at
>>>>
>>>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>>>>         at
>>>>
>>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>>>         at org.mediasoft.spark.Driver$.main(Driver.scala:42)
>>>>         at .<init>(<console>:11)
>>>>         at .<clinit>(<console>)
>>>>         at .<init>(<console>:7)
>>>>         at .<clinit>(<console>)
>>>>         at $print(<console>)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at
>>>>
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>         at
>>>>
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:483)
>>>>         at
>>>> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>>>>         at
>>>> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>>>>         at
>>>> scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>>>>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>>>>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>>>>         at
>>>> scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>>>>         at
>>>> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>>>>         at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>>>>         at
>>>> scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>>>>         at
>>>> scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>>>>         at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>>>>         at
>>>>
>>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>>>>         at
>>>>
>>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>>>         at
>>>>
>>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>>>         at
>>>>
>>>> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>>>>         at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>>>>         at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
>>>>         at
>>>>
>>>> org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at
>>>>
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>         at
>>>>
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:483)
>>>>         at
>>>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>>>>
>>>> As for the Spark configuration:
>>>>
>>>>    val conf: SparkConf = new
>>>> SparkConf().setAppName("AppName").setMaster("local[2]")
>>>>
>>>>     val confParams: Map[String, String] = Map(
>>>>       "metadata.broker.list" -> "<IP_ADDRESS>:9092",
>>>>       "auto.offset.reset" -> "largest"
>>>>     )
>>>>
>>>>     val topics: Set[String] = Set("<topic_name>")
>>>>
>>>>     val context: StreamingContext = new StreamingContext(conf,
>>>> Seconds(1))
>>>>     val kafkaStream = KafkaUtils.createDirectStream(context,confParams,
>>>> topics)
>>>>
>>>>     kafkaStream.foreachRDD(rdd => {
>>>>       rdd.collect().foreach(println)
>>>>     })
>>>>
>>>>     context.awaitTermination()
>>>>     context.start()
>>>>
>>>> The Kafka topic does exist, Kafka server is up and running and I am
>>>> able to
>>>> produce messages to that particular topic using the Confluent REST API.
>>>>
>>>> What might the problem actually be?

Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

Posted by Dominik Safaric <do...@gmail.com>.
> Sounds like the issue is with Kafka channel, it is closing.


I came to the same conclusion. I have also tried refining the configuration files further:

Zookeeper properties:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=20

Kafka server properties:

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

listeners=PLAINTEXT://:9092

# The port the socket server listens on
#port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=0.0.0.0

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


Kafka consumer properties:

# see kafka.consumer.ConsumerConfig for more details

# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=127.0.0.1:2181

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=test-consumer-group

#consumer timeout
#consumer.timeout.ms=5000


For a proof of concept, this basic configuration should be enough to consume messages from Kafka. Or perhaps not?
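
One check that might help narrow this down: as far as I understand it, the direct stream first asks the broker in metadata.broker.list for topic metadata and then connects to whatever address the partition leader advertises, which in the broker log quoted below is EndPoint(<public_DNS>,9092,PLAINTEXT). If that address is not reachable from the machine running the Spark driver, a closed channel would be a plausible symptom. The following is only a minimal sketch of such a reachability test using plain JDK sockets; the object name, the method name and the placeholder address are assumptions made for illustration and are not part of Spark or Kafka.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper for illustration only; not part of Spark or Kafka.
object BrokerReachability {

  /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      // An unresolvable host name surfaces as UnknownHostException,
      // which is an IOException and is therefore reported as "not reachable".
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Placeholder address (assumption): replace with the value passed as
    // metadata.broker.list and with the host name the broker advertises.
    val candidates = Seq(("localhost", 9092))
    candidates.foreach { case (host, port) =>
      println(s"$host:$port reachable: ${canConnect(host, port)}")
    }
  }
}
```

Running this on the driver machine against both the configured broker address and the advertised host name would show whether the two differ in reachability. A successful TCP connection does not prove the Kafka protocol exchange works, so this only rules out the most basic network problem.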

> On 07 Jun 2016, at 12:44, Mich Talebzadeh <mi...@gmail.com> wrote:
> 
> Sounds like the issue is with Kafka channel, it is closing.
> 
>  Reconnect due to socket error: java.nio.channels.ClosedChannelException
> 
> Can you relax the batch interval, for example:
> 
> val ssc = new StreamingContext(sparkConf, Seconds(20))
> 
> Also, how are you getting your source data? You can run the console consumer below at the same time as Spark to see the exact cause:
> 
> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181 --from-beginning --topic newtopic
> 
> 
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
>  
> 
> On 7 June 2016 at 11:32, Dominik Safaric <dominiksafaric@gmail.com> wrote:
> Unfortunately, even with this Spark configuration and Kafka parameters, the same exception keeps occurring:
> 
> 16/06/07 12:26:11 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
> org.apache.spark.SparkException: Couldn't find leader offsets for Set([<topicname>,0])
> 	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> 	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> 	at scala.util.Either.fold(Either.scala:97)
> 	at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
> 	at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
> 
> If it helps for troubleshooting, here are the logs of the Kafka server:
> 
> [2016-06-07 10:24:58,349] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@4e05faa7 (org.apache.zookeeper.ZooKeeper)
> [2016-06-07 10:24:58,365] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-06-07 10:24:58,365] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
> [2016-06-07 10:24:58,375] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-06-07 10:24:58,405] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1552a64a9a80000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-06-07 10:24:58,408] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
> [2016-06-07 10:24:58,562] INFO Loading logs. (kafka.log.LogManager)
> [2016-06-07 10:24:58,608] INFO Completed load of log <topic_name>-0 with log end offset 15 (kafka.log.Log)
> [2016-06-07 10:24:58,614] INFO Completed load of log _schemas-0 with log end offset 1 (kafka.log.Log)
> [2016-06-07 10:24:58,617] INFO Completed load of log <topic_name>-0 with log end offset 5 (kafka.log.Log)
> [2016-06-07 10:24:58,620] INFO Completed load of log <topic_name>-0 with log end offset 2 (kafka.log.Log)
> [2016-06-07 10:24:58,629] INFO Completed load of log <topic_name>-0 with log end offset 1759 (kafka.log.Log)
> [2016-06-07 10:24:58,635] INFO Logs loading complete. (kafka.log.LogManager)
> [2016-06-07 10:24:58,737] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
> [2016-06-07 10:24:58,739] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
> [2016-06-07 10:24:58,798] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
> [2016-06-07 10:24:58,809] INFO [Socket Server on Broker 1], Started 1 acceptor threads (kafka.network.SocketServer)
> [2016-06-07 10:24:58,849] INFO [ExpirationReaper-1], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-06-07 10:24:58,850] INFO [ExpirationReaper-1], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-06-07 10:24:58,953] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
> [2016-06-07 10:24:58,973] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> [2016-06-07 10:24:58,974] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
> [2016-06-07 10:24:59,180] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.GroupCoordinator)
> [2016-06-07 10:24:59,191] INFO [ExpirationReaper-1], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-06-07 10:24:59,194] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2016-06-07 10:24:59,198] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 16 milliseconds. (kafka.coordinator.GroupMetadataManager)
> [2016-06-07 10:24:59,195] INFO [ExpirationReaper-1], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-06-07 10:24:59,195] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.GroupCoordinator)
> [2016-06-07 10:24:59,215] INFO [ThrottledRequestReaper-Produce], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> [2016-06-07 10:24:59,217] INFO [ThrottledRequestReaper-Fetch], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> [2016-06-07 10:24:59,220] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
> [2016-06-07 10:24:59,230] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
> [2016-06-07 10:24:59,244] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> [2016-06-07 10:24:59,245] INFO Registered broker 1 at path /brokers/ids/1 with addresses: PLAINTEXT -> EndPoint(<public_DNS>,9092,PLAINTEXT) (kafka.utils.ZkUtils)
> [2016-06-07 10:24:59,257] INFO Kafka version : 0.9.0.1 (org.apache.kafka.common.utils.AppInfoParser)
> [2016-06-07 10:24:59,257] INFO Kafka commitId : 23c69d62a0cabf06 (org.apache.kafka.common.utils.AppInfoParser)
> [2016-06-07 10:24:59,258] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
> [2016-06-07 10:24:59,648] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [<topic_name>,0] (kafka.server.ReplicaFetcherManager)
> [2016-06-07 10:24:59,682] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [<topic_name>,0] (kafka.server.ReplicaFetcherManager)
> 
> Whereas Zookeeper produced the following logs:
> 
> [2016-06-07 10:24:47,935] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,935] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,935] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,936] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,936] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,939] INFO Server environment:os.version=4.4.11-23.53.amzn1.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,939] INFO Server environment:user.name=ec2-user (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,939] INFO Server environment:user.home=/home/ec2-user (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,939] INFO Server environment:user.dir=/home/ec2-user/kafka_2.11-0.9.0.1 (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,946] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,946] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,946] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,955] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
> [2016-06-07 10:24:58,370] INFO Accepted socket connection from /127.0.0.1:41368 (org.apache.zookeeper.server.NIOServerCnxnFactory)
> [2016-06-07 10:24:58,384] INFO Client attempting to establish new session at /127.0.0.1:41368 (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:58,389] INFO Creating new log file: log.3eb (org.apache.zookeeper.server.persistence.FileTxnLog)
> [2016-06-07 10:24:58,400] INFO Established session 0x1552a64a9a80000 with negotiated timeout 6000 for client /127.0.0.1:41368 (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:59,154] INFO Got user-level KeeperException when processing sessionid:0x1552a64a9a80000 type:delete cxid:0x26 zxid:0x3ee txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)
> [2016-06-07 10:24:59,231] INFO Got user-level KeeperException when processing sessionid:0x1552a64a9a80000 type:create cxid:0x2d zxid:0x3ef txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
> [2016-06-07 10:24:59,232] INFO Got user-level KeeperException when processing sessionid:0x1552a64a9a80000 type:create cxid:0x2e zxid:0x3f0 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)
> 
> Interestingly, I am able both to retrieve the messages from the specified topic using the console consumer and to produce messages using the REST API.
> 
> As for Kafka/Zookeeper accessibility, since this is a proof-of-concept, all connections to the ports have been allowed.  
> 
>> On 07 Jun 2016, at 12:14, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>> 
>> ok that is good
>> 
>> Yours is basically simple streaming: Kafka (publishing the topic) feeding Spark Streaming. Use the following as a blueprint:
>> 
>> // Create a local StreamingContext with two worker threads and a batch interval of 2 seconds.
>> val sparkConf = new SparkConf().
>>              setAppName("CEP_streaming").
>>              setMaster("local[2]").
>>              set("spark.executor.memory", "4G").
>>              set("spark.cores.max", "2").
>>              set("spark.streaming.concurrentJobs", "2").
>>              set("spark.driver.allowMultipleContexts", "true").
>>              set("spark.hadoop.validateOutputSpecs", "false")
>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>> ssc.checkpoint("checkpoint")
>> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> "rhes564:2181", "group.id" -> "CEP_streaming" )
>> val topics = Set("newtopic")
>> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>> dstream.cache()
>> 
>> val lines = dstream.map(_._2)
>> val price = lines.map(_.split(',').view(2)).map(_.toFloat)
>> // window length: the duration of the window; it must be a multiple of the batch interval n in StreamingContext(sparkConf, Seconds(n))
>> val windowLength = 4
>> // sliding interval: the interval at which the window operation is performed, in other words data is collected within this "previous interval"
>> val slidingInterval = 2  // keep this the same as the batch interval for continuous streaming; you are aggregating the data collected over the batch window
>> val countByValueAndWindow = price.filter(_ > 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
>> countByValueAndWindow.print()
>> //
>> ssc.start()
>> ssc.awaitTermination()
>> 
>> HTH
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> http://talebzadehmich.wordpress.com
>>  
>> 
>> On 7 June 2016 at 10:58, Dominik Safaric <dominiksafaric@gmail.com> wrote:
>> Dear Mich,
>> 
>> Thank you for the reply.
>> 
>> By running the following command in the command line:
>> 
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic_name> --from-beginning
>> 
>> I do indeed retrieve all messages of a topic. 
>> 
>> Any indication of what might cause the issue?
>> 
>> An important note: I’m using the default configuration of both Kafka and Zookeeper.
>> 
>>> On 07 Jun 2016, at 11:39, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>>> 
>>> I assume your Zookeeper is up and running.
>>> 
>>> Can you confirm that you are getting messages from the Kafka topic independently, for example on the command line:
>>> 
>>> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181 --from-beginning --topic newtopic
>>> 
>>> 
>>> 
>>> 
>>> 
>>> Dr Mich Talebzadeh
>>>  
>>> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>  
>>> http://talebzadehmich.wordpress.com
>>>  
>>> 
>>> On 7 June 2016 at 10:06, Dominik Safaric <dominiksafaric@gmail.com> wrote:
>>> As I am trying to integrate Kafka into Spark, the following exception occurs:
>>> 
>>> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([*<topicName>*,0])
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>         at scala.util.Either.fold(Either.scala:97)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>>         at org.mediasoft.spark.Driver$.main(Driver.scala:42)
>>>         at .<init>(<console>:11)
>>>         at .<clinit>(<console>)
>>>         at .<init>(<console>:7)
>>>         at .<clinit>(<console>)
>>>         at $print(<console>)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>         at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:483)
>>>         at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>>>         at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>>>         at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>>>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>>>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>>>         at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>>>         at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>>>         at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>>>         at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>>>         at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>>>         at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>>>         at
>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>>>         at
>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>>         at
>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>>         at
>>> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>>>         at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>>>         at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
>>>         at
>>> org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>         at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:483)
>>>         at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>>> 
>>> As for the Spark configuration:
>>> 
>>>    val conf: SparkConf = new
>>> SparkConf().setAppName("AppName").setMaster("local[2]")
>>> 
>>>     val confParams: Map[String, String] = Map(
>>>       "metadata.broker.list" -> "<IP_ADDRESS>:9092",
>>>       "auto.offset.reset" -> "largest"
>>>     )
>>> 
>>>     val topics: Set[String] = Set("<topic_name>")
>>> 
>>>     val context: StreamingContext = new StreamingContext(conf, Seconds(1))
>>>     val kafkaStream = KafkaUtils.createDirectStream(context,confParams,
>>> topics)
>>> 
>>>     kafkaStream.foreachRDD(rdd => {
>>>       rdd.collect().foreach(println)
>>>     })
>>> 
>>>     context.awaitTermination()
>>>     context.start()
>>> 
>>> The Kafka topic does exist, Kafka server is up and running and I am able to
>>> produce messages to that particular topic using the Confluent REST API.
>>> 
>>> What might the problem actually be?


Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

Posted by Mich Talebzadeh <mi...@gmail.com>.
Sounds like the issue is with Kafka channel, it is closing.

 Reconnect due to socket error: java.nio.channels.ClosedChannelException

Can you relax the batch interval, for example:

val ssc = new StreamingContext(sparkConf, Seconds(20))

Also, how are you getting your source data? You can run the console consumer
below at the same time as Spark to see the exact cause:

${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
--from-beginning --topic newtopic
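
For reference, the pieces above can be put together roughly as follows. This is a minimal sketch, assuming Spark 1.x with the spark-streaming-kafka artifact on the classpath; the object name DirectStreamSketch and the helper kafkaParams are hypothetical, and the broker address and topic are the placeholders used in this thread. Note that the snippet in the original post calls context.awaitTermination() before context.start(); start() has to come first for the job to run at all.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical object name; assumes Spark 1.x with spark-streaming-kafka available.
object DirectStreamSketch {

  /** Parameters for the direct (receiver-less) Kafka stream. */
  def kafkaParams(brokers: String): Map[String, String] = Map(
    "metadata.broker.list" -> brokers,
    "auto.offset.reset" -> "largest"
  )

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AppName").setMaster("local[2]")
    // Relaxed batch interval of 20 seconds, as suggested above.
    val ssc = new StreamingContext(sparkConf, Seconds(20))
    val topics = Set("newtopic")

    // Explicit key/value types and decoders for the direct stream.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams("rhes564:9092"), topics)

    // Print the message values of each batch.
    stream.map(_._2).print()

    // start() must be called before awaitTermination().
    ssc.start()
    ssc.awaitTermination()
  }
}
```

If the exception is still thrown from createDirectStream with this layout, the batch interval and the call order are probably not the cause, and the broker address is the next thing to check.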





Dr Mich Talebzadeh



LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 7 June 2016 at 11:32, Dominik Safaric <do...@gmail.com> wrote:

> Unfortunately, even with this Spark configuration and Kafka parameters,
> the same exception keeps occurring:
>
> 16/06/07 12:26:11 INFO SimpleConsumer: Reconnect due to socket error:
> java.nio.channels.ClosedChannelException
> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([<topicname>,0])
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at scala.util.Either.fold(Either.scala:97)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>
> If it helps for troubleshooting, here are the logs of the Kafka server:
>
> [2016-06-07 10:24:58,349] INFO Initiating client connection,
> connectString=localhost:2181 sessionTimeout=6000
> watcher=org.I0Itec.zkclient.ZkClient@4e05faa7
> (org.apache.zookeeper.ZooKeeper)
> [2016-06-07 10:24:58,365] INFO Opening socket connection to server
> localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-06-07 10:24:58,365] INFO Waiting for keeper state SyncConnected
> (org.I0Itec.zkclient.ZkClient)
> [2016-06-07 10:24:58,375] INFO Socket connection established to localhost/
> 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-06-07 10:24:58,405] INFO Session establishment complete on server
> localhost/127.0.0.1:2181, sessionid = 0x1552a64a9a80000, negotiated
> timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-06-07 10:24:58,408] INFO zookeeper state changed (SyncConnected)
> (org.I0Itec.zkclient.ZkClient)
> [2016-06-07 10:24:58,562] INFO Loading logs. (kafka.log.LogManager)
> [2016-06-07 10:24:58,608] INFO Completed load of log <topic_name>-0 with
> log end offset 15 (kafka.log.Log)
> [2016-06-07 10:24:58,614] INFO Completed load of log _schemas-0 with log
> end offset 1 (kafka.log.Log)
> [2016-06-07 10:24:58,617] INFO Completed load of log <topic_name>-0 with
> log end offset 5 (kafka.log.Log)
> [2016-06-07 10:24:58,620] INFO Completed load of log <topic_name>-0 with
> log end offset 2 (kafka.log.Log)
> [2016-06-07 10:24:58,629] INFO Completed load of log <topic_name>-0 with
> log end offset 1759 (kafka.log.Log)
> [2016-06-07 10:24:58,635] INFO Logs loading complete.
> (kafka.log.LogManager)
> [2016-06-07 10:24:58,737] INFO Starting log cleanup with a period of
> 300000 ms. (kafka.log.LogManager)
> [2016-06-07 10:24:58,739] INFO Starting log flusher with a default period
> of 9223372036854775807 ms. (kafka.log.LogManager)
> [2016-06-07 10:24:58,798] INFO Awaiting socket connections on 0.0.0.0:9092.
> (kafka.network.Acceptor)
> [2016-06-07 10:24:58,809] INFO [Socket Server on Broker 1], Started 1
> acceptor threads (kafka.network.SocketServer)
> [2016-06-07 10:24:58,849] INFO [ExpirationReaper-1], Starting
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-06-07 10:24:58,850] INFO [ExpirationReaper-1], Starting
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-06-07 10:24:58,953] INFO Creating /controller (is it secure? false)
> (kafka.utils.ZKCheckedEphemeral)
> [2016-06-07 10:24:58,973] INFO Result of znode creation is: OK
> (kafka.utils.ZKCheckedEphemeral)
> [2016-06-07 10:24:58,974] INFO 1 successfully elected as leader
> (kafka.server.ZookeeperLeaderElector)
> [2016-06-07 10:24:59,180] INFO [GroupCoordinator 1]: Starting up.
> (kafka.coordinator.GroupCoordinator)
> [2016-06-07 10:24:59,191] INFO [ExpirationReaper-1], Starting
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-06-07 10:24:59,194] INFO New leader is 1
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2016-06-07 10:24:59,198] INFO [Group Metadata Manager on Broker 1]:
> Removed 0 expired offsets in 16 milliseconds.
> (kafka.coordinator.GroupMetadataManager)
> [2016-06-07 10:24:59,195] INFO [ExpirationReaper-1], Starting
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-06-07 10:24:59,195] INFO [GroupCoordinator 1]: Startup complete.
> (kafka.coordinator.GroupCoordinator)
> [2016-06-07 10:24:59,215] INFO [ThrottledRequestReaper-Produce], Starting
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> [2016-06-07 10:24:59,217] INFO [ThrottledRequestReaper-Fetch], Starting
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> [2016-06-07 10:24:59,220] INFO Will not load MX4J, mx4j-tools.jar is not
> in the classpath (kafka.utils.Mx4jLoader$)
> [2016-06-07 10:24:59,230] INFO Creating /brokers/ids/1 (is it secure?
> false) (kafka.utils.ZKCheckedEphemeral)
> [2016-06-07 10:24:59,244] INFO Result of znode creation is: OK
> (kafka.utils.ZKCheckedEphemeral)
> [2016-06-07 10:24:59,245] INFO Registered broker 1 at path /brokers/ids/1
> with addresses: PLAINTEXT -> EndPoint(<public_DNS>,9092,PLAINTEXT)
> (kafka.utils.ZkUtils)
> [2016-06-07 10:24:59,257] INFO Kafka version : 0.9.0.1
> (org.apache.kafka.common.utils.AppInfoParser)
> [2016-06-07 10:24:59,257] INFO Kafka commitId : 23c69d62a0cabf06
> (org.apache.kafka.common.utils.AppInfoParser)
> [2016-06-07 10:24:59,258] INFO [Kafka Server 1], started
> (kafka.server.KafkaServer)
> [2016-06-07 10:24:59,648] INFO [ReplicaFetcherManager on broker 1] Removed
> fetcher for partitions [<topic_name>,0] (kafka.server.ReplicaFetcherManager)
> [2016-06-07 10:24:59,682] INFO [ReplicaFetcherManager on broker 1] Removed
> fetcher for partitions [<topic_name>,0] (kafka.server.ReplicaFetcherManager)
>
> Whereas Zookeeper produced the following logs:
>
> [2016-06-07 10:24:47,935] INFO Server
> environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,935] INFO Server environment:java.io.tmpdir=/tmp
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,935] INFO Server environment:java.compiler=<NA>
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,936] INFO Server environment:os.name=Linux
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,936] INFO Server environment:os.arch=amd64
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,939] INFO Server
> environment:os.version=4.4.11-23.53.amzn1.x86_64
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,939] INFO Server environment:user.name=ec2-user
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,939] INFO Server environment:user.home=/home/ec2-user
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,939] INFO Server
> environment:user.dir=/home/ec2-user/kafka_2.11-0.9.0.1
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,946] INFO tickTime set to 3000
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,946] INFO minSessionTimeout set to -1
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,946] INFO maxSessionTimeout set to -1
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:47,955] INFO binding to port 0.0.0.0/0.0.0.0:2181
> (org.apache.zookeeper.server.NIOServerCnxnFactory)
> [2016-06-07 10:24:58,370] INFO Accepted socket connection from /
> 127.0.0.1:41368 (org.apache.zookeeper.server.NIOServerCnxnFactory)
> [2016-06-07 10:24:58,384] INFO Client attempting to establish new session
> at /127.0.0.1:41368 (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:58,389] INFO Creating new log file: log.3eb
> (org.apache.zookeeper.server.persistence.FileTxnLog)
> [2016-06-07 10:24:58,400] INFO Established session 0x1552a64a9a80000 with
> negotiated timeout 6000 for client /127.0.0.1:41368
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2016-06-07 10:24:59,154] INFO Got user-level KeeperException when
> processing sessionid:0x1552a64a9a80000 type:delete cxid:0x26 zxid:0x3ee
> txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election
> Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
> (org.apache.zookeeper.server.PrepRequestProcessor)
> [2016-06-07 10:24:59,231] INFO Got user-level KeeperException when
> processing sessionid:0x1552a64a9a80000 type:create cxid:0x2d zxid:0x3ef
> txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode =
> NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
> [2016-06-07 10:24:59,232] INFO Got user-level KeeperException when
> processing sessionid:0x1552a64a9a80000 type:create cxid:0x2e zxid:0x3f0
> txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode =
> NodeExists for /brokers/ids
> (org.apache.zookeeper.server.PrepRequestProcessor)
>
> Interestingly, I am able to both retrieve the messages from the specified
> topic using the console Consumer and produce messages using the REST API.
>
> As for Kafka/Zookeeper accessibility, since this is a proof-of-concept,
> all connections to the ports have been allowed.
>
> On 07 Jun 2016, at 12:14, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
> ok that is good
>
> Yours is basically simple streaming with Kafka (publishing the topic) and
> Spark Streaming. Use the following as a blueprint:
>
> // Create a local StreamingContext with two worker threads and a batch
> // interval of 2 seconds.
> val sparkConf = new SparkConf().
>              setAppName("CEP_streaming").
>              setMaster("local[2]").
>              set("spark.executor.memory", "4G").
>              set("spark.cores.max", "2").
>              set("spark.streaming.concurrentJobs", "2").
>              set("spark.driver.allowMultipleContexts", "true").
>              set("spark.hadoop.validateOutputSpecs", "false")
> val ssc = new StreamingContext(sparkConf, Seconds(2))
> ssc.checkpoint("checkpoint")
> val kafkaParams = Map[String, String]("bootstrap.servers" ->
> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "CEP_streaming" )
> val topics = Set("newtopic")
> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topics)
> dstream.cache()
>
> val lines = dstream.map(_._2)
> val price = lines.map(_.split(',').view(2)).map(_.toFloat)
> // Window length: the duration of the window, which must be a multiple of
> // the batch interval n in StreamingContext(sparkConf, Seconds(n)).
> val windowLength = 4
> // Sliding interval: the interval at which the window operation is
> // performed; in other words, data is collected within this "previous
> // interval". Keep this the same as the batch interval for continuous
> // streaming: you are aggregating the data collected over each batch.
> val slidingInterval = 2
> val countByValueAndWindow = price.filter(_ >
> 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
> countByValueAndWindow.print()
> //
> ssc.start()
> ssc.awaitTermination()
>
> HTH
>
> Dr Mich Talebzadeh
>
>
> LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 7 June 2016 at 10:58, Dominik Safaric <do...@gmail.com> wrote:
>
>> Dear Mich,
>>
>> Thank you for the reply.
>>
>> By running the following command in the command line:
>>
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic
>> <topic_name> --from-beginning
>>
>> I do indeed retrieve all messages of a topic.
>>
>> Any indication as to what might cause the issue?
>>
>> An important note: I’m using the default configuration of both Kafka and
>> Zookeeper.
>>
>> On 07 Jun 2016, at 11:39, Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>> I assume your ZooKeeper is up and running.
>>
>> Can you confirm that you can read the topic from Kafka independently, for
>> example on the command line:
>>
>> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
>> --from-beginning --topic newtopic
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 7 June 2016 at 10:06, Dominik Safaric <do...@gmail.com>
>> wrote:
>>
>>> As I am trying to integrate Kafka into Spark, the following exception
>>> occurs:
>>>
>>> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([*<topicName>*,0])
>>>         at
>>>
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>         at
>>>
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>         at scala.util.Either.fold(Either.scala:97)
>>>         at
>>>
>>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>>         at
>>>
>>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>>>         at
>>>
>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>>         at org.mediasoft.spark.Driver$.main(Driver.scala:42)
>>>         at .<init>(<console>:11)
>>>         at .<clinit>(<console>)
>>>         at .<init>(<console>:7)
>>>         at .<clinit>(<console>)
>>>         at $print(<console>)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at
>>>
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>         at
>>>
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:483)
>>>         at
>>> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>>>         at
>>> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>>>         at
>>> scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>>>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>>>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>>>         at
>>> scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>>>         at
>>> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>>>         at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>>>         at
>>> scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>>>         at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>>>         at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>>>         at
>>>
>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>>>         at
>>>
>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>>         at
>>>
>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>>         at
>>>
>>> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>>>         at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>>>         at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
>>>         at
>>>
>>> org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at
>>>
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>         at
>>>
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:483)
>>>         at
>>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>>>
>>> As for the Spark configuration:
>>>
>>>    val conf: SparkConf = new
>>> SparkConf().setAppName("AppName").setMaster("local[2]")
>>>
>>>     val confParams: Map[String, String] = Map(
>>>       "metadata.broker.list" -> "<IP_ADDRESS>:9092",
>>>       "auto.offset.reset" -> "largest"
>>>     )
>>>
>>>     val topics: Set[String] = Set("<topic_name>")
>>>
>>>     val context: StreamingContext = new StreamingContext(conf,
>>> Seconds(1))
>>>     val kafkaStream = KafkaUtils.createDirectStream(context,confParams,
>>> topics)
>>>
>>>     kafkaStream.foreachRDD(rdd => {
>>>       rdd.collect().foreach(println)
>>>     })
>>>
>>>     context.awaitTermination()
>>>     context.start()
>>>
>>> The Kafka topic does exist, Kafka server is up and running and I am able
>>> to
>>> produce messages to that particular topic using the Confluent REST API.
>>>
>>> What might the problem actually be?
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>
>

Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

Posted by Dominik Safaric <do...@gmail.com>.
Unfortunately, even with this Spark configuration and Kafka parameters, the same exception keeps occurring:

16/06/07 12:26:11 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([<topicname>,0])
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
	at scala.util.Either.fold(Either.scala:97)
	at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
	at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)

If it helps for troubleshooting, here are the logs of the Kafka server:

[2016-06-07 10:24:58,349] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@4e05faa7 (org.apache.zookeeper.ZooKeeper)
[2016-06-07 10:24:58,365] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2016-06-07 10:24:58,365] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2016-06-07 10:24:58,375] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2016-06-07 10:24:58,405] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1552a64a9a80000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2016-06-07 10:24:58,408] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2016-06-07 10:24:58,562] INFO Loading logs. (kafka.log.LogManager)
[2016-06-07 10:24:58,608] INFO Completed load of log <topic_name>-0 with log end offset 15 (kafka.log.Log)
[2016-06-07 10:24:58,614] INFO Completed load of log _schemas-0 with log end offset 1 (kafka.log.Log)
[2016-06-07 10:24:58,617] INFO Completed load of log <topic_name>-0 with log end offset 5 (kafka.log.Log)
[2016-06-07 10:24:58,620] INFO Completed load of log <topic_name>-0 with log end offset 2 (kafka.log.Log)
[2016-06-07 10:24:58,629] INFO Completed load of log <topic_name>-0 with log end offset 1759 (kafka.log.Log)
[2016-06-07 10:24:58,635] INFO Logs loading complete. (kafka.log.LogManager)
[2016-06-07 10:24:58,737] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2016-06-07 10:24:58,739] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2016-06-07 10:24:58,798] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2016-06-07 10:24:58,809] INFO [Socket Server on Broker 1], Started 1 acceptor threads (kafka.network.SocketServer)
[2016-06-07 10:24:58,849] INFO [ExpirationReaper-1], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-06-07 10:24:58,850] INFO [ExpirationReaper-1], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-06-07 10:24:58,953] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-06-07 10:24:58,973] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-06-07 10:24:58,974] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-06-07 10:24:59,180] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.GroupCoordinator)
[2016-06-07 10:24:59,191] INFO [ExpirationReaper-1], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-06-07 10:24:59,194] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-06-07 10:24:59,198] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 16 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-06-07 10:24:59,195] INFO [ExpirationReaper-1], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-06-07 10:24:59,195] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.GroupCoordinator)
[2016-06-07 10:24:59,215] INFO [ThrottledRequestReaper-Produce], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-06-07 10:24:59,217] INFO [ThrottledRequestReaper-Fetch], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-06-07 10:24:59,220] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2016-06-07 10:24:59,230] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-06-07 10:24:59,244] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-06-07 10:24:59,245] INFO Registered broker 1 at path /brokers/ids/1 with addresses: PLAINTEXT -> EndPoint(<public_DNS>,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2016-06-07 10:24:59,257] INFO Kafka version : 0.9.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2016-06-07 10:24:59,257] INFO Kafka commitId : 23c69d62a0cabf06 (org.apache.kafka.common.utils.AppInfoParser)
[2016-06-07 10:24:59,258] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
[2016-06-07 10:24:59,648] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [<topic_name>,0] (kafka.server.ReplicaFetcherManager)
[2016-06-07 10:24:59,682] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [<topic_name>,0] (kafka.server.ReplicaFetcherManager)

Whereas Zookeeper produced the following logs:

[2016-06-07 10:24:47,935] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-07 10:24:47,935] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-07 10:24:47,935] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-07 10:24:47,936] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-07 10:24:47,936] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-07 10:24:47,939] INFO Server environment:os.version=4.4.11-23.53.amzn1.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-07 10:24:47,939] INFO Server environment:user.name=ec2-user (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-07 10:24:47,939] INFO Server environment:user.home=/home/ec2-user (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-07 10:24:47,939] INFO Server environment:user.dir=/home/ec2-user/kafka_2.11-0.9.0.1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-07 10:24:47,946] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-07 10:24:47,946] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-07 10:24:47,946] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-07 10:24:47,955] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-06-07 10:24:58,370] INFO Accepted socket connection from /127.0.0.1:41368 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-06-07 10:24:58,384] INFO Client attempting to establish new session at /127.0.0.1:41368 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-07 10:24:58,389] INFO Creating new log file: log.3eb (org.apache.zookeeper.server.persistence.FileTxnLog)
[2016-06-07 10:24:58,400] INFO Established session 0x1552a64a9a80000 with negotiated timeout 6000 for client /127.0.0.1:41368 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-07 10:24:59,154] INFO Got user-level KeeperException when processing sessionid:0x1552a64a9a80000 type:delete cxid:0x26 zxid:0x3ee txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)
[2016-06-07 10:24:59,231] INFO Got user-level KeeperException when processing sessionid:0x1552a64a9a80000 type:create cxid:0x2d zxid:0x3ef txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
[2016-06-07 10:24:59,232] INFO Got user-level KeeperException when processing sessionid:0x1552a64a9a80000 type:create cxid:0x2e zxid:0x3f0 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)

Interestingly, I am able to both retrieve the messages from the specified topic using the console consumer and produce messages using the REST API.

As for Kafka/Zookeeper accessibility, since this is a proof-of-concept, all connections to the ports have been allowed.  
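
Since the ClosedChannelException points at the socket between the driver and the broker, one thing that can be checked from the machine running the Spark driver is whether each broker address accepts a TCP connection at all. The following is only a minimal sketch under assumptions: BrokerCheck, parseBrokers and reachable are hypothetical names, not part of Spark or Kafka. The addresses worth passing in are the value of metadata.broker.list and the <public_DNS> endpoint that the broker log above shows being registered in ZooKeeper, since that is the address clients are given for the partition leader.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper for illustration; not part of Spark or Kafka.
object BrokerCheck {

  // Split a "host1:port1,host2:port2" list (the metadata.broker.list format)
  // into (host, port) pairs.
  def parseBrokers(list: String): Seq[(String, Int)] =
    list.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { entry =>
      val idx = entry.lastIndexOf(':')
      require(idx > 0, s"expected host:port but got '$entry'")
      (entry.substring(0, idx), entry.substring(idx + 1).toInt)
    }

  // True if a TCP connection to host:port can be opened within timeoutMs.
  // Unresolvable host names and refused connections both yield false.
  def reachable(host: String, port: Int, timeoutMs: Int = 3000): Boolean =
    Try {
      val socket = new Socket()
      try socket.connect(new InetSocketAddress(host, port), timeoutMs)
      finally socket.close()
    }.isSuccess

  def main(args: Array[String]): Unit = {
    // Assumption: the broker list is passed as the first argument.
    val brokers = args.headOption.getOrElse("localhost:9092")
    parseBrokers(brokers).foreach { case (host, port) =>
      println(s"$host:$port reachable = ${reachable(host, port)}")
    }
  }
}
```

Run it with a comma-separated list such as <IP_ADDRESS>:9092,<public_DNS>:9092. A successful connection only shows that the port is open from that machine; it does not prove the broker is healthy.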

> On 07 Jun 2016, at 12:14, Mich Talebzadeh <mi...@gmail.com> wrote:
> 
> ok that is good
> 
> Yours is basically simple streaming with Kafka (publishing the topic) and Spark Streaming. Use the following as a blueprint:
> 
> // Create a local StreamingContext with two worker threads and a batch interval of 2 seconds.
> val sparkConf = new SparkConf().
>              setAppName("CEP_streaming").
>              setMaster("local[2]").
>              set("spark.executor.memory", "4G").
>              set("spark.cores.max", "2").
>              set("spark.streaming.concurrentJobs", "2").
>              set("spark.driver.allowMultipleContexts", "true").
>              set("spark.hadoop.validateOutputSpecs", "false")
> val ssc = new StreamingContext(sparkConf, Seconds(2))
> ssc.checkpoint("checkpoint")
> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> "rhes564:2181", "group.id" -> "CEP_streaming" )
> val topics = Set("newtopic")
> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
> dstream.cache()
> 
> val lines = dstream.map(_._2)
> val price = lines.map(_.split(',').view(2)).map(_.toFloat)
> // Window length: the duration of the window, which must be a multiple of the batch interval n in StreamingContext(sparkConf, Seconds(n)).
> val windowLength = 4
> // Sliding interval: the interval at which the window operation is performed; in other words, data is collected within this "previous interval".
> val slidingInterval = 2  // Keep this the same as the batch interval for continuous streaming: you are aggregating the data collected over each batch.
> val countByValueAndWindow = price.filter(_ > 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
> countByValueAndWindow.print()
> //
> ssc.start()
> ssc.awaitTermination()
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
>  
> 
> On 7 June 2016 at 10:58, Dominik Safaric <dominiksafaric@gmail.com <ma...@gmail.com>> wrote:
> Dear Mich,
> 
> Thank you for the reply.
> 
> By running the following command in the command line:
> 
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic_name> --from-beginning
> 
> I do indeed retrieve all messages of a topic. 
> 
> Any indication as to what might cause the issue?
> 
> An important note: I’m using the default configuration of both Kafka and Zookeeper.
> 
>> On 07 Jun 2016, at 11:39, Mich Talebzadeh <mich.talebzadeh@gmail.com <ma...@gmail.com>> wrote:
>> 
>> I assume your ZooKeeper is up and running.
>> 
>> Can you confirm that you can read the topic from Kafka independently, for example on the command line:
>> 
>> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181 --from-beginning --topic newtopic
>> 
>> 
>> 
>> 
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> http://talebzadehmich.wordpress.com
>>  
>> 
>> On 7 June 2016 at 10:06, Dominik Safaric <dominiksafaric@gmail.com <ma...@gmail.com>> wrote:
>> As I am trying to integrate Kafka into Spark, the following exception occurs:
>> 
>> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([*<topicName>*,0])
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>         at scala.util.Either.fold(Either.scala:97)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>         at
>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>>         at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>         at org.mediasoft.spark.Driver$.main(Driver.scala:42)
>>         at .<init>(<console>:11)
>>         at .<clinit>(<console>)
>>         at .<init>(<console>:7)
>>         at .<clinit>(<console>)
>>         at $print(<console>)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:483)
>>         at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>>         at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>>         at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>>         at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>>         at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>>         at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>>         at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>>         at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>>         at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>>         at
>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>>         at
>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>         at
>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>         at
>> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>>         at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>>         at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
>>         at
>> org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:483)
>>         at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>> 
>> As for the Spark configuration:
>> 
>>    val conf: SparkConf = new
>> SparkConf().setAppName("AppName").setMaster("local[2]")
>> 
>>     val confParams: Map[String, String] = Map(
>>       "metadata.broker.list" -> "<IP_ADDRESS>:9092",
>>       "auto.offset.reset" -> "largest"
>>     )
>> 
>>     val topics: Set[String] = Set("<topic_name>")
>> 
>>     val context: StreamingContext = new StreamingContext(conf, Seconds(1))
>>     val kafkaStream = KafkaUtils.createDirectStream(context,confParams,
>> topics)
>> 
>>     kafkaStream.foreachRDD(rdd => {
>>       rdd.collect().foreach(println)
>>     })
>> 
>>     context.awaitTermination()
>>     context.start()
>> 
>> The Kafka topic does exist, Kafka server is up and running and I am able to
>> produce messages to that particular topic using the Confluent REST API.
>> 
>> What might the problem actually be?
>> 
>> 
>> 
>> 
>> 
>> 
> 
> 


Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

Posted by Mich Talebzadeh <mi...@gmail.com>.
OK, that is good.

Yours is basically simple streaming: Kafka publishes the topic and Spark
Streaming consumes it. Use the following as a blueprint:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Create a local StreamingContext with two worker threads and a batch
// interval of 2 seconds.
val sparkConf = new SparkConf().
             setAppName("CEP_streaming").
             setMaster("local[2]").
             set("spark.executor.memory", "4G").
             set("spark.cores.max", "2").
             set("spark.streaming.concurrentJobs", "2").
             set("spark.driver.allowMultipleContexts", "true").
             set("spark.hadoop.validateOutputSpecs", "false")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "rhes564:9092",
  "schema.registry.url" -> "http://rhes564:8081",
  "zookeeper.connect" -> "rhes564:2181",
  "group.id" -> "CEP_streaming")
val topics = Set("newtopic")
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder](ssc, kafkaParams, topics)
dstream.cache()

// Each record is a (key, value) pair; keep only the message value.
val lines = dstream.map(_._2)
// The price is the third comma-separated field of each line.
val price = lines.map(_.split(',').view(2)).map(_.toFloat)
// Window length: the duration of the window. It must be a multiple of the
// batch interval n in StreamingContext(sparkConf, Seconds(n)).
val windowLength = 4
// Sliding interval: the interval at which the window operation is performed;
// in other words, data is collected within this "previous interval".
// Keep this the same as the batch interval for continuous streaming. You are
// aggregating the data that you are collecting over the batch window.
val slidingInterval = 2
val countByValueAndWindow = price.filter(_ > 95.0).
  countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
countByValueAndWindow.print()
ssc.start()
ssc.awaitTermination()
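
To make the window length and sliding interval concrete, here is a minimal
sketch in plain Scala (no Spark required) of roughly what the blueprint
computes. It is a simplification, not Spark's implementation: the object name
WindowSketch, the sample lines and the "id,ticker,price" layout are
assumptions made up for illustration, and windows are expressed as a number
of batches rather than as durations.

```scala
// Plain-Scala illustration of parsing prices and counting them per window.
object WindowSketch {
  // Extract the third comma-separated field as a Float, as the blueprint does.
  def price(line: String): Float = line.split(',')(2).toFloat

  // For every window, count how often each price above `threshold` occurs.
  // `batches` holds the lines received in each batch interval; the window
  // length and the sliding interval are given as numbers of batches.
  def countByValueAndWindow(
      batches: Seq[Seq[String]],
      windowBatches: Int,
      slideBatches: Int,
      threshold: Float): Seq[Map[Float, Long]] =
    (slideBatches to batches.length by slideBatches).map { end =>
      // Take the last `windowBatches` batches ending at position `end`.
      val window = batches.slice(math.max(0, end - windowBatches), end).flatten
      window.map(price).filter(_ > threshold)
        .groupBy(identity).map { case (p, ps) => p -> ps.size.toLong }
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample data: one inner Seq per 2-second batch.
    val batches = Seq(
      Seq("id1,IBM,96.5", "id2,MSFT,94.0"),
      Seq("id3,IBM,96.5"),
      Seq("id4,ORCL,97.25"),
      Seq("id5,SAP,90.0")
    )
    // A 4-second window over 2-second batches spans 2 batches;
    // a 2-second sliding interval advances by 1 batch.
    countByValueAndWindow(batches, windowBatches = 2, slideBatches = 1,
      threshold = 95.0f).foreach(println)
  }
}
```

Each printed map corresponds to one evaluation of the window. Because the
window is twice as long as the sliding interval here, every batch contributes
to two consecutive results.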

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 7 June 2016 at 10:58, Dominik Safaric <do...@gmail.com> wrote:

> Dear Mich,
>
> Thank you for the reply.
>
> By running the following command in the command line:
>
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic
> <topic_name> --from-beginning
>
> I do indeed retrieve all messages of a topic.
>
> Any indication onto what might cause the issue?
>
> An important note to make,  I’m using the default configuration of both
> Kafka and Zookeeper.

Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

Posted by Dominik Safaric <do...@gmail.com>.
Dear Mich,

Thank you for the reply.

By running the following command in the command line:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic_name> --from-beginning

I do indeed retrieve all messages of the topic.

Any indication of what might cause the issue?

An important note: I’m using the default configuration of both Kafka and ZooKeeper.

> On 07 Jun 2016, at 11:39, Mich Talebzadeh <mi...@gmail.com> wrote:
> 
> I assume you zookeeper is up and running
> 
> can you confirm that you are getting topics from kafka independently for example on the command line
> 
> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181 --from-beginning --topic newtopic


Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

Posted by Mich Talebzadeh <mi...@gmail.com>.
I assume your ZooKeeper is up and running.

Can you confirm that you can read the topic from Kafka independently, for
example on the command line:

${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
--from-beginning --topic newtopic
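
Note that with --zookeeper the console consumer locates the brokers through
ZooKeeper, whereas the direct stream contacts the address given in
metadata.broker.list, so a working console consumer does not by itself prove
that the Spark driver can reach the broker. A ClosedChannelException at this
point often (though not always) means the driver could not open a connection
to that address or to the host name the broker advertises. As a rough,
independent check, the sketch below tries a plain TCP connection from the
driver machine. The object name BrokerCheck, the helper names and the
localhost:9092 default are assumptions for illustration only; a successful
connection shows reachability, not that the Kafka protocol exchange will
succeed.

```scala
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper for checking that a broker address is reachable.
object BrokerCheck {
  // Split a "host:port" entry, as used in metadata.broker.list, into parts.
  def parseBroker(entry: String): (String, Int) = {
    val idx = entry.lastIndexOf(':')
    require(idx > 0, s"expected host:port but got '$entry'")
    (entry.substring(0, idx), entry.substring(idx + 1).toInt)
  }

  // Return true if a TCP connection to host:port succeeds within timeoutMs.
  def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers refused connections, timeouts and unresolvable host names.
      case _: java.io.IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // "localhost:9092" is a placeholder; pass the value from metadata.broker.list.
    val (host, port) = parseBroker(args.headOption.getOrElse("localhost:9092"))
    println(s"$host:$port reachable: ${canConnect(host, port)}")
  }
}
```

Run it on the machine where the Spark driver runs, passing the same host:port
that the job uses, so that the check goes through the same network path.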






Dr Mich Talebzadeh


