You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2017/02/04 20:33:39 UTC

SSpark streaming: Could not initialize class kafka.consumer.FetchRequestAndResponseStatsRegistry$

I am getting this error with Spark 2. which works with CDH 5.5.1 (Spark
1.5).

Admittedly I am messing around with Spark-shell. However, I am surprised
why this does not work with Spark 2 and is ok with CDH 5.1

scala>     val dstream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)

java.lang.NoClassDefFoundError: Could not initialize class
kafka.consumer.FetchRequestAndResponseStatsRegistry$
  at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
  at
org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:52)
  at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:345)
  at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
  at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
  at org.apache.spark.streaming.kafka.KafkaCluster.org
$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
  at
org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
  at
org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
  at
org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
  at
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
  ... 74 elided


Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Re: SSpark streaming: Could not initialize class kafka.consumer.FetchRequestAndResponseStatsRegistry$

Posted by Marco Mistroni <mm...@gmail.com>.
My bad! Confused myself with different build.sbt I tried in different
projects
Thx Cody for pointing that out(again)
Spark streaming Kafka was all I needed
Kr

On 6 Feb 2017 9:02 pm, "Cody Koeninger" <co...@koeninger.org> wrote:

> You should not need to include jars for Kafka, the spark connectors
> have the appropriate transitive dependency on the correct version.
>
> On Sat, Feb 4, 2017 at 3:25 PM, Marco Mistroni <mm...@gmail.com>
> wrote:
> > Hi
> >  not sure if this will help at all, and pls take it with a pinch of salt
> as
> > i dont have your setup and i am not running on a cluster
> >
> >  I have tried to run a kafka example which was originally workkign on
> spark
> > 1.6.1 on spark 2.
> > These are the jars i am using
> >
> > spark-streaming-kafka-0-10_2.11_2.0.1.jar
> >
> > kafka_2.11-0.10.1.1
> >
> >
> > And here's the code up to the creation of the Direct Stream. apparently
> with
> > the new version of kafka libs some properties have to be specified
> >
> >
> > import org.apache.spark.SparkConf
> > import org.apache.spark.streaming.{Seconds, StreamingContext}
> > import org.apache.spark.storage.StorageLevel
> >
> > import java.util.regex.Pattern
> > import java.util.regex.Matcher
> >
> > import Utilities._
> >
> > import org.apache.spark.streaming.kafka010.KafkaUtils
> > import kafka.serializer.StringDecoder
> > import
> > org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> > import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> >
> > /** Working example of listening for log data from Kafka's testLogs
> topic on
> > port 9092. */
> > object KafkaExample {
> >
> >   def main(args: Array[String]) {
> >
> >     // Create the context with a 1 second batch size
> >     val ssc = new StreamingContext("local[*]", "KafkaExample",
> Seconds(1))
> >
> >     setupLogging()
> >
> >     // Construct a regular expression (regex) to extract fields from raw
> > Apache log lines
> >     val pattern = apacheLogPattern()
> >
> >     val kafkaParams = Map("metadata.broker.list" -> "localhost:9092",
> > "bootstrap.servers" -> "localhost:9092",
> >             "key.deserializer"
> > ->"org.apache.kafka.common.serialization.StringDeserializer",
> >             "value.deserializer"
> > ->"org.apache.kafka.common.serialization.StringDeserializer",
> >             "group.id" -> "group1")
> >     val topics = List("testLogs").toSet
> >     val lines = KafkaUtils.createDirectStream[String, String](
> >                                             ssc,
> >                                             PreferConsistent,
> >                                             Subscribe[String,
> > String](topics, kafkaParams)
> >                                           ).map(cr => cr.value())
> >
> > hth
> >
> >  marco
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On Sat, Feb 4, 2017 at 8:33 PM, Mich Talebzadeh <
> mich.talebzadeh@gmail.com>
> > wrote:
> >>
> >> I am getting this error with Spark 2. which works with CDH 5.5.1 (Spark
> >> 1.5).
> >>
> >> Admittedly I am messing around with Spark-shell. However, I am surprised
> >> why this does not work with Spark 2 and is ok with CDH 5.1
> >>
> >> scala>     val dstream = KafkaUtils.createDirectStream[String, String,
> >> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
> >>
> >> java.lang.NoClassDefFoundError: Could not initialize class
> >> kafka.consumer.FetchRequestAndResponseStatsRegistry$
> >>   at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
> >>   at
> >> org.apache.spark.streaming.kafka.KafkaCluster.connect(
> KafkaCluster.scala:52)
> >>   at
> >> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$
> org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(
> KafkaCluster.scala:345)
> >>   at
> >> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$
> org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(
> KafkaCluster.scala:342)
> >>   at
> >> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.
> scala:33)
> >>   at scala.collection.mutable.WrappedArray.foreach(
> WrappedArray.scala:35)
> >>   at
> >> org.apache.spark.streaming.kafka.KafkaCluster.org$apache$
> spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
> >>   at
> >> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(
> KafkaCluster.scala:125)
> >>   at
> >> org.apache.spark.streaming.kafka.KafkaCluster.
> getPartitions(KafkaCluster.scala:112)
> >>   at
> >> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.
> scala:211)
> >>   at
> >> org.apache.spark.streaming.kafka.KafkaUtils$.
> createDirectStream(KafkaUtils.scala:484)
> >>   ... 74 elided
> >>
> >>
> >> Dr Mich Talebzadeh
> >>
> >>
> >>
> >> LinkedIn
> >> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCd
> OABUrV8Pw
> >>
> >>
> >>
> >> http://talebzadehmich.wordpress.com
> >>
> >>
> >> Disclaimer: Use it at your own risk. Any and all responsibility for any
> >> loss, damage or destruction of data or any other property which may
> arise
> >> from relying on this email's technical content is explicitly
> disclaimed. The
> >> author will in no case be liable for any monetary damages arising from
> such
> >> loss, damage or destruction.
> >>
> >>
> >
> >
>

Re: SSpark streaming: Could not initialize class kafka.consumer.FetchRequestAndResponseStatsRegistry$

Posted by Cody Koeninger <co...@koeninger.org>.
You should not need to include jars for Kafka, the spark connectors
have the appropriate transitive dependency on the correct version.

On Sat, Feb 4, 2017 at 3:25 PM, Marco Mistroni <mm...@gmail.com> wrote:
> Hi
>  not sure if this will help at all, and pls take it with a pinch of salt as
> i dont have your setup and i am not running on a cluster
>
>  I have tried to run a kafka example which was originally workkign on spark
> 1.6.1 on spark 2.
> These are the jars i am using
>
> spark-streaming-kafka-0-10_2.11_2.0.1.jar
>
> kafka_2.11-0.10.1.1
>
>
> And here's the code up to the creation of the Direct Stream. apparently with
> the new version of kafka libs some properties have to be specified
>
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.storage.StorageLevel
>
> import java.util.regex.Pattern
> import java.util.regex.Matcher
>
> import Utilities._
>
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import kafka.serializer.StringDecoder
> import
> org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
> /** Working example of listening for log data from Kafka's testLogs topic on
> port 9092. */
> object KafkaExample {
>
>   def main(args: Array[String]) {
>
>     // Create the context with a 1 second batch size
>     val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))
>
>     setupLogging()
>
>     // Construct a regular expression (regex) to extract fields from raw
> Apache log lines
>     val pattern = apacheLogPattern()
>
>     val kafkaParams = Map("metadata.broker.list" -> "localhost:9092",
> "bootstrap.servers" -> "localhost:9092",
>             "key.deserializer"
> ->"org.apache.kafka.common.serialization.StringDeserializer",
>             "value.deserializer"
> ->"org.apache.kafka.common.serialization.StringDeserializer",
>             "group.id" -> "group1")
>     val topics = List("testLogs").toSet
>     val lines = KafkaUtils.createDirectStream[String, String](
>                                             ssc,
>                                             PreferConsistent,
>                                             Subscribe[String,
> String](topics, kafkaParams)
>                                           ).map(cr => cr.value())
>
> hth
>
>  marco
>
>
>
>
>
>
>
>
>
>
>
>
> On Sat, Feb 4, 2017 at 8:33 PM, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>>
>> I am getting this error with Spark 2. which works with CDH 5.5.1 (Spark
>> 1.5).
>>
>> Admittedly I am messing around with Spark-shell. However, I am surprised
>> why this does not work with Spark 2 and is ok with CDH 5.1
>>
>> scala>     val dstream = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>
>> java.lang.NoClassDefFoundError: Could not initialize class
>> kafka.consumer.FetchRequestAndResponseStatsRegistry$
>>   at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:52)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:345)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>   at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>   at
>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
>>   at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>   ... 74 elided
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> Disclaimer: Use it at your own risk. Any and all responsibility for any
>> loss, damage or destruction of data or any other property which may arise
>> from relying on this email's technical content is explicitly disclaimed. The
>> author will in no case be liable for any monetary damages arising from such
>> loss, damage or destruction.
>>
>>
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: SSpark streaming: Could not initialize class kafka.consumer.FetchRequestAndResponseStatsRegistry$

Posted by Marco Mistroni <mm...@gmail.com>.
Hi
 not sure if this will help at all, and pls take it with a pinch of salt as
i dont have your setup and i am not running on a cluster

 I have tried to run a kafka example which was originally workkign on spark
1.6.1 on spark 2.
These are the jars i am using

spark-streaming-kafka-0-10_2.11_2.0.1.jar
kafka_2.11-0.10.1.1


And here's the code up to the creation of the Direct Stream. apparently
with the new version of kafka libs some properties have to be specified


import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

import java.util.regex.Pattern
import java.util.regex.Matcher

import Utilities._

import org.apache.spark.streaming.kafka010.KafkaUtils
import kafka.serializer.StringDecoder
import
org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/** Working example of listening for log data from Kafka's testLogs topic
on port 9092. */
object KafkaExample {

  def main(args: Array[String]) {

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))

    setupLogging()

    // Construct a regular expression (regex) to extract fields from raw
Apache log lines
    val pattern = apacheLogPattern()

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092",
"bootstrap.servers" -> "localhost:9092",
            "key.deserializer"
->"org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer"
->"org.apache.kafka.common.serialization.StringDeserializer",
            "group.id" -> "group1")
    val topics = List("testLogs").toSet
    val lines = KafkaUtils.createDirectStream[String, String](
                                            ssc,
                                            PreferConsistent,
                                            Subscribe[String,
String](topics, kafkaParams)
                                          ).map(cr => cr.value())

hth

 marco












On Sat, Feb 4, 2017 at 8:33 PM, Mich Talebzadeh <mi...@gmail.com>
wrote:

> I am getting this error with Spark 2. which works with CDH 5.5.1 (Spark
> 1.5).
>
> Admittedly I am messing around with Spark-shell. However, I am surprised
> why this does not work with Spark 2 and is ok with CDH 5.1
>
> scala>     val dstream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>
> java.lang.NoClassDefFoundError: Could not initialize class kafka.consumer.
> FetchRequestAndResponseStatsRegistry$
>   at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
>   at org.apache.spark.streaming.kafka.KafkaCluster.connect(
> KafkaCluster.scala:52)
>   at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$
> org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(
> KafkaCluster.scala:345)
>   at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$
> org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(
> KafkaCluster.scala:342)
>   at scala.collection.IndexedSeqOptimized$class.
> foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>   at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$
> spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>   at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(
> KafkaCluster.scala:125)
>   at org.apache.spark.streaming.kafka.KafkaCluster.
> getPartitions(KafkaCluster.scala:112)
>   at org.apache.spark.streaming.kafka.KafkaUtils$.
> getFromOffsets(KafkaUtils.scala:211)
>   at org.apache.spark.streaming.kafka.KafkaUtils$.
> createDirectStream(KafkaUtils.scala:484)
>   ... 74 elided
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>