Posted to user@spark.apache.org by Mario Pastorelli <ma...@teralytics.ch> on 2014/12/11 13:52:43 UTC

Spark streaming: missing classes when using Kafka consumer classes

Hi,

I'm trying to use spark-streaming with Kafka, but I get a strange error
about classes that are missing. I would like to ask whether my way of
building the fat jar is correct. My program is

val kafkaStream = KafkaUtils.createStream(ssc, zookeeperQuorum, kafkaGroupId, kafkaTopicsWithThreads)
                            .map(_._2)

kafkaStream.foreachRDD((rdd, t) => rdd.foreachPartition { iter: Iterator[CellWithLAC] =>
  println("time: " ++ t.toString ++ " #received: " ++ iter.size.toString)
})

I use sbt to manage my project, and my build.sbt (with the sbt-assembly
0.12.0 plugin) is

name := "spark_example"

version := "0.0.1"

scalaVersion := "2.10.4"

scalacOptions ++= Seq("-deprecation","-feature")

libraryDependencies ++= Seq(
   "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
   "joda-time" % "joda-time" % "2.6"
)

assemblyMergeStrategy in assembly := {
  case p if p startsWith "com/esotericsoftware/minlog" => MergeStrategy.first
  case p if p startsWith "org/apache/commons/beanutils" => MergeStrategy.first
  case p if p startsWith "org/apache/" => MergeStrategy.last
  case "plugin.properties" => MergeStrategy.discard
  case p if p startsWith "META-INF" => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
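For reference, the assembly 0.12.0 plugin mentioned above would be enabled in project/plugins.sbt with a line along these lines (that file is not shown in the email, so this is a sketch):

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")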

I create the jar with sbt assembly and then run it with

$SPARK_HOME/bin/spark-submit --master spark://master:7077 --class Main \
  target/scala-2.10/spark_example-assembly-0.0.1.jar \
  localhost:2181 test-consumer-group test1

where master:7077 is the Spark master, localhost:2181 is ZooKeeper,
test-consumer-group is the Kafka group id, and test1 is the Kafka topic.
The program starts and keeps running, but I get an error and nothing is
printed. In the log I found the following stack trace:

14/12/11 13:02:08 INFO network.ConnectionManager: Accepted connection from [10.0.3.1/10.0.3.1:54325]
14/12/11 13:02:08 INFO network.SendingConnection: Initiating connection to [jpl-devvax/127.0.1.1:38767]
14/12/11 13:02:08 INFO network.SendingConnection: Connected to [jpl-devvax/127.0.1.1:38767], 1 messages pending
14/12/11 13:02:08 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on jpl-devvax:38767 (size: 842.0 B, free: 265.4 MB)
14/12/11 13:02:08 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka.tcp://sparkExecutor@jpl-devvax:46602
14/12/11 13:02:08 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues$1
     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(Unknown Source)
     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(Unknown Source)
     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown Source)
     at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown Source)
     at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
     at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown Source)
     at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
     at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
     at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
     at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
     at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
     at org.apache.spark.scheduler.Task.run(Task.scala:54)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:745)

I searched inside the fat jar and found that the class is not in it:

 > jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar | grep "kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector"
 >

The problem is the double dollar sign before anonfun: if you grep with
only one, the class is there:

 > jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar | grep "kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$anonfun$kafka$consumer$ZookeeperConsumerConnector"
 [...]
 kafka/consumer/ZookeeperConsumerConnector.class
 >

I'm submitting my job to Spark 1.1.1 built for Hadoop 2.4, downloaded
from the Spark website.

My question is: how can I solve this problem? I guess the problem is in
my sbt build, but I don't understand why.


Thanks,
Mario Pastorelli


Re: Spark streaming: missing classes when using Kafka consumer classes

Posted by Mario Pastorelli <ma...@teralytics.ch>.
Hi,

I asked on StackOverflow and got an answer about this:
http://stackoverflow.com/questions/27444512/missing-classes-from-the-assembly-file-created-by-sbt-assembly
Adding fullClasspath in assembly := (fullClasspath in Compile).value
at the end of my build.sbt solved the problem, apparently.
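For reference, a minimal sketch of how that extra setting sits at the end of the build.sbt from the first email (nothing else changed):

// feed the full Compile classpath into the assembly task, as suggested in the StackOverflow answer
fullClasspath in assembly := (fullClasspath in Compile).value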

Best,
Mario


Re: Spark streaming: missing classes when using Kafka consumer classes

Posted by Flávio Santos <ba...@chaordicsystems.com>.
Hi Mario,

Try including this in your libraryDependencies (in your sbt file):

  "org.apache.kafka" % "kafka_2.10" % "0.8.0"
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri")
    exclude("org.slf4j", "slf4j-simple")

Regards,


--
Flávio R. Santos

Chaordic | Platform
www.chaordic.com.br
+55 48 3232.3200


Re: Spark streaming: missing classes when using Kafka consumer classes

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Last time I did an sbt assembly, this is how I added the dependencies.


libraryDependencies ++= Seq(
  ("org.apache.spark" % "spark-streaming_2.10" % "1.1.0" % "provided").
    exclude("org.eclipse.jetty.orbit", "javax.transaction").
    exclude("org.eclipse.jetty.orbit", "javax.mail").
    exclude("org.eclipse.jetty.orbit", "javax.activation").
    exclude("com.esotericsoftware.minlog", "minlog").
    exclude("commons-beanutils", "commons-beanutils-core").
    exclude("commons-logging", "commons-logging").
    exclude("commons-collections", "commons-collections").
    exclude("org.eclipse.jetty.orbit", "javax.servlet")
)

libraryDependencies ++= Seq(
  ("org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.0").
    exclude("org.eclipse.jetty.orbit", "javax.transaction").
    exclude("org.eclipse.jetty.orbit", "javax.mail").
    exclude("org.eclipse.jetty.orbit", "javax.activation").
    exclude("com.esotericsoftware.minlog", "minlog").
    exclude("commons-beanutils", "commons-beanutils-core").
    exclude("commons-logging", "commons-logging").
    exclude("commons-collections", "commons-collections").
    exclude("org.eclipse.jetty.orbit", "javax.servlet")
)


Those excluded artifacts were causing conflicts.
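As a sketch, this could slot into the build.sbt from the first email with the shared exclusions factored into a helper (the helper name below is made up, and the versions are bumped to 1.1.1 to match that build):

def excludeConflicts(m: ModuleID): ModuleID = m.
  exclude("org.eclipse.jetty.orbit", "javax.transaction").
  exclude("org.eclipse.jetty.orbit", "javax.mail").
  exclude("org.eclipse.jetty.orbit", "javax.activation").
  exclude("com.esotericsoftware.minlog", "minlog").
  exclude("commons-beanutils", "commons-beanutils-core").
  exclude("commons-logging", "commons-logging").
  exclude("commons-collections", "commons-collections").
  exclude("org.eclipse.jetty.orbit", "javax.servlet")

libraryDependencies ++= Seq(
  excludeConflicts("org.apache.spark" % "spark-streaming_2.10" % "1.1.1" % "provided"),
  excludeConflicts("org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1"),
  "joda-time" % "joda-time" % "2.6"
)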

Thanks
Best Regards


Re: Spark streaming: missing classes when using Kafka consumer classes

Posted by Mario Pastorelli <ma...@teralytics.ch>.
Thanks Akhil for the answer.

I am using sbt assembly, and the build.sbt is in the first email. Do you
know why those classes end up included that way?


Thanks,
Mario


Re: Spark streaming: missing classes when using Kafka consumer classes

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Yes. You can use sbt assembly and create a big fat jar with all the
dependencies bundled inside it.

Thanks
Best Regards


Re: Spark streaming: missing classes when using Kafka consumer classes

Posted by Mario Pastorelli <ma...@teralytics.ch>.
This way it works, but it's not portable, and the idea of having a fat
jar is to avoid exactly this. Is there any way to create a
self-contained, portable fat jar?


Re: Spark streaming: missing classes when using Kafka consumer classes

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Add these jars while creating the Context.

val sc = new SparkContext(conf)

sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar")
sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar")
sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar")
sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar")

val ssc = new StreamingContext(sc, Seconds(10))
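A more portable sketch of the same idea is to pass those jars to spark-submit with --jars instead of hardcoding paths in the code (the /path/to placeholders below are assumptions; point them at the actual jar files):

$SPARK_HOME/bin/spark-submit --master spark://master:7077 --class Main \
  --jars /path/to/spark-streaming-kafka_2.10-1.1.0.jar,/path/to/zkclient-0.3.jar,/path/to/metrics-core-2.2.0.jar,/path/to/kafka_2.10-0.8.0.jar \
  target/scala-2.10/spark_example-assembly-0.0.1.jar localhost:2181 test-consumer-group test1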


Thanks
Best Regards
