Posted to user@spark.apache.org by suhshekar52 <su...@gmail.com> on 2014/12/29 07:56:24 UTC

Setting up Simple Kafka Consumer via Spark Java app

Hello Everyone,

Thank you for the time and the help :).

My goal here is to get this program working:
https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java

The only lines I do not have from the example are lines 62-67. pom.xml
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml>  

Background: I have EC2 instances running. Standalone Spark is running on
top of Cloudera Manager 5.2.

The pom file is attached and is the same for both clusters.
pom.xml
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml>  

Here are a few different approaches I have taken and the issues I run into:

*Standalone Mode*

1) Use spark-submit script to run: 

/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/bin/spark-submit \
  --class SimpleApp --master spark://10.0.1.230:7077 \
  --jars $(echo /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',') \
  /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
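
A note on the --jars argument above: spark-submit expects --jars as a single comma-separated list, and the $(echo ... | tr ' ' ',') substitution turns the space-separated glob expansion into that form. Below is a minimal Java sketch of the same transformation, in case it helps to see it spelled out. The class and method names (JarList, commaSeparatedJars) are made up for illustration and are not part of Spark.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not part of Spark): builds the value passed to --jars.
class JarList {
    /** Returns the paths of all *.jar files directly under libDir, sorted and joined with commas. */
    static String commaSeparatedJars(Path libDir) throws IOException {
        List<String> jars = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jar : stream) {
                jars.add(jar.toString());
            }
        }
        Collections.sort(jars); // sorted only to make the result stable
        return String.join(",", jars);
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the lib directory from the post; pass another directory as args[0].
        Path libDir = Paths.get(args.length > 0 ? args[0] : "/home/ec2-user/sparkApps/SimpleApp/lib");
        System.out.println(commaSeparatedJars(libDir));
    }
}
```

One difference from the shell version: for a directory without any jars this returns an empty string, whereas a default bash setup would pass the unexpanded pattern through.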

Interesting...I was getting an error like this: Initial job has not accepted
any resources; check your cluster UI

Now, when I run, it prints out the 3 Hello world statements in my code:
KafkaJavaConsumer.txt
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/KafkaJavaConsumer.txt>  

and then it seems to try to start the Kafka Stream, but fails:

14/12/29 05:58:05 INFO KafkaReceiver: Starting Kafka Consumer Stream with group: c1
14/12/29 05:58:05 INFO ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
14/12/29 05:58:05 INFO KafkaReceiver: Connecting to Zookeeper: 10.0.1.232:2181
14/12/29 05:58:05 INFO BlockGenerator: Started block pushing thread
14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopping receiver with message: Error starting receiver 0: java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Called receiver onStop
14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Deregistering receiver 0
^C14/12/29 05:58:05 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
        at kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
        at kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
        at kafka.utils.Logging$class.$init$(Logging.scala:29)
        at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:26)
        at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:94)
        at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
        ... 18 more

14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopped receiver 0
14/12/29 05:58:05 INFO BlockGenerator: Stopping BlockGenerator

I ran into a couple of other class-not-found errors and was able to solve
them by adding dependencies to the pom file, but I have not found such a
solution to this error.
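
For anyone hitting the same NoClassDefFoundError: as far as I know, scala.reflect.ClassManifest is a real class file in Scala 2.9 but only a type alias in Scala 2.10, so this error is consistent with a jar built for Scala 2.9.x running on a 2.10 runtime. A quick way to see what a given classpath actually provides is to try loading the class by name. The sketch below uses only the JDK; the class name ClasspathCheck is made up, and it has to be run with the same jars on the classpath as the application for the answer to mean anything.

```java
// Diagnostic sketch: reports whether the named classes can be loaded
// from the classpath this program is started with.
class ClasspathCheck {
    /** Returns true if the named class is visible to this class's loader. */
    static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] names = args.length > 0 ? args : new String[] {
            "scala.reflect.ClassManifest",   // the class reported missing in the trace above
            "kafka.consumer.ConsumerConfig"  // a Kafka class that appears in the same trace
        };
        for (String name : names) {
            System.out.println(name + " -> " + (isLoadable(name) ? "found" : "MISSING"));
        }
    }
}
```

Running it with something like java -cp "lib/*:." ClasspathCheck (the path is an assumption about the layout) shows which of the names resolve against the jars in lib.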

On the Kafka side of things, I am simply typing in messages as soon as I
start the Java app on another console. Is this okay?

I have not set up an advertised host on the kafka side as I was able to
still receive messages from other consoles by setting up a consumer to
listen to the private ip:port. Is this okay?

Lastly, is there an option in the Java application, like --from-beginning
for a console consumer, to get messages from the beginning?
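
On the --from-beginning question: with the Kafka 0.8 high-level consumer, the closest setting I know of is the consumer property auto.offset.reset=smallest. It only takes effect when the consumer group has no offset committed in ZooKeeper yet, so a fresh group.id may be needed. Below is a small sketch that builds such a property map with plain JDK types. The class and method names are hypothetical. My understanding is that the map would then go to the KafkaUtils.createStream overload that accepts kafkaParams, but treat that as an assumption and check the API docs for your Spark version.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: consumer properties for reading a topic from its earliest offset.
class ConsumerParams {
    /** Builds Kafka 0.8 high-level consumer properties that start at the earliest offset. */
    static Map<String, String> fromBeginning(String zkQuorum, String groupId) {
        Map<String, String> params = new HashMap<>();
        params.put("zookeeper.connect", zkQuorum);
        params.put("group.id", groupId);
        params.put("zookeeper.connection.timeout.ms", "10000");
        // "smallest" means: start at the earliest available offset,
        // but only if this group has no committed offset yet.
        params.put("auto.offset.reset", "smallest");
        return params;
    }

    public static void main(String[] args) {
        // ZooKeeper address and group name as they appear in the logs of this thread.
        Map<String, String> params = fromBeginning("10.0.1.232:2181", "c1");
        for (Map.Entry<String, String> e : params.entrySet()) {
            System.out.println(e.getKey() + "=" + e.getValue());
        }
    }
}
```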

Thanks a lot for the help and happy holidays!








Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Suhas Shekar <su...@gmail.com>.
Got it to work...thanks a lot for the help! I started a new cluster where
Spark has Yarn as a dependency. I ran it with the script with local[2] and
it worked (this same script did not work with Spark in standalone mode).

A follow up question...I have seen this question posted around the internet
quite a few times, but very few people have received responses...

Instead of wordCounts.print() I want to output it to a text file or hadoop
file. The link below says that saveAsTextFiles or saveAsHadoopFiles are
appropriate output commands.

https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html

However, when I try saveAsTextFiles("prefix", "txt") the package fails to
build saying it doesn't recognize the command.

When I try saveAsHadoopFiles("hdfs://ip-10...:8020/user/test/", "abc") it
builds, but throws a runtime exception:

Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079)
        at org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
        at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:632)
        at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:630)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2073)
        ... 14 more

I have searched for this online, and it seems that a lot of people have this
problem, but there doesn't seem to be an answer. Thanks for the help, and
hopefully this should solve all my problems. Thanks!

Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Suhas Shekar <su...@gmail.com>.
I thought I was running it in local mode as
http://spark.apache.org/docs/1.1.1/submitting-applications.html says that
if I don't include "--deploy-mode cluster" then it will run as local mode?

I tried both of the scripts above and they gave the same result as the
script I was running before.

Also, I'm still confused as to why the program won't stop after 10 seconds
:(.

Thanks for the help! Really appreciate the time
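
On the question above about the program not stopping after 10 seconds: to my understanding, awaitTermination with a timeout only blocks the calling thread for up to that long. It does not stop the streaming context, so the receiver keeps running until something calls stop() on the context. The same wait-versus-stop distinction exists in plain java.util.concurrent, which makes it easy to try without a cluster. The sketch below is an analogy using an ExecutorService, not Spark code, and the class name WaitThenStop is made up.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Analogy only: an ExecutorService stands in for the streaming context.
class WaitThenStop {
    /**
     * Waits up to timeoutMs for the pool to terminate, then stops it explicitly
     * if it is still running. Returns true if the pool had terminated on its own.
     */
    static boolean waitThenStop(ExecutorService pool, long timeoutMs) throws InterruptedException {
        boolean finishedOnItsOwn = pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        if (!finishedOnItsOwn) {
            // The explicit stop. Without it the worker thread keeps the JVM alive.
            pool.shutdownNow();
        }
        return finishedOnItsOwn;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Stand-in for a receiver: loops until it is interrupted.
        pool.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        boolean finished = waitThenStop(pool, 200);
        System.out.println("finished on its own: " + finished);
    }
}
```

In the streaming app the equivalent would presumably be a call to jssc.stop() right after jssc.awaitTermination(10000) returns.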

Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
You don't submit it like that :/

You use the [N] suffix (e.g. local[*]) only when you run the job in local
mode, whereas here you are running it in standalone cluster mode.

You can try either of these:

1.
/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.10.12/lib/spark/bin/spark-submit \
  --class SimpleApp --master spark://10.0.1.230:7077 --total-executor-cores 4 \
  --jars $(echo /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',') \
  /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar

2.
/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.10.12/lib/spark/bin/spark-submit \
  --class SimpleApp --master local[4] \
  --jars $(echo /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',') \
  /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
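
To make the difference between the two --master forms concrete, here is a small Java sketch that classifies a master string. It only covers the two shapes discussed in this thread (local, local[N], local[*] and spark://host:port). Spark accepts other forms as well, which this sketch would wrongly flag as invalid. The class name MasterUrl and its patterns are my own illustration, not how Spark parses the value.

```java
import java.util.regex.Pattern;

// Illustration only: recognizes the two master URL shapes discussed in this thread.
class MasterUrl {
    enum Kind { LOCAL, STANDALONE, INVALID }

    // local, local[4], local[*]
    private static final Pattern LOCAL = Pattern.compile("local(\\[(\\d+|\\*)\\])?");
    // spark://host:port with nothing after the port
    private static final Pattern STANDALONE = Pattern.compile("spark://[^:/\\[\\]\\s]+:\\d+");

    static Kind classify(String master) {
        if (LOCAL.matcher(master).matches()) {
            return Kind.LOCAL;
        }
        if (STANDALONE.matcher(master).matches()) {
            return Kind.STANDALONE;
        }
        return Kind.INVALID;
    }

    public static void main(String[] args) {
        String[] samples = {
            "local[4]",
            "spark://10.0.1.230:7077",
            "spark://10.0.1.230:7077[2]" // the form tried earlier in the thread
        };
        for (String s : samples) {
            System.out.println(s + " -> " + classify(s));
        }
    }
}
```

The bracket suffix is a thread count for local mode. For a standalone master the core count goes in a separate option such as --total-executor-cores, as in the first command above.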



Thanks
Best Regards

On Mon, Dec 29, 2014 at 2:36 PM, Suhas Shekar <su...@gmail.com> wrote:

> I tried submitting the application like this with 2 cores as you can see
> with the [2].
>
>
> /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.10.12/lib/spark/bin/spark-submit
> --class SimpleApp --master spark://10.0.1.230:7077[2] --jars $(echo
> /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',')
> /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
>
> So I checked the url of the master (10.0.1.230). I found the results
> interesting...
>
> Workers: 2
> Cores: 4 Total, 0 Used //so does this mean running it on the localhost has
> 0 cores?
>
> Also when I ran my application, it did not show under "Running
> Applications".
>
> Also, under "Completed Applications" none of my previous runs were
> recorded (all were from spark-shell).
>
> I tried changing my submit script to 10.0.1.231:70877[2], but that did
> not change anything.
>
> Any suggestions on if I should change my submit script or how I could do
> so?
>
> Thanks a lot for the help!
>
>
> Suhas Shekar
>
> University of California, Los Angeles
> B.A. Economics, Specialization in Computing 2014
>
> On Mon, Dec 29, 2014 at 12:55 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> How many cores are you allocated/seeing in the web UI? (It usually runs
>> on 8080; for Cloudera I think it's 18080.) Most likely the job is being
>> allocated 1 core (it should be >= 2 cores), and that's why the count is
>> never happening.
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Dec 29, 2014 at 2:22 PM, Suhas Shekar <su...@gmail.com>
>> wrote:
>>
>>> So it got rid of the logs, but the problem still persists that :
>>>
>>> a) The program never terminates (I have pasted all output after the
>>> Hello World statements below)
>>>
>>> b) I am not seeing the word count
>>>
>>> c) I tried adding [2] next to my 10.0.1.232:2181 looking at this post
>>> http://apache-spark-user-list.1001560.n3.nabble.com/streaming-questions-td3281.html,
>>> but that did not work either.
>>>
>>> Any other suggestions are appreciated.
>>>
>>> Thanks a lot for the time :)
>>>
>>> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector: [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], starting auto committer every 60000 ms
>>> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector: [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], begin registering consumer c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 in ZK
>>> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector: [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], end registering consumer c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 in ZK
>>> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector: [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], starting watcher executor thread for consumer c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471
>>> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector: [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], begin rebalancing consumer c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 try #0
>>> 14/12/29 08:46:39 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1419860798873] Stopping leader finder thread
>>> 14/12/29 08:46:39 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1419860798873] Stopping all fetchers
>>> 14/12/29 08:46:39 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1419860798873] All connections stopped
>>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector: [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], Cleared all relevant queues for this fetcher
>>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector: [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], Cleared the data chunks in all the consumer message iterators
>>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector: [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], Committing all offsets after clearing the fetcher queues
>>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector: [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], Releasing partition ownership
>>> 14/12/29 08:46:39 INFO RangeAssignor: Consumer c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 rebalancing the following partitions: ArrayBuffer(0) for topic test with consumers: List(c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0)
>>> 14/12/29 08:46:39 INFO RangeAssignor: c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0 attempting to claim partition 0
>>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector: [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0 successfully owned partition 0 for topic test
>>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector: [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], Consumer c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 selected partitions : test:0: fetched offset = 221: consumed offset = 221
>>> 14/12/29 08:46:39 INFO ConsumerFetcherManager$LeaderFinderThread: [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-leader-finder-thread], Starting
>>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector: [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], end rebalancing consumer c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 try #0
>>> 14/12/29 08:46:39 INFO VerifiableProperties: Verifying properties
>>> 14/12/29 08:46:39 INFO VerifiableProperties: Property client.id is overridden to c1
>>> 14/12/29 08:46:39 INFO VerifiableProperties: Property metadata.broker.list is overridden to ip-10-0-1-232.us-west-1.compute.internal:9092
>>> 14/12/29 08:46:39 INFO VerifiableProperties: Property request.timeout.ms is overridden to 30000
>>> 14/12/29 08:46:39 INFO ClientUtils$: Fetching metadata from broker id:0,host:ip-10-0-1-232.us-west-1.compute.internal,port:9092 with correlation id 0 for 1 topic(s) Set(test)
>>> 14/12/29 08:46:39 INFO SyncProducer: Connected to ip-10-0-1-232.us-west-1.compute.internal:9092 for producing
>>> 14/12/29 08:46:39 INFO SyncProducer: Disconnecting from ip-10-0-1-232.us-west-1.compute.internal:9092
>>> 14/12/29 08:46:39 INFO ConsumerFetcherThread: [ConsumerFetcherThread-c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0-0], Starting
>>> 14/12/29 08:46:39 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1419860798873] Added fetcher for partitions ArrayBuffer([[test,0], initOffset 221 to broker id:0,host:ip-10-0-1-232.us-west-1.compute.internal,port:9092] )
>>>
>>> On Mon, Dec 29, 2014 at 12:43 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> Now, add these lines to get rid of those logs:
>>>>
>>>>     import org.apache.log4j.Logger;
>>>>     import org.apache.log4j.Level;
>>>>
>>>>     Logger.getLogger("org").setLevel(Level.OFF);
>>>>     Logger.getLogger("akka").setLevel(Level.OFF);
>>>>
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Mon, Dec 29, 2014 at 2:09 PM, Suhas Shekar <su...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hmm, so I passed 10000 (10,000) to jssc.awaitTermination; however, it
>>>>> does not stop. When I am not pushing in any data it gives me this:
>>>>>
>>>>> 14/12/29 08:35:12 INFO ReceiverTracker: Stream 0 received 0 blocks
>>>>> 14/12/29 08:35:12 INFO JobScheduler: Added jobs for time 1419860112000 ms
>>>>> 14/12/29 08:35:14 INFO ReceiverTracker: Stream 0 received 0 blocks
>>>>> 14/12/29 08:35:14 INFO JobScheduler: Added jobs for time 1419860114000 ms
>>>>> 14/12/29 08:35:16 INFO ReceiverTracker: Stream 0 received 0 blocks
>>>>>
>>>>> When I am pushing in data it does this:
>>>>>
>>>>> 14/12/29 08:35:08 WARN BlockManager: Block input-0-1419860108200 already exists on this machine; not re-adding it
>>>>> 14/12/29 08:35:08 INFO BlockGenerator: Pushed block input-0-1419860108200
>>>>> 14/12/29 08:35:09 INFO MemoryStore: ensureFreeSpace(80) called with curMem=6515, maxMem=277842493
>>>>> 14/12/29 08:35:09 INFO MemoryStore: Block input-0-1419860109200 stored as bytes in memory (estimated size 80.0 B, free 265.0 MB)
>>>>> 14/12/29 08:35:09 INFO BlockManagerInfo: Added input-0-1419860109200 in memory on ip-10-0-1-230.us-west-1.compute.internal:48171 (size: 80.0 B, free: 265.0 MB)
>>>>> 14/12/29 08:35:09 INFO BlockManagerMaster: Updated info of block input-0-1419860109200
>>>>> 14/12/29 08:35:09 WARN BlockManager: Block input-0-1419860109200 already exists on this machine; not re-adding it
>>>>> 14/12/29 08:35:09 INFO BlockGenerator: Pushed block input-0-1419860109200
>>>>> 14/12/29 08:35:10 INFO ReceiverTracker: Stream 0 received 2 blocks
>>>>>
>>>>> I know I am close, as every time I enter a message in my Kafka producer,
>>>>> the console reacts as I showed above. Do I have to place the
>>>>> awaitTermination somewhere else? Or is the warning saying there is an
>>>>> underlying problem?
>>>>>
>>>>> Thank you for the help...hopefully I am as close as I think I am!
>>>>>
>>>>> On Mon, Dec 29, 2014 at 12:28 AM, Akhil Das <
>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>
>>>>>> If you want to stop the streaming after 10 seconds, then use
>>>>>> ssc.awaitTermination(10000). Make sure you push some data to kafka for the
>>>>>> streaming to consume within the 10 seconds.
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Mon, Dec 29, 2014 at 1:53 PM, Suhas Shekar <su...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm very close! So I added that and then I added this:
>>>>>>> http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/0.8.2-beta
>>>>>>>
>>>>>>> and it seems as though the stream is working as it says Stream 0
>>>>>>> received 1 or 2 blocks as I enter in messages on my kafka producer.
>>>>>>> However, the Receiver seems to keep trying every 2 seconds (as I've
>>>>>>> included 2000 in my duration in my java app). How can I stop the Receiver
>>>>>>> from consuming messages after 10 seconds and output the word count to the
>>>>>>> console?
>>>>>>>
>>>>>>> Thanks a lot for all the help! I'm excited to see this word count :)
>>>>>>>
>>>>>>> On Mon, Dec 29, 2014 at 12:04 AM, Akhil Das <
>>>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>>>
>>>>>>>> Add this jar in the dependency
>>>>>>>> http://mvnrepository.com/artifact/com.yammer.metrics/metrics-core/2.2.0
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> On Mon, Dec 29, 2014 at 1:31 PM, Suhas Shekar <
>>>>>>>> suhshekar52@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello Akhil,
>>>>>>>>>
>>>>>>>>> I changed my Kafka dependency to 2.10 (which is the version of
>>>>>>>>> Kafka that was on 10.0.1.232). I am getting a slightly different error, but
>>>>>>>>> at the same place as the previous error (pasted below).
>>>>>>>>>
>>>>>>>>> FYI, when I make these changes to the pom file, I run "mvn clean
>>>>>>>>> package" and then cp the new jar files from the repository to my lib
>>>>>>>>> directory of jar files, which is an argument in the spark-submit script
>>>>>>>>> from my original post.
>>>>>>>>>
>>>>>>>>> Thanks again for the time and help...much appreciated.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Starting receiver
>>>>>>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Starting Kafka Consumer Stream with group: c1
>>>>>>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Connecting to Zookeeper: 10.0.1.232:2181
>>>>>>>>> 14/12/29 07:56:00 INFO BlockGenerator: Started block pushing thread
>>>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Verifying properties
>>>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property group.id is overridden to c1
>>>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property zookeeper.connect is overridden to 10.0.1.232:2181
>>>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property zookeeper.connection.timeout.ms is overridden to 10000
>>>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Stopping receiver with message: Error starting receiver 0: java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
>>>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Called receiver onStop
>>>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Deregistering receiver 0
>>>>>>>>> 14/12/29 07:56:00 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
>>>>>>>>>         at kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:51)
>>>>>>>>>         at kafka.consumer.ZookeeperConsumerConnector.newMeter(ZookeeperConsumerConnector.scala:83)
>>>>>>>>>         at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
>>>>>>>>>         at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
>>>>>>>>>         at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
>>>>>>>>>         at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:97)
>>>>>>>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>>>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>>>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>>>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>>>>>>         at java.lang.Thread.run(Thread.java:722)
>>>>>>>>> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
>>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>>>>>>>         ... 18 more
>>>>>>>>>
>>>>>>>>> On Sun, Dec 28, 2014 at 11:52 PM, Suhas Shekar <
>>>>>>>>> suhshekar52@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I made both versions 1.1.1 and I got the same error. I then tried
>>>>>>>>>> making both 1.1.0, as that is the version of my Spark Core, but I got the
>>>>>>>>>> same error.
>>>>>>>>>>
>>>>>>>>>> I noticed my Kafka dependency is for Scala 2.9.2, while my Spark
>>>>>>>>>> Streaming Kafka dependency is 2.10.x. I will try changing that next, but I
>>>>>>>>>> don't think that will solve the error, as I don't think the application had
>>>>>>>>>> gotten to that level yet.
>>>>>>>>>>
>>>>>>>>>> Please let me know of any possible next steps.
>>>>>>>>>>
>>>>>>>>>> Thank you again for the time and the help!
>>>>>>>>>>
>>>>>>>>>> On Sun, Dec 28, 2014 at 11:31 PM, Akhil Das <
>>>>>>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Just looked at the pom file that you are using, why are you
>>>>>>>>>>> having different versions in it?
>>>>>>>>>>>
>>>>>>>>>>> <dependency>
>>>>>>>>>>> <groupId>org.apache.spark</groupId>
>>>>>>>>>>> <artifactId>spark-streaming-kafka_2.10</artifactId>
>>>>>>>>>>> <version>*1.1.1*</version>
>>>>>>>>>>> </dependency>
>>>>>>>>>>> <dependency>
>>>>>>>>>>> <groupId>org.apache.spark</groupId>
>>>>>>>>>>> <artifactId>spark-streaming_2.10</artifactId>
>>>>>>>>>>> <version>*1.0.2*</version>
>>>>>>>>>>> </dependency>
>>>>>>>>>>>
>>>>>>>>>>> Can you make both the versions the same?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 29, 2014 at 12:44 PM, Suhas Shekar <
>>>>>>>>>>> suhshekar52@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> 1) Could you please clarify on what you mean by checking the
>>>>>>>>>>>> Scala version is correct? In my pom.xml file it is 2.10.4 (which is the
>>>>>>>>>>>> same as when I start spark-shell).
>>>>>>>>>>>>
>>>>>>>>>>>> 2) The spark master URL is definitely correct as I have run
>>>>>>>>>>>> other apps with the same script that use Spark (like a word count with a
>>>>>>>>>>>> local file)
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for the help!
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Dec 28, 2014 at 11:04 PM, Akhil Das <
>>>>>>>>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Make sure you verify the following:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Scala version : I think the correct version would be 2.10.x
>>>>>>>>>>>>> - SparkMasterURL: Be sure that you copied the one displayed on
>>>>>>>>>>>>> the webui's top left corner (running on port 8080)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 29, 2014 at 12:26 PM, suhshekar52 <
>>>>>>>>>>>>> suhshekar52@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello Everyone,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you for the time and the help :).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> My goal here is to get this program working:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The only lines I do not have from the example are lines
>>>>>>>>>>>>>> 62-67. pom.xml
>>>>>>>>>>>>>> <
>>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Background: Have ec2 instances running. The standalone spark
>>>>>>>>>>>>>> is running on
>>>>>>>>>>>>>> top of Cloudera Manager 5.2.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Pom file is attached and the same for both clusters.
>>>>>>>>>>>>>> pom.xml
>>>>>>>>>>>>>> <
>>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here are a few different approaches I have taken and the
>>>>>>>>>>>>>> issues I run into:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *Standalone Mode*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1) Use spark-submit script to run:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/bin/spark-submit
>>>>>>>>>>>>>> --class SimpleApp --master spark://10.0.1.230:7077  --jars
>>>>>>>>>>>>>> $(echo
>>>>>>>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',')
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Interesting...I was getting an error like this: Initial job
>>>>>>>>>>>>>> has not accepted
>>>>>>>>>>>>>> any resources; check your cluster UI
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Now, when I run, it prints out the 3 Hello world statements
>>>>>>>>>>>>>> in my code:
>>>>>>>>>>>>>> KafkaJavaConsumer.txt
>>>>>>>>>>>>>> <
>>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/KafkaJavaConsumer.txt
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> and then it seems to try to start the Kafka Stream, but fails:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Starting Kafka Consumer Stream with group: c1
>>>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
>>>>>>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Connecting to Zookeeper: 10.0.1.232:2181
>>>>>>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Started block pushing thread
>>>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopping receiver with message: Error starting receiver 0: java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
>>>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Called receiver onStop
>>>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Deregistering receiver 0
>>>>>>>>>>>>>> ^C14/12/29 05:58:05 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
>>>>>>>>>>>>>>         at kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
>>>>>>>>>>>>>>         at kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
>>>>>>>>>>>>>>         at kafka.utils.Logging$class.$init$(Logging.scala:29)
>>>>>>>>>>>>>>         at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:26)
>>>>>>>>>>>>>>         at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:94)
>>>>>>>>>>>>>>         at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96)
>>>>>>>>>>>>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>>>>>>>>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>>>>>>>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>>>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>>>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>>>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>>>>>>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>>>>>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>>>>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:722)
>>>>>>>>>>>>>> Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
>>>>>>>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>>>>>>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>>>>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>>>>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>>>>>>>>>>>>         ... 18 more
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopped receiver 0
>>>>>>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Stopping BlockGenerator
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I ran into a couple of other class-not-found errors and was
>>>>>>>>>>>>>> able to solve them by adding dependencies to the pom file,
>>>>>>>>>>>>>> but I have not found such a solution to this error.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On the Kafka side of things, I am simply typing in messages
>>>>>>>>>>>>>> as soon as I
>>>>>>>>>>>>>> start the Java app on another console. Is this okay?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have not set up an advertised host on the kafka side as I
>>>>>>>>>>>>>> was able to
>>>>>>>>>>>>>> still receive messages from other consoles by setting up a
>>>>>>>>>>>>>> consumer to
>>>>>>>>>>>>>> listen to the private ip:port. Is this okay?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Lastly, is there a command, like --from-beginning for a
>>>>>>>>>>>>>> console consumer, that lets the Java application get messages
>>>>>>>>>>>>>> from the beginning?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks a lot for the help and happy holidays!

Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Suhas Shekar <su...@gmail.com>.
I tried submitting the application with 2 cores, as you can see from the [2]
appended to the master URL:

/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.10.12/lib/spark/bin/spark-submit \
  --class SimpleApp --master spark://10.0.1.230:7077[2] \
  --jars $(echo /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',') \
  /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar

So I checked the url of the master (10.0.1.230). I found the results
interesting...

Workers: 2
Cores: 4 Total, 0 Used //so does this mean running it on the localhost has
0 cores?

Also when I ran my application, it did not show under "Running
Applications".

Also, under "Completed Applications" none of my previous runs were recorded
(all were from spark-shell).

I tried changing my submit script to 10.0.1.231:70877[2], but that did not
change anything.

Any suggestions on whether I should change my submit script, and if so, how?

Thanks a lot for the help!


Suhas Shekar

University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014

On Mon, Dec 29, 2014 at 12:55 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> How many cores do you see allocated in the web UI? (It usually runs on port
> 8080; for Cloudera I think it's 18080.) Most likely the job is being
> allocated only 1 core (it needs >= 2 cores), and that's why the count never
> happens.
>
> Thanks
> Best Regards
>
> On Mon, Dec 29, 2014 at 2:22 PM, Suhas Shekar <su...@gmail.com>
> wrote:
>
>> That got rid of the logs, but these problems persist:
>>
>> a) The program never terminates (I have pasted all output after the Hello
>> World statements below)
>>
>> b) I am not seeing the word count
>>
>> c) I tried adding [2] next to my 10.0.1.232:2181 after looking at this post
>> http://apache-spark-user-list.1001560.n3.nabble.com/streaming-questions-td3281.html,
>> but that did not work either.
>>
>> Any other suggestions are appreciated.
>>
>> Thanks a lot for the time :)
>>
>> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
>> starting auto committer every 60000 ms
>> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], begin
>> registering consumer
>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 in ZK
>> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], end
>> registering consumer
>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 in ZK
>> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
>> starting watcher executor thread for consumer
>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471
>> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], begin
>> rebalancing consumer
>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 try #0
>> 14/12/29 08:46:39 INFO ConsumerFetcherManager:
>> [ConsumerFetcherManager-1419860798873] Stopping leader finder thread
>> 14/12/29 08:46:39 INFO ConsumerFetcherManager:
>> [ConsumerFetcherManager-1419860798873] Stopping all fetchers
>> 14/12/29 08:46:39 INFO ConsumerFetcherManager:
>> [ConsumerFetcherManager-1419860798873] All connections stopped
>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
>> Cleared all relevant queues for this fetcher
>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
>> Cleared the data chunks in all the consumer message iterators
>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
>> Committing all offsets after clearing the fetcher queues
>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
>> Releasing partition ownership
>> 14/12/29 08:46:39 INFO RangeAssignor: Consumer
>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471
>> rebalancing the following partitions: ArrayBuffer(0) for topic test with
>> consumers:
>> List(c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0)
>> 14/12/29 08:46:39 INFO RangeAssignor:
>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0
>> attempting to claim partition 0
>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0
>> successfully owned partition 0 for topic test
>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
>> Consumer c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471
>> selected partitions : test:0: fetched offset = 221: consumed offset = 221
>> 14/12/29 08:46:39 INFO ConsumerFetcherManager$LeaderFinderThread:
>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-leader-finder-thread],
>> Starting
>> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
>> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], end
>> rebalancing consumer
>> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 try #0
>> 14/12/29 08:46:39 INFO VerifiableProperties: Verifying properties
>> 14/12/29 08:46:39 INFO VerifiableProperties: Property client.id is
>> overridden to c1
>> 14/12/29 08:46:39 INFO VerifiableProperties: Property
>> metadata.broker.list is overridden to
>> ip-10-0-1-232.us-west-1.compute.internal:9092
>> 14/12/29 08:46:39 INFO VerifiableProperties: Property request.timeout.ms
>> is overridden to 30000
>> 14/12/29 08:46:39 INFO ClientUtils$: Fetching metadata from broker
>> id:0,host:ip-10-0-1-232.us-west-1.compute.internal,port:9092 with
>> correlation id 0 for 1 topic(s) Set(test)
>> 14/12/29 08:46:39 INFO SyncProducer: Connected to
>> ip-10-0-1-232.us-west-1.compute.internal:9092 for producing
>> 14/12/29 08:46:39 INFO SyncProducer: Disconnecting from
>> ip-10-0-1-232.us-west-1.compute.internal:9092
>> 14/12/29 08:46:39 INFO ConsumerFetcherThread:
>> [ConsumerFetcherThread-c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0-0],
>> Starting
>> 14/12/29 08:46:39 INFO ConsumerFetcherManager:
>> [ConsumerFetcherManager-1419860798873] Added fetcher for partitions
>> ArrayBuffer([[test,0], initOffset 221 to broker
>> id:0,host:ip-10-0-1-232.us-west-1.compute.internal,port:9092] )
>>
>>
>> Suhas Shekar
>>
>> University of California, Los Angeles
>> B.A. Economics, Specialization in Computing 2014
>>
>> On Mon, Dec 29, 2014 at 12:43 AM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> Now, add these lines to get rid of those logs:
>>>
>>>     import org.apache.log4j.Logger;
>>>     import org.apache.log4j.Level;
>>>
>>>     Logger.getLogger("org").setLevel(Level.OFF);
>>>     Logger.getLogger("akka").setLevel(Level.OFF);
>>>
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Dec 29, 2014 at 2:09 PM, Suhas Shekar <su...@gmail.com>
>>> wrote:
>>>
>>>> Hmm, so I passed 10000 (10,000 ms) to jssc.awaitTermination, but it
>>>> does not stop. When I am not pushing in any data, it gives me this:
>>>>
>>>> 14/12/29 08:35:12 INFO ReceiverTracker: Stream 0 received 0 blocks
>>>> 14/12/29 08:35:12 INFO JobScheduler: Added jobs for time 1419860112000
>>>> ms
>>>> 14/12/29 08:35:14 INFO ReceiverTracker: Stream 0 received 0 blocks
>>>> 14/12/29 08:35:14 INFO JobScheduler: Added jobs for time 1419860114000
>>>> ms
>>>> 14/12/29 08:35:16 INFO ReceiverTracker: Stream 0 received 0 blocks
>>>>
>>>> When I am pushing in data it does this:
>>>>
>>>> 14/12/29 08:35:08 WARN BlockManager: Block input-0-1419860108200
>>>> already exists on this machine; not re-adding it
>>>> 14/12/29 08:35:08 INFO BlockGenerator: Pushed block
>>>> input-0-1419860108200
>>>> 14/12/29 08:35:09 INFO MemoryStore: ensureFreeSpace(80) called with
>>>> curMem=6515, maxMem=277842493
>>>> 14/12/29 08:35:09 INFO MemoryStore: Block input-0-1419860109200 stored
>>>> as bytes in memory (estimated size 80.0 B, free 265.0 MB)
>>>> 14/12/29 08:35:09 INFO BlockManagerInfo: Added input-0-1419860109200 in
>>>> memory on ip-10-0-1-230.us-west-1.compute.internal:48171 (size: 80.0 B,
>>>> free: 265.0 MB)
>>>> 14/12/29 08:35:09 INFO BlockManagerMaster: Updated info of block
>>>> input-0-1419860109200
>>>> 14/12/29 08:35:09 WARN BlockManager: Block input-0-1419860109200
>>>> already exists on this machine; not re-adding it
>>>> 14/12/29 08:35:09 INFO BlockGenerator: Pushed block
>>>> input-0-1419860109200
>>>> 14/12/29 08:35:10 INFO ReceiverTracker: Stream 0 received 2 blocks
>>>>
>>>> I know I am close, as every time I enter a message in my Kafka producer,
>>>> the console reacts as I showed above. Do I have to place the
>>>> awaitTermination call somewhere else? Or is the warning saying there is
>>>> an underlying problem?
>>>>
>>>> Thank you for the help...hopefully I am as close as I think I am!
>>>>
>>>>
>>>>
>>>> Suhas Shekar
>>>>
>>>> University of California, Los Angeles
>>>> B.A. Economics, Specialization in Computing 2014
>>>>
>>>> On Mon, Dec 29, 2014 at 12:28 AM, Akhil Das <akhil@sigmoidanalytics.com
>>>> > wrote:
>>>>
>>>>> If you want to stop the streaming after 10 seconds, then use
>>>>> ssc.awaitTermination(10000). Make sure you push some data to kafka for the
>>>>> streaming to consume within the 10 seconds.
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Mon, Dec 29, 2014 at 1:53 PM, Suhas Shekar <su...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I'm very close! So I added that and then I added this:
>>>>>> http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/0.8.2-beta
>>>>>>
>>>>>> and it seems as though the stream is working as it says Stream 0
>>>>>> received 1 or 2 blocks as I enter in messages on my kafka producer.
>>>>>> However, the Receiver seems to keep trying every 2 seconds (as I've
>>>>>> included 2000 in my duration in my java app). How can I stop the Receiver
>>>>>> from consuming messages after 10 seconds and output the word count to the
>>>>>> console?
>>>>>>
>>>>>> Thanks a lot for all the help! I'm excited to see this word count :)
>>>>>>
>>>>>> Suhas Shekar
>>>>>>
>>>>>> University of California, Los Angeles
>>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>>
>>>>>> On Mon, Dec 29, 2014 at 12:04 AM, Akhil Das <
>>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Add this jar in the dependency
>>>>>>> http://mvnrepository.com/artifact/com.yammer.metrics/metrics-core/2.2.0
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Mon, Dec 29, 2014 at 1:31 PM, Suhas Shekar <suhshekar52@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Hello Akhil,
>>>>>>>>
>>>>>>>> I changed my Kafka dependency to 2.10 (which is the version of
>>>>>>>> kafka that was on 10.0.1.232). I am getting a slightly different error, but
>>>>>>>> at the same place as the previous error (pasted below).
>>>>>>>>
>>>>>>>> FYI, when I make these changes to the pom file, I run "mvn clean
>>>>>>>> package" and then cp the new jar files from the repository to my lib
>>>>>>>> directory of jar files, which is an argument in the spark-submit script
>>>>>>>> in my original post.
>>>>>>>>
>>>>>>>> Thanks again for the time and help...much appreciated.
>>>>>>>>
>>>>>>>>
>>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Starting receiver
>>>>>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Starting Kafka Consumer Stream with group: c1
>>>>>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Connecting to Zookeeper: 10.0.1.232:2181
>>>>>>>> 14/12/29 07:56:00 INFO BlockGenerator: Started block pushing thread
>>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Verifying properties
>>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property group.id is overridden to c1
>>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property zookeeper.connect is overridden to 10.0.1.232:2181
>>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property zookeeper.connection.timeout.ms is overridden to 10000
>>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Stopping receiver with message: Error starting receiver 0: java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
>>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Called receiver onStop
>>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Deregistering receiver 0
>>>>>>>> 14/12/29 07:56:00 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
>>>>>>>>         at kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:51)
>>>>>>>>         at kafka.consumer.ZookeeperConsumerConnector.newMeter(ZookeeperConsumerConnector.scala:83)
>>>>>>>>         at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
>>>>>>>>         at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
>>>>>>>>         at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
>>>>>>>>         at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:97)
>>>>>>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>>>>>         at java.lang.Thread.run(Thread.java:722)
>>>>>>>> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>>>>>>         ... 18 more
>>>>>>>>
>>>>>>>>
>>>>>>>> Suhas Shekar
>>>>>>>>
>>>>>>>> University of California, Los Angeles
>>>>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>>>>
>>>>>>>> On Sun, Dec 28, 2014 at 11:52 PM, Suhas Shekar <
>>>>>>>> suhshekar52@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I made both versions 1.1.1 and I got the same error. I then tried
>>>>>>>>> making both 1.1.0 as that is the version of my Spark Core, but I got the
>>>>>>>>> same error.
>>>>>>>>>
>>>>>>>>> I noticed my Kafka dependency is for Scala 2.9.2, while my Spark
>>>>>>>>> Streaming Kafka dependency is for 2.10.x. I will try changing that next,
>>>>>>>>> but I don't think that will solve the error, as I don't think the
>>>>>>>>> application has gotten to that level yet.
>>>>>>>>>
>>>>>>>>> Please let me know of any possible next steps.
>>>>>>>>>
>>>>>>>>> Thank you again for the time and the help!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Suhas Shekar
>>>>>>>>>
>>>>>>>>> University of California, Los Angeles
>>>>>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>>>>>
>>>>>>>>> On Sun, Dec 28, 2014 at 11:31 PM, Akhil Das <
>>>>>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>>>>>
>>>>>>>>>> I just looked at the pom file that you are using. Why do you
>>>>>>>>>> have different versions in it?
>>>>>>>>>>
>>>>>>>>>> <dependency>
>>>>>>>>>> <groupId>org.apache.spark</groupId>
>>>>>>>>>> <artifactId>spark-streaming-kafka_2.10</artifactId>
>>>>>>>>>> <version>*1.1.1*</version>
>>>>>>>>>> </dependency>
>>>>>>>>>> <dependency>
>>>>>>>>>> <groupId>org.apache.spark</groupId>
>>>>>>>>>> <artifactId>spark-streaming_2.10</artifactId>
>>>>>>>>>> <version>*1.0.2*</version>
>>>>>>>>>> </dependency>
>>>>>>>>>>
>>>>>>>>>> Can you make both versions the same?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 29, 2014 at 12:44 PM, Suhas Shekar <
>>>>>>>>>> suhshekar52@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> 1) Could you please clarify what you mean by checking that the
>>>>>>>>>>> Scala version is correct? In my pom.xml file it is 2.10.4 (which is the
>>>>>>>>>>> same as what I see when I start spark-shell).
>>>>>>>>>>>
>>>>>>>>>>> 2) The spark master URL is definitely correct as I have run
>>>>>>>>>>> other apps with the same script that use Spark (like a word count with a
>>>>>>>>>>> local file)
>>>>>>>>>>>
>>>>>>>>>>> Thank you for the help!
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Suhas Shekar
>>>>>>>>>>>
>>>>>>>>>>> University of California, Los Angeles
>>>>>>>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Dec 28, 2014 at 11:04 PM, Akhil Das <
>>>>>>>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Make sure you verify the following:
>>>>>>>>>>>>
>>>>>>>>>>>> - Scala version : I think the correct version would be 2.10.x
>>>>>>>>>>>> - SparkMasterURL: Be sure that you copied the one displayed on
>>>>>>>>>>>> the webui's top left corner (running on port 8080)
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 29, 2014 at 12:26 PM, suhshekar52 <
>>>>>>>>>>>> suhshekar52@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello Everyone,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you for the time and the help :).
>>>>>>>>>>>>>
>>>>>>>>>>>>> My goal here is to get this program working:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
>>>>>>>>>>>>>
>>>>>>>>>>>>> The only lines I do not have from the example are lines 62-67.
>>>>>>>>>>>>> pom.xml
>>>>>>>>>>>>> <
>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml
>>>>>>>>>>>>> >
>>>>>>>>>>>>>
>>>>>>>>>>>>> Background: Have ec2 instances running. The standalone spark
>>>>>>>>>>>>> is running on
>>>>>>>>>>>>> top of Cloudera Manager 5.2.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Pom file is attached and the same for both clusters.
>>>>>>>>>>>>> pom.xml
>>>>>>>>>>>>> <
>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml
>>>>>>>>>>>>> >
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here are a few different approaches I have taken and the
>>>>>>>>>>>>> issues I run into:
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Standalone Mode*
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1) Use spark-submit script to run:
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/bin/spark-submit \
>>>>>>>>>>>>>   --class SimpleApp --master spark://10.0.1.230:7077 \
>>>>>>>>>>>>>   --jars $(echo /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',') \
>>>>>>>>>>>>>   /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
>>>>>>>>>>>>>
>>>>>>>>>>>>> Interesting...I was getting an error like this: Initial job
>>>>>>>>>>>>> has not accepted
>>>>>>>>>>>>> any resources; check your cluster UI
>>>>>>>>>>>>>
>>>>>>>>>>>>> Now, when I run, it prints out the 3 Hello world statements in
>>>>>>>>>>>>> my code:
>>>>>>>>>>>>> KafkaJavaConsumer.txt
>>>>>>>>>>>>> <
>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/KafkaJavaConsumer.txt
>>>>>>>>>>>>> >
>>>>>>>>>>>>>
>>>>>>>>>>>>> and then it seems to try to start the Kafka Stream, but fails:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Starting Kafka Consumer Stream with group: c1
>>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
>>>>>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Connecting to Zookeeper: 10.0.1.232:2181
>>>>>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Started block pushing thread
>>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopping receiver with message: Error starting receiver 0: java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
>>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Called receiver onStop
>>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Deregistering receiver 0
>>>>>>>>>>>>> ^C14/12/29 05:58:05 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
>>>>>>>>>>>>>         at kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
>>>>>>>>>>>>>         at kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
>>>>>>>>>>>>>         at kafka.utils.Logging$class.$init$(Logging.scala:29)
>>>>>>>>>>>>>         at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:26)
>>>>>>>>>>>>>         at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:94)
>>>>>>>>>>>>>         at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96)
>>>>>>>>>>>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>>>>>>>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>>>>>>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>>>>>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>>>>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>>>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:722)
>>>>>>>>>>>>> Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
>>>>>>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>>>>>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>>>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>>>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>>>>>>>>>>>         ... 18 more
>>>>>>>>>>>>>
>>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopped receiver 0
>>>>>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Stopping BlockGenerator
>>>>>>>>>>>>>
>>>>>>>>>>>>> I ran into a couple of other class-not-found errors and was
>>>>>>>>>>>>> able to solve them by adding dependencies to the pom file,
>>>>>>>>>>>>> but I have not found such a solution to this error.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On the Kafka side of things, I am simply typing in messages as
>>>>>>>>>>>>> soon as I
>>>>>>>>>>>>> start the Java app on another console. Is this okay?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have not set up an advertised host on the kafka side as I
>>>>>>>>>>>>> was able to
>>>>>>>>>>>>> still receive messages from other consoles by setting up a
>>>>>>>>>>>>> consumer to
>>>>>>>>>>>>> listen to the private ip:port. Is this okay?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Lastly, is there a command, like --from-beginning for a
>>>>>>>>>>>>> console consumer, that lets the Java application get messages
>>>>>>>>>>>>> from the beginning?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks a lot for the help and happy holidays!

Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
How many cores do you see allocated to the application in the web UI? (It
usually runs on port 8080; for Cloudera I think it's 18080.) Most likely the
job is being allocated only 1 core, and the Kafka receiver occupies it, so
nothing is left to process the batches. The job needs at least 2 cores, and
that's why the count never happens.
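
A rough sketch of the arithmetic, assuming each receiver pins one core for
as long as the stream runs (the class and method names here are made up for
illustration, they are not part of Spark):

```java
public class ReceiverCoreCheck {

    // Cores left for batch processing once every receiver has taken one.
    // Assumption: one core per receiver, as with the single Kafka stream here.
    public static int coresLeftForProcessing(int allocatedCores, int receivers) {
        return Math.max(0, allocatedCores - receivers);
    }

    // Batches can only be processed if at least one core is left over.
    public static boolean canProcessBatches(int allocatedCores, int receivers) {
        return coresLeftForProcessing(allocatedCores, receivers) > 0;
    }

    public static void main(String[] args) {
        int receivers = 1; // one Kafka receiver stream
        for (int cores = 1; cores <= 3; cores++) {
            String verdict = canProcessBatches(cores, receivers)
                    ? "batches can run"
                    : "batches starve";
            System.out.println(cores + " core(s), " + receivers
                    + " receiver(s): " + verdict);
        }
    }
}
```

On a standalone cluster, one way to ask for more cores is the
--total-executor-cores option of spark-submit (or the spark.cores.max
setting).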

Thanks
Best Regards

On Mon, Dec 29, 2014 at 2:22 PM, Suhas Shekar <su...@gmail.com> wrote:

> So it got rid of the logs, but these problems persist:
>
> a) The program never terminates (I have pasted all output after the Hello
> World statements below)
>
> b) I am not seeing the word count
>
> c) I tried adding [2] next to my 10.0.1.232:2181 after looking at this post
> http://apache-spark-user-list.1001560.n3.nabble.com/streaming-questions-td3281.html,
> but that did not work either.
>
> Any other suggestions are appreciated.
>
> Thanks a lot for the time :)
>
> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
> starting auto committer every 60000 ms
> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], begin
> registering consumer
> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 in ZK
> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], end
> registering consumer
> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 in ZK
> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
> starting watcher executor thread for consumer
> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471
> 14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], begin
> rebalancing consumer
> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 try #0
> 14/12/29 08:46:39 INFO ConsumerFetcherManager:
> [ConsumerFetcherManager-1419860798873] Stopping leader finder thread
> 14/12/29 08:46:39 INFO ConsumerFetcherManager:
> [ConsumerFetcherManager-1419860798873] Stopping all fetchers
> 14/12/29 08:46:39 INFO ConsumerFetcherManager:
> [ConsumerFetcherManager-1419860798873] All connections stopped
> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
> Cleared all relevant queues for this fetcher
> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
> Cleared the data chunks in all the consumer message iterators
> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
> Committing all offsets after clearing the fetcher queues
> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
> Releasing partition ownership
> 14/12/29 08:46:39 INFO RangeAssignor: Consumer
> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471
> rebalancing the following partitions: ArrayBuffer(0) for topic test with
> consumers:
> List(c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0)
> 14/12/29 08:46:39 INFO RangeAssignor:
> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0
> attempting to claim partition 0
> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0
> successfully owned partition 0 for topic test
> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
> Consumer c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471
> selected partitions : test:0: fetched offset = 221: consumed offset = 221
> 14/12/29 08:46:39 INFO ConsumerFetcherManager$LeaderFinderThread:
> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-leader-finder-thread],
> Starting
> 14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
> [c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], end
> rebalancing consumer
> c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 try #0
> 14/12/29 08:46:39 INFO VerifiableProperties: Verifying properties
> 14/12/29 08:46:39 INFO VerifiableProperties: Property client.id is
> overridden to c1
> 14/12/29 08:46:39 INFO VerifiableProperties: Property metadata.broker.list
> is overridden to ip-10-0-1-232.us-west-1.compute.internal:9092
> 14/12/29 08:46:39 INFO VerifiableProperties: Property request.timeout.ms
> is overridden to 30000
> 14/12/29 08:46:39 INFO ClientUtils$: Fetching metadata from broker
> id:0,host:ip-10-0-1-232.us-west-1.compute.internal,port:9092 with
> correlation id 0 for 1 topic(s) Set(test)
> 14/12/29 08:46:39 INFO SyncProducer: Connected to
> ip-10-0-1-232.us-west-1.compute.internal:9092 for producing
> 14/12/29 08:46:39 INFO SyncProducer: Disconnecting from
> ip-10-0-1-232.us-west-1.compute.internal:9092
> 14/12/29 08:46:39 INFO ConsumerFetcherThread:
> [ConsumerFetcherThread-c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0-0],
> Starting
> 14/12/29 08:46:39 INFO ConsumerFetcherManager:
> [ConsumerFetcherManager-1419860798873] Added fetcher for partitions
> ArrayBuffer([[test,0], initOffset 221 to broker
> id:0,host:ip-10-0-1-232.us-west-1.compute.internal,port:9092] )
>
>
> Suhas Shekar
>
> University of California, Los Angeles
> B.A. Economics, Specialization in Computing 2014
>
> On Mon, Dec 29, 2014 at 12:43 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Now, add these lines to get rid of those logs:
>>
>>     import org.apache.log4j.Logger;
>>     import org.apache.log4j.Level;
>>
>>     Logger.getLogger("org").setLevel(Level.OFF);
>>     Logger.getLogger("akka").setLevel(Level.OFF);
>>
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Dec 29, 2014 at 2:09 PM, Suhas Shekar <su...@gmail.com>
>> wrote:
>>
>>> Hmmm..soo I added 10000 (10,000) to jssc.awaitTermination , however it
>>> does not stop. When I am not pushing in any data it gives me this:
>>>
>>> 14/12/29 08:35:12 INFO ReceiverTracker: Stream 0 received 0 blocks
>>> 14/12/29 08:35:12 INFO JobScheduler: Added jobs for time 1419860112000 ms
>>> 14/12/29 08:35:14 INFO ReceiverTracker: Stream 0 received 0 blocks
>>> 14/12/29 08:35:14 INFO JobScheduler: Added jobs for time 1419860114000 ms
>>> 14/12/29 08:35:16 INFO ReceiverTracker: Stream 0 received 0 blocks
>>>
>>> When I am pushing in data it does this:
>>>
>>> 14/12/29 08:35:08 WARN BlockManager: Block input-0-1419860108200 already
>>> exists on this machine; not re-adding it
>>> 14/12/29 08:35:08 INFO BlockGenerator: Pushed block input-0-1419860108200
>>> 14/12/29 08:35:09 INFO MemoryStore: ensureFreeSpace(80) called with
>>> curMem=6515, maxMem=277842493
>>> 14/12/29 08:35:09 INFO MemoryStore: Block input-0-1419860109200 stored
>>> as bytes in memory (estimated size 80.0 B, free 265.0 MB)
>>> 14/12/29 08:35:09 INFO BlockManagerInfo: Added input-0-1419860109200 in
>>> memory on ip-10-0-1-230.us-west-1.compute.internal:48171 (size: 80.0 B,
>>> free: 265.0 MB)
>>> 14/12/29 08:35:09 INFO BlockManagerMaster: Updated info of block
>>> input-0-1419860109200
>>> 14/12/29 08:35:09 WARN BlockManager: Block input-0-1419860109200 already
>>> exists on this machine; not re-adding it
>>> 14/12/29 08:35:09 INFO BlockGenerator: Pushed block input-0-1419860109200
>>> 14/12/29 08:35:10 INFO ReceiverTracker: Stream 0 received 2 blocks
>>>
>>> I know I am close, as every time I enter a message in my kafka producer,
>>> the console reacts as I showed above...do I have to place the
>>> awaitTermination somewhere else? Or is the warning saying there is an
>>> underlying problem?
>>>
>>> Thank you for the help...hopefully I am as close as I think I am!
>>>
>>>
>>>
>>> Suhas Shekar
>>>
>>> University of California, Los Angeles
>>> B.A. Economics, Specialization in Computing 2014
>>>
>>> On Mon, Dec 29, 2014 at 12:28 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> If you want to stop the streaming after 10 seconds, then use
>>>> ssc.awaitTermination(10000). Make sure you push some data to kafka for the
>>>> streaming to consume within the 10 seconds.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Mon, Dec 29, 2014 at 1:53 PM, Suhas Shekar <su...@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm very close! So I added that and then I added this:
>>>>> http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/0.8.2-beta
>>>>>
>>>>> and it seems as though the stream is working as it says Stream 0
>>>>> received 1 or 2 blocks as I enter in messages on my kafka producer.
>>>>> However, the Receiver seems to keep trying every 2 seconds (as I've
>>>>> included 2000 in my duration in my java app). How can I stop the Receiver
>>>>> from consuming messages after 10 seconds and output the word count to the
>>>>> console?
>>>>>
>>>>> Thanks a lot for all the help! I'm excited to see this word count :)
>>>>>
>>>>> Suhas Shekar
>>>>>
>>>>> University of California, Los Angeles
>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>
>>>>> On Mon, Dec 29, 2014 at 12:04 AM, Akhil Das <
>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>
>>>>>> Add this jar in the dependency
>>>>>> http://mvnrepository.com/artifact/com.yammer.metrics/metrics-core/2.2.0
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Mon, Dec 29, 2014 at 1:31 PM, Suhas Shekar <su...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello Akhil,
>>>>>>>
>>>>>>> I changed my Kafka dependency to 2.10 (which is the version of kafka
>>>>>>> that was on 10.0.1.232). I am getting a slightly different error, but at
>>>>>>> the same place as the previous error (pasted below).
>>>>>>>
>>>>>>> FYI, when I make these changes to the pom file, I do "mvn clean
>>>>>>> package", then cp the new jar files from the repository to my lib
>>>>>>> directory of jar files, which is an argument in the spark-submit
>>>>>>> script in my original post.
>>>>>>>
>>>>>>> Thanks again for the time and help...much appreciated.
>>>>>>>
>>>>>>>
>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Starting receiver
>>>>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Starting Kafka Consumer Stream
>>>>>>> with group: c1
>>>>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Connecting to Zookeeper:
>>>>>>> 10.0.1.232:2181
>>>>>>> 14/12/29 07:56:00 INFO BlockGenerator: Started block pushing thread
>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Verifying properties
>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property group.id is
>>>>>>> overridden to c1
>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property
>>>>>>> zookeeper.connect is overridden to 10.0.1.232:2181
>>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property
>>>>>>> zookeeper.connection.timeout.ms is overridden to 10000
>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Stopping receiver
>>>>>>> with message: Error starting receiver 0: java.lang.NoClassDefFoundError:
>>>>>>> com/yammer/metrics/Metrics
>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Called receiver onStop
>>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Deregistering
>>>>>>> receiver 0
>>>>>>> 14/12/29 07:56:00 ERROR ReceiverTracker: Deregistered receiver for
>>>>>>> stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>>>>>> com/yammer/metrics/Metrics
>>>>>>>         at
>>>>>>> kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:51)
>>>>>>>         at
>>>>>>> kafka.consumer.ZookeeperConsumerConnector.newMeter(ZookeeperConsumerConnector.scala:83)
>>>>>>>         at
>>>>>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
>>>>>>>         at
>>>>>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
>>>>>>>         at
>>>>>>> kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
>>>>>>>         at
>>>>>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:97)
>>>>>>>         at
>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>>>         at
>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>>>         at
>>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>>>         at
>>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>>>         at
>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>         at
>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>         at
>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>>         at
>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>>>>>         at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>>>>         at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>>>>         at java.lang.Thread.run(Thread.java:722)
>>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>>> com.yammer.metrics.Metrics
>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>>>>>         ... 18 more
>>>>>>>
>>>>>>>
>>>>>>> Suhas Shekar
>>>>>>>
>>>>>>> University of California, Los Angeles
>>>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>>>
>>>>>>> On Sun, Dec 28, 2014 at 11:52 PM, Suhas Shekar <
>>>>>>> suhshekar52@gmail.com> wrote:
>>>>>>>
>>>>>>>> I made both versions 1.1.1 and I got the same error. I then tried
>>>>>>>> making both 1.1.0 as that is the version of my Spark Core, but I got the
>>>>>>>> same error.
>>>>>>>>
>>>>>>>> I noticed my Kafka dependency is for scala 2.9.2, while my spark
>>>>>>>> streaming kafka dependency is 2.10.x...I will try changing that next, but
>>>>>>>> I don't think that will solve the error, as I don't think the application
>>>>>>>> had got to that level yet.
>>>>>>>>
>>>>>>>> Please let me know of any possible next steps.
>>>>>>>>
>>>>>>>> Thank you again for the time and the help!
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Suhas Shekar
>>>>>>>>
>>>>>>>> University of California, Los Angeles
>>>>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>>>>
>>>>>>>> On Sun, Dec 28, 2014 at 11:31 PM, Akhil Das <
>>>>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> Just looked at the pom file that you are using, why are you having
>>>>>>>>> different versions in it?
>>>>>>>>>
>>>>>>>>> <dependency>
>>>>>>>>> <groupId>org.apache.spark</groupId>
>>>>>>>>> <artifactId>spark-streaming-kafka_2.10</artifactId>
>>>>>>>>> <version>*1.1.1*</version>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency>
>>>>>>>>> <groupId>org.apache.spark</groupId>
>>>>>>>>> <artifactId>spark-streaming_2.10</artifactId>
>>>>>>>>> <version>*1.0.2*</version>
>>>>>>>>> </dependency>
>>>>>>>>>
>>>>>>>>> Can you make both versions the same?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> On Mon, Dec 29, 2014 at 12:44 PM, Suhas Shekar <
>>>>>>>>> suhshekar52@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> 1) Could you please clarify on what you mean by checking the
>>>>>>>>>> Scala version is correct? In my pom.xml file it is 2.10.4 (which is the
>>>>>>>>>> same as when I start spark-shell).
>>>>>>>>>>
>>>>>>>>>> 2) The spark master URL is definitely correct as I have run other
>>>>>>>>>> apps with the same script that use Spark (like a word count with a local
>>>>>>>>>> file)
>>>>>>>>>>
>>>>>>>>>> Thank you for the help!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Suhas Shekar
>>>>>>>>>>
>>>>>>>>>> University of California, Los Angeles
>>>>>>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>>>>>>
>>>>>>>>>> On Sun, Dec 28, 2014 at 11:04 PM, Akhil Das <
>>>>>>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Make sure you verify the following:
>>>>>>>>>>>
>>>>>>>>>>> - Scala version : I think the correct version would be 2.10.x
>>>>>>>>>>> - SparkMasterURL: Be sure that you copied the one displayed on
>>>>>>>>>>> the webui's top left corner (running on port 8080)
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 29, 2014 at 12:26 PM, suhshekar52 <
>>>>>>>>>>> suhshekar52@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello Everyone,
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for the time and the help :).
>>>>>>>>>>>>
>>>>>>>>>>>> My goal here is to get this program working:
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
>>>>>>>>>>>>
>>>>>>>>>>>> The only lines I do not have from the example are lines 62-67.
>>>>>>>>>>>> pom.xml
>>>>>>>>>>>> <
>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml
>>>>>>>>>>>> >
>>>>>>>>>>>>
>>>>>>>>>>>> Background: Have ec2 instances running. The standalone spark is
>>>>>>>>>>>> running on
>>>>>>>>>>>> top of Cloudera Manager 5.2.
>>>>>>>>>>>>
>>>>>>>>>>>> Pom file is attached and the same for both clusters.
>>>>>>>>>>>> pom.xml
>>>>>>>>>>>> <
>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml
>>>>>>>>>>>> >
>>>>>>>>>>>>
>>>>>>>>>>>> Here are a few different approaches I have taken and the issues
>>>>>>>>>>>> I run into:
>>>>>>>>>>>>
>>>>>>>>>>>> *Standalone Mode*
>>>>>>>>>>>>
>>>>>>>>>>>> 1) Use spark-submit script to run:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/bin/spark-submit
>>>>>>>>>>>> --class SimpleApp --master spark://10.0.1.230:7077  --jars
>>>>>>>>>>>> $(echo
>>>>>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',')
>>>>>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
>>>>>>>>>>>>
>>>>>>>>>>>> Interesting...I was getting an error like this: Initial job has
>>>>>>>>>>>> not accepted
>>>>>>>>>>>> any resources; check your cluster UI
>>>>>>>>>>>>
>>>>>>>>>>>> Now, when I run, it prints out the 3 Hello world statements in
>>>>>>>>>>>> my code:
>>>>>>>>>>>> KafkaJavaConsumer.txt
>>>>>>>>>>>> <
>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/KafkaJavaConsumer.txt
>>>>>>>>>>>> >
>>>>>>>>>>>>
>>>>>>>>>>>> and then it seems to try to start the Kafka Stream, but fails:
>>>>>>>>>>>>
>>>>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Starting Kafka Consumer
>>>>>>>>>>>> Stream with
>>>>>>>>>>>> group: c1
>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverTracker: Registered receiver for
>>>>>>>>>>>> stream 0
>>>>>>>>>>>> from akka://sparkDriver
>>>>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Connecting to Zookeeper:
>>>>>>>>>>>> 10.0.1.232:2181
>>>>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Started block pushing
>>>>>>>>>>>> thread
>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopping
>>>>>>>>>>>> receiver with
>>>>>>>>>>>> message: Error starting receiver 0:
>>>>>>>>>>>> java.lang.NoClassDefFoundError:
>>>>>>>>>>>> scala/reflect/ClassManifest
>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Called receiver
>>>>>>>>>>>> onStop
>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Deregistering
>>>>>>>>>>>> receiver 0
>>>>>>>>>>>> ^C14/12/29 05:58:05 ERROR ReceiverTracker: Deregistered
>>>>>>>>>>>> receiver for stream
>>>>>>>>>>>> 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>>>>>>>>>>> scala/reflect/ClassManifest
>>>>>>>>>>>>         at
>>>>>>>>>>>> kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
>>>>>>>>>>>>         at
>>>>>>>>>>>> kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
>>>>>>>>>>>>         at kafka.utils.Logging$class.$init$(Logging.scala:29)
>>>>>>>>>>>>         at
>>>>>>>>>>>>
>>>>>>>>>>>> kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:26)
>>>>>>>>>>>>         at
>>>>>>>>>>>> kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:94)
>>>>>>>>>>>>         at
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96)
>>>>>>>>>>>>         at
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>>>>>>>>         at
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>>>>>>>>         at
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>>>>>>>>         at
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>>>>>>>>         at
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>>>>         at
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>>>>         at
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>>>>>>>         at
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>>>>>>>>>>         at
>>>>>>>>>>>>
>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>>>>>>>>>         at
>>>>>>>>>>>>
>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:722)
>>>>>>>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>>>>>>>> scala.reflect.ClassManifest
>>>>>>>>>>>>         at
>>>>>>>>>>>> java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>>>>>>>         at
>>>>>>>>>>>> java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>>>>>>>         at java.security.AccessController.doPrivileged(Native
>>>>>>>>>>>> Method)
>>>>>>>>>>>>         at
>>>>>>>>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>>>>>>>>>>         ... 18 more
>>>>>>>>>>>>
>>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopped receiver
>>>>>>>>>>>> 0
>>>>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Stopping BlockGenerator
>>>>>>>>>>>>
>>>>>>>>>>>> I ran into a couple other Class not found errors, and was able
>>>>>>>>>>>> to solve them
>>>>>>>>>>>> by adding dependencies on the pom file, but have not found such
>>>>>>>>>>>> a solution
>>>>>>>>>>>> to this error.
>>>>>>>>>>>>
>>>>>>>>>>>> On the Kafka side of things, I am simply typing in messages as
>>>>>>>>>>>> soon as I
>>>>>>>>>>>> start the Java app on another console. Is this okay?
>>>>>>>>>>>>
>>>>>>>>>>>> I have not set up an advertised host on the kafka side as I was
>>>>>>>>>>>> able to
>>>>>>>>>>>> still receive messages from other consoles by setting up a
>>>>>>>>>>>> consumer to
>>>>>>>>>>>> listen to the private ip:port. Is this okay?
>>>>>>>>>>>>
>>>>>>>>>>>> Lastly, is there a command, like --from-beginning for a
>>>>>>>>>>>> console consumer, that makes the Java application read
>>>>>>>>>>>> messages from the beginning?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks a lot for the help and happy holidays!
>>>>>>>>>>>>

Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Suhas Shekar <su...@gmail.com>.
So it got rid of the logs, but these problems persist:

a) The program never terminates (I have pasted all output after the Hello
World statements below)

b) I am not seeing the word count

c) I tried adding [2] next to my 10.0.1.232:2181 after looking at this post
http://apache-spark-user-list.1001560.n3.nabble.com/streaming-questions-td3281.html,
but that did not work either.
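
On (a), my working assumption (not verified against the Spark source) is
that a timed wait only limits how long the caller blocks and does not stop
whatever is still running. The sketch below shows that behaviour with plain
java.util.concurrent as an analogy. It is not Spark code, and TimedWaitDemo
and its run method are hypothetical names:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TimedWaitDemo {

    // Returns { terminated after the timed wait alone,
    //           terminated after an explicit stop }.
    public static boolean[] run(long waitMillis) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        worker.execute(new Runnable() {
            @Override
            public void run() {
                // Stand-in for a receiver loop that never ends by itself.
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        // Restore the flag so the loop condition sees it.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
        try {
            // The timed wait returns false on timeout; the worker keeps going.
            boolean afterWait =
                    worker.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
            // Only an explicit stop ends the loop.
            worker.shutdownNow();
            boolean afterStop = worker.awaitTermination(5, TimeUnit.SECONDS);
            return new boolean[] { afterWait, afterStop };
        } catch (InterruptedException e) {
            worker.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = run(200);
        System.out.println("terminated after timed wait: " + result[0]);
        System.out.println("terminated after explicit stop: " + result[1]);
    }
}
```

If the streaming context behaves the same way, the app would need an
explicit jssc.stop() after the timed awaitTermination returns.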

Any other suggestions are appreciated.

Thanks a lot for the time :)

14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
[c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
starting auto committer every 60000 ms
14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
[c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], begin
registering consumer
c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 in ZK
14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
[c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], end
registering consumer
c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 in ZK
14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
[c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
starting watcher executor thread for consumer
c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471
14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
[c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], begin
rebalancing consumer
c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 try #0
14/12/29 08:46:39 INFO ConsumerFetcherManager:
[ConsumerFetcherManager-1419860798873] Stopping leader finder thread
14/12/29 08:46:39 INFO ConsumerFetcherManager:
[ConsumerFetcherManager-1419860798873] Stopping all fetchers
14/12/29 08:46:39 INFO ConsumerFetcherManager:
[ConsumerFetcherManager-1419860798873] All connections stopped
14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
[c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
Cleared all relevant queues for this fetcher
14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
[c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
Cleared the data chunks in all the consumer message iterators
14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
[c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
Committing all offsets after clearing the fetcher queues
14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
[c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
Releasing partition ownership
14/12/29 08:46:39 INFO RangeAssignor: Consumer
c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471
rebalancing the following partitions: ArrayBuffer(0) for topic test with
consumers:
List(c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0)
14/12/29 08:46:39 INFO RangeAssignor:
c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0
attempting to claim partition 0
14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
[c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0
successfully owned partition 0 for topic test
14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
[c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471],
Consumer c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471
selected partitions : test:0: fetched offset = 221: consumed offset = 221
14/12/29 08:46:39 INFO ConsumerFetcherManager$LeaderFinderThread:
[c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-leader-finder-thread],
Starting
14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
[c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471], end
rebalancing consumer
c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471 try #0
14/12/29 08:46:39 INFO VerifiableProperties: Verifying properties
14/12/29 08:46:39 INFO VerifiableProperties: Property client.id is
overridden to c1
14/12/29 08:46:39 INFO VerifiableProperties: Property metadata.broker.list
is overridden to ip-10-0-1-232.us-west-1.compute.internal:9092
14/12/29 08:46:39 INFO VerifiableProperties: Property request.timeout.ms is
overridden to 30000
14/12/29 08:46:39 INFO ClientUtils$: Fetching metadata from broker
id:0,host:ip-10-0-1-232.us-west-1.compute.internal,port:9092 with
correlation id 0 for 1 topic(s) Set(test)
14/12/29 08:46:39 INFO SyncProducer: Connected to
ip-10-0-1-232.us-west-1.compute.internal:9092 for producing
14/12/29 08:46:39 INFO SyncProducer: Disconnecting from
ip-10-0-1-232.us-west-1.compute.internal:9092
14/12/29 08:46:39 INFO ConsumerFetcherThread:
[ConsumerFetcherThread-c1_ip-10-0-1-230.us-west-1.compute.internal-1419860798806-c33be471-0-0],
Starting
14/12/29 08:46:39 INFO ConsumerFetcherManager:
[ConsumerFetcherManager-1419860798873] Added fetcher for partitions
ArrayBuffer([[test,0], initOffset 221 to broker
id:0,host:ip-10-0-1-232.us-west-1.compute.internal,port:9092] )


Suhas Shekar

University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014

On Mon, Dec 29, 2014 at 12:43 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Now, add these lines to get rid of those logs:
>
>     import org.apache.log4j.Logger;
>     import org.apache.log4j.Level;
>
>     Logger.getLogger("org").setLevel(Level.OFF);
>     Logger.getLogger("akka").setLevel(Level.OFF);
>
>
> Thanks
> Best Regards
>
> On Mon, Dec 29, 2014 at 2:09 PM, Suhas Shekar <su...@gmail.com>
> wrote:
>
>> Hmmm..soo I added 10000 (10,000) to jssc.awaitTermination , however it
>> does not stop. When I am not pushing in any data it gives me this:
>>
>> 14/12/29 08:35:12 INFO ReceiverTracker: Stream 0 received 0 blocks
>> 14/12/29 08:35:12 INFO JobScheduler: Added jobs for time 1419860112000 ms
>> 14/12/29 08:35:14 INFO ReceiverTracker: Stream 0 received 0 blocks
>> 14/12/29 08:35:14 INFO JobScheduler: Added jobs for time 1419860114000 ms
>> 14/12/29 08:35:16 INFO ReceiverTracker: Stream 0 received 0 blocks
>>
>> When I am pushing in data it does this:
>>
>> 14/12/29 08:35:08 WARN BlockManager: Block input-0-1419860108200 already
>> exists on this machine; not re-adding it
>> 14/12/29 08:35:08 INFO BlockGenerator: Pushed block input-0-1419860108200
>> 14/12/29 08:35:09 INFO MemoryStore: ensureFreeSpace(80) called with
>> curMem=6515, maxMem=277842493
>> 14/12/29 08:35:09 INFO MemoryStore: Block input-0-1419860109200 stored as
>> bytes in memory (estimated size 80.0 B, free 265.0 MB)
>> 14/12/29 08:35:09 INFO BlockManagerInfo: Added input-0-1419860109200 in
>> memory on ip-10-0-1-230.us-west-1.compute.internal:48171 (size: 80.0 B,
>> free: 265.0 MB)
>> 14/12/29 08:35:09 INFO BlockManagerMaster: Updated info of block
>> input-0-1419860109200
>> 14/12/29 08:35:09 WARN BlockManager: Block input-0-1419860109200 already
>> exists on this machine; not re-adding it
>> 14/12/29 08:35:09 INFO BlockGenerator: Pushed block input-0-1419860109200
>> 14/12/29 08:35:10 INFO ReceiverTracker: Stream 0 received 2 blocks
>>
>> I know I am close, as every time I enter a message in my kafka producer,
>> the console reacts as I showed above...do I have to place the
>> awaitTermination somewhere else? Or is the warning saying there is an
>> underlying problem?
>>
>> Thank you for the help...hopefully I am as close as I think I am!
>>
>>
>>
>> Suhas Shekar
>>
>> University of California, Los Angeles
>> B.A. Economics, Specialization in Computing 2014
>>
>> On Mon, Dec 29, 2014 at 12:28 AM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> If you want to stop the streaming after 10 seconds, then use
>>> ssc.awaitTermination(10000). Make sure you push some data to kafka for the
>>> streaming to consume within the 10 seconds.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Dec 29, 2014 at 1:53 PM, Suhas Shekar <su...@gmail.com>
>>> wrote:
>>>
>>>> I'm very close! So I added that and then I added this:
>>>> http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/0.8.2-beta
>>>>
>>>> and it seems as though the stream is working as it says Stream 0
>>>> received 1 or 2 blocks as I enter in messages on my kafka producer.
>>>> However, the Receiver seems to keep trying every 2 seconds (as I've
>>>> included 2000 in my duration in my java app). How can I stop the Receiver
>>>> from consuming messages after 10 seconds and output the word count to the
>>>> console?
>>>>
>>>> Thanks a lot for all the help! I'm excited to see this word count :)
>>>>
>>>> Suhas Shekar
>>>>
>>>> University of California, Los Angeles
>>>> B.A. Economics, Specialization in Computing 2014
>>>>
>>>> On Mon, Dec 29, 2014 at 12:04 AM, Akhil Das <akhil@sigmoidanalytics.com
>>>> > wrote:
>>>>
>>>>> Add this jar in the dependency
>>>>> http://mvnrepository.com/artifact/com.yammer.metrics/metrics-core/2.2.0
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Mon, Dec 29, 2014 at 1:31 PM, Suhas Shekar <su...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello Akhil,
>>>>>>
>>>>>> I changed my Kafka dependency to 2.10 (which is the version of kafka
>>>>>> that was on 10.0.1.232). I am getting a slightly different error, but at
>>>>>> the same place as the previous error (pasted below).
>>>>>>
>>>>>> FYI, when I make these changes to the pom file, I run "mvn clean
>>>>>> package", then cp the new jar files from the repository to my lib
>>>>>> directory of jar files, which is passed as an argument in the
>>>>>> spark-submit script from my original post.
>>>>>>
>>>>>> Thanks again for the time and help...much appreciated.
>>>>>>
>>>>>>
>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Starting receiver
>>>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Starting Kafka Consumer Stream
>>>>>> with group: c1
>>>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Connecting to Zookeeper:
>>>>>> 10.0.1.232:2181
>>>>>> 14/12/29 07:56:00 INFO BlockGenerator: Started block pushing thread
>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Verifying properties
>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property group.id is
>>>>>> overridden to c1
>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property
>>>>>> zookeeper.connect is overridden to 10.0.1.232:2181
>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property
>>>>>> zookeeper.connection.timeout.ms is overridden to 10000
>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Stopping receiver with
>>>>>> message: Error starting receiver 0: java.lang.NoClassDefFoundError:
>>>>>> com/yammer/metrics/Metrics
>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Called receiver onStop
>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Deregistering receiver
>>>>>> 0
>>>>>> 14/12/29 07:56:00 ERROR ReceiverTracker: Deregistered receiver for
>>>>>> stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>>>>> com/yammer/metrics/Metrics
>>>>>>         at
>>>>>> kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:51)
>>>>>>         at
>>>>>> kafka.consumer.ZookeeperConsumerConnector.newMeter(ZookeeperConsumerConnector.scala:83)
>>>>>>         at
>>>>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
>>>>>>         at
>>>>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
>>>>>>         at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
>>>>>>         at
>>>>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:97)
>>>>>>         at
>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>>         at
>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>>         at
>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>>         at
>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>>         at
>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>         at
>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>         at
>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>         at
>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>>>         at java.lang.Thread.run(Thread.java:722)
>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>> com.yammer.metrics.Metrics
>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>>>>         ... 18 more
>>>>>>
>>>>>>
>>>>>> Suhas Shekar
>>>>>>
>>>>>> University of California, Los Angeles
>>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>>
>>>>>> On Sun, Dec 28, 2014 at 11:52 PM, Suhas Shekar <suhshekar52@gmail.com
>>>>>> > wrote:
>>>>>>
>>>>>>> I made both versions 1.1.1 and I got the same error. I then tried
>>>>>>> making both 1.1.0 as that is the version of my Spark Core, but I got the
>>>>>>> same error.
>>>>>>>
>>>>>>> I noticed my Kafka dependency is for Scala 2.9.2, while my Spark
>>>>>>> streaming Kafka dependency is for 2.10.x. I will try changing that next,
>>>>>>> but I don't think that will solve the error, as I don't think the
>>>>>>> application had got to that level yet.
>>>>>>>
>>>>>>> Please let me know of any possible next steps.
>>>>>>>
>>>>>>> Thank you again for the time and the help!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Suhas Shekar
>>>>>>>
>>>>>>> University of California, Los Angeles
>>>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>>>
>>>>>>> On Sun, Dec 28, 2014 at 11:31 PM, Akhil Das <
>>>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>>>
>>>>>>>> Just looked at the pom file that you are using. Why do you have
>>>>>>>> different versions in it?
>>>>>>>>
>>>>>>>> <dependency>
>>>>>>>> <groupId>org.apache.spark</groupId>
>>>>>>>> <artifactId>spark-streaming-kafka_2.10</artifactId>
>>>>>>>> <version>1.1.1</version>
>>>>>>>> </dependency>
>>>>>>>> <dependency>
>>>>>>>> <groupId>org.apache.spark</groupId>
>>>>>>>> <artifactId>spark-streaming_2.10</artifactId>
>>>>>>>> <version>1.0.2</version>
>>>>>>>> </dependency>
>>>>>>>>
>>>>>>>> Can you make both the versions the same?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> On Mon, Dec 29, 2014 at 12:44 PM, Suhas Shekar <
>>>>>>>> suhshekar52@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> 1) Could you please clarify on what you mean by checking the Scala
>>>>>>>>> version is correct? In my pom.xml file it is 2.10.4 (which is the same as
>>>>>>>>> when I start spark-shell).
>>>>>>>>>
>>>>>>>>> 2) The spark master URL is definitely correct as I have run other
>>>>>>>>> apps with the same script that use Spark (like a word count with a local
>>>>>>>>> file)
>>>>>>>>>
>>>>>>>>> Thank you for the help!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Suhas Shekar
>>>>>>>>>
>>>>>>>>> University of California, Los Angeles
>>>>>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>>>>>
>>>>>>>>> On Sun, Dec 28, 2014 at 11:04 PM, Akhil Das <
>>>>>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>>>>>
>>>>>>>>>> Make sure you verify the following:
>>>>>>>>>>
>>>>>>>>>> - Scala version : I think the correct version would be 2.10.x
>>>>>>>>>> - SparkMasterURL: Be sure that you copied the one displayed on
>>>>>>>>>> the webui's top left corner (running on port 8080)
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 29, 2014 at 12:26 PM, suhshekar52 <
>>>>>>>>>> suhshekar52@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello Everyone,
>>>>>>>>>>>
>>>>>>>>>>> Thank you for the time and the help :).
>>>>>>>>>>>
>>>>>>>>>>> My goal here is to get this program working:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
>>>>>>>>>>>
>>>>>>>>>>> The only lines I do not have from the example are lines 62-67.
>>>>>>>>>>> pom.xml
>>>>>>>>>>> <
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>> Background: Have ec2 instances running. The standalone spark is
>>>>>>>>>>> running on
>>>>>>>>>>> top of Cloudera Manager 5.2.
>>>>>>>>>>>
>>>>>>>>>>> Pom file is attached and the same for both clusters.
>>>>>>>>>>> pom.xml
>>>>>>>>>>> <
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>> Here are a few different approaches I have taken and the issues
>>>>>>>>>>> I run into:
>>>>>>>>>>>
>>>>>>>>>>> *Standalone Mode*
>>>>>>>>>>>
>>>>>>>>>>> 1) Use spark-submit script to run:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/bin/spark-submit
>>>>>>>>>>> --class SimpleApp --master spark://10.0.1.230:7077  --jars
>>>>>>>>>>> $(echo
>>>>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',')
>>>>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
>>>>>>>>>>>
>>>>>>>>>>> Interesting...I was getting an error like this: Initial job has
>>>>>>>>>>> not accepted
>>>>>>>>>>> any resources; check your cluster UI
>>>>>>>>>>>
>>>>>>>>>>> Now, when I run, it prints out the 3 Hello world statements in
>>>>>>>>>>> my code:
>>>>>>>>>>> KafkaJavaConsumer.txt
>>>>>>>>>>> <
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/KafkaJavaConsumer.txt
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>> and then it seems to try to start the Kafka Stream, but fails:
>>>>>>>>>>>
>>>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Starting Kafka Consumer
>>>>>>>>>>> Stream with
>>>>>>>>>>> group: c1
>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverTracker: Registered receiver for
>>>>>>>>>>> stream 0
>>>>>>>>>>> from akka://sparkDriver
>>>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Connecting to Zookeeper:
>>>>>>>>>>> 10.0.1.232:2181
>>>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Started block pushing
>>>>>>>>>>> thread
>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopping receiver
>>>>>>>>>>> with
>>>>>>>>>>> message: Error starting receiver 0:
>>>>>>>>>>> java.lang.NoClassDefFoundError:
>>>>>>>>>>> scala/reflect/ClassManifest
>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Called receiver
>>>>>>>>>>> onStop
>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Deregistering
>>>>>>>>>>> receiver 0
>>>>>>>>>>> ^C14/12/29 05:58:05 ERROR ReceiverTracker: Deregistered receiver
>>>>>>>>>>> for stream
>>>>>>>>>>> 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>>>>>>>>>> scala/reflect/ClassManifest
>>>>>>>>>>>         at
>>>>>>>>>>> kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
>>>>>>>>>>>         at
>>>>>>>>>>> kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
>>>>>>>>>>>         at kafka.utils.Logging$class.$init$(Logging.scala:29)
>>>>>>>>>>>         at
>>>>>>>>>>>
>>>>>>>>>>> kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:26)
>>>>>>>>>>>         at
>>>>>>>>>>> kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:94)
>>>>>>>>>>>         at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96)
>>>>>>>>>>>         at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>>>>>>>         at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>>>>>>>         at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>>>>>>>         at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>>>>>>>         at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>>>         at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>>>         at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>>>>>>         at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>>>>>>>>>         at
>>>>>>>>>>>
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>>>>>>>>         at
>>>>>>>>>>>
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:722)
>>>>>>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>>>>>>> scala.reflect.ClassManifest
>>>>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>>>>>>         at java.security.AccessController.doPrivileged(Native
>>>>>>>>>>> Method)
>>>>>>>>>>>         at
>>>>>>>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>>>>>>>>>         ... 18 more
>>>>>>>>>>>
>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopped receiver 0
>>>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Stopping BlockGenerator
>>>>>>>>>>>
>>>>>>>>>>> I ran into a couple other Class not found errors, and was able
>>>>>>>>>>> to solve them
>>>>>>>>>>> by adding dependencies on the pom file, but have not found such
>>>>>>>>>>> a solution
>>>>>>>>>>> to this error.
>>>>>>>>>>>
>>>>>>>>>>> On the Kafka side of things, I am simply typing in messages as
>>>>>>>>>>> soon as I
>>>>>>>>>>> start the Java app on another console. Is this okay?
>>>>>>>>>>>
>>>>>>>>>>> I have not set up an advertised host on the kafka side as I was
>>>>>>>>>>> able to
>>>>>>>>>>> still receive messages from other consoles by setting up a
>>>>>>>>>>> consumer to
>>>>>>>>>>> listen to the private ip:port. Is this okay?
>>>>>>>>>>>
>>>>>>>>>>> Lastly, is there a command, like --from-beginning, for a consumer
>>>>>>>>>>> in the Java application to get messages from the beginning?
>>>>>>>>>>>
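On the --from-beginning question: as far as I know there is no flag for this in the Java app. The closest equivalent with the Kafka 0.8 high-level consumer (the ZookeeperConsumerConnector visible in the stack traces) is the consumer property auto.offset.reset=smallest. In plain Kafka 0.8 that setting only applies when the group has no offsets stored in ZooKeeper, so a group id that has not been used before is the safest way to see it take effect. A minimal sketch of building such a parameter map; the class name, method name and the "c1-replay" group id are made up for illustration, and the other values are copied from the logs in this thread:

```java
import java.util.HashMap;
import java.util.Map;

final class ConsumerParams {
    /** Builds consumer settings that start from the oldest retained message for a group without stored offsets. */
    static Map<String, String> fromBeginning(String zkQuorum, String groupId) {
        Map<String, String> params = new HashMap<>();
        params.put("zookeeper.connect", zkQuorum);
        params.put("group.id", groupId);
        // Same timeout that appears in the VerifiableProperties log lines of this thread
        params.put("zookeeper.connection.timeout.ms", "10000");
        // Kafka 0.8 high-level consumer: "smallest" means the oldest message still retained
        params.put("auto.offset.reset", "smallest");
        return params;
    }

    public static void main(String[] args) {
        // "c1-replay" is a hypothetical, previously unused group id
        System.out.println(fromBeginning("10.0.1.232:2181", "c1-replay"));
    }
}
```

The resulting map would be handed to the KafkaUtils.createStream overload that accepts a kafkaParams map (together with the key/value and decoder classes) instead of the zkQuorum and group strings.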
>>>>>>>>>>> Thanks a lot for the help and happy holidays!
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> View this message in context:
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Setting-up-Simple-Kafka-Consumer-via-Spark-Java-app-tp20879.html
>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
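Now add these lines to get rid of those logs:

    import org.apache.log4j.Logger;
    import org.apache.log4j.Level;

    // Turn off every logger under "org" and "akka" (semicolons keep this valid in Java as well as Scala)
    Logger.getLogger("org").setLevel(Level.OFF);
    Logger.getLogger("akka").setLevel(Level.OFF);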


Thanks
Best Regards

On Mon, Dec 29, 2014 at 2:09 PM, Suhas Shekar <su...@gmail.com> wrote:

> Hmm, so I passed 10000 (10,000 ms) to jssc.awaitTermination, but it still
> does not stop. When I am not pushing in any data it gives me this:
>
> 14/12/29 08:35:12 INFO ReceiverTracker: Stream 0 received 0 blocks
> 14/12/29 08:35:12 INFO JobScheduler: Added jobs for time 1419860112000 ms
> 14/12/29 08:35:14 INFO ReceiverTracker: Stream 0 received 0 blocks
> 14/12/29 08:35:14 INFO JobScheduler: Added jobs for time 1419860114000 ms
> 14/12/29 08:35:16 INFO ReceiverTracker: Stream 0 received 0 blocks
>
> When I am pushing in data it does this:
>
> 14/12/29 08:35:08 WARN BlockManager: Block input-0-1419860108200 already
> exists on this machine; not re-adding it
> 14/12/29 08:35:08 INFO BlockGenerator: Pushed block input-0-1419860108200
> 14/12/29 08:35:09 INFO MemoryStore: ensureFreeSpace(80) called with
> curMem=6515, maxMem=277842493
> 14/12/29 08:35:09 INFO MemoryStore: Block input-0-1419860109200 stored as
> bytes in memory (estimated size 80.0 B, free 265.0 MB)
> 14/12/29 08:35:09 INFO BlockManagerInfo: Added input-0-1419860109200 in
> memory on ip-10-0-1-230.us-west-1.compute.internal:48171 (size: 80.0 B,
> free: 265.0 MB)
> 14/12/29 08:35:09 INFO BlockManagerMaster: Updated info of block
> input-0-1419860109200
> 14/12/29 08:35:09 WARN BlockManager: Block input-0-1419860109200 already
> exists on this machine; not re-adding it
> 14/12/29 08:35:09 INFO BlockGenerator: Pushed block input-0-1419860109200
> 14/12/29 08:35:10 INFO ReceiverTracker: Stream 0 received 2 blocks
>
> I know I am close, as every time I enter a message in my Kafka producer the
> console reacts as shown above. Do I have to place the awaitTermination
> somewhere else? Or is the warning saying there is an underlying problem?
>
> Thank you for the help...hopefully I am as close as I think I am!
>
>
>
> Suhas Shekar
>
> University of California, Los Angeles
> B.A. Economics, Specialization in Computing 2014
>
> On Mon, Dec 29, 2014 at 12:28 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> If you want to stop the streaming after 10 seconds, then use
>> ssc.awaitTermination(10000). Make sure you push some data to kafka for the
>> streaming to consume within the 10 seconds.
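A note on the awaitTermination(10000) suggestion above: as far as I understand the Spark 1.x streaming API, the timed awaitTermination only blocks the calling thread for up to that many milliseconds. It does not stop the context, so the receiver keeps running until stop() is called (in the Java app that would be jssc.awaitTermination(10000) followed by jssc.stop()). The same wait-then-stop pattern can be sketched with nothing but java.util.concurrent. The class and method names below are made up for illustration, and the executor is only a stand-in for the streaming context:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AwaitThenStop {
    /**
     * Starts an endless "receiver" loop, waits waitMillis, then stops it explicitly.
     * Returns {endedDuringWait, endedAfterExplicitStop}.
     */
    static boolean[] runFor(long waitMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> {
            // Stand-in for a receiver that keeps polling until it is told to stop
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag so the loop exits
                }
            }
        });
        boolean endedDuringWait = false;
        boolean endedAfterStop = false;
        try {
            // Waiting alone does not stop anything: this returns false once the timeout elapses
            endedDuringWait = pool.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
            // The explicit stop is what actually ends the loop
            pool.shutdownNow();
            endedAfterStop = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return new boolean[] { endedDuringWait, endedAfterStop };
    }

    public static void main(String[] args) {
        boolean[] r = runFor(200);
        System.out.println("ended during wait: " + r[0] + ", ended after stop: " + r[1]);
    }
}
```

The point of the sketch is only the ordering: the timed wait returns without ending the work, and the job finishes once the explicit stop is issued.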
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Dec 29, 2014 at 1:53 PM, Suhas Shekar <su...@gmail.com>
>> wrote:
>>
>>> I'm very close! So I added that and then I added this:
>>> http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/0.8.2-beta
>>>
>>> and it seems as though the stream is working as it says Stream 0
>>> received 1 or 2 blocks as I enter in messages on my kafka producer.
>>> However, the Receiver seems to keep trying every 2 seconds (as I've
>>> included 2000 in my duration in my java app). How can I stop the Receiver
>>> from consuming messages after 10 seconds and output the word count to the
>>> console?
>>>
>>> Thanks a lot for all the help! I'm excited to see this word count :)
>>>
>>> Suhas Shekar
>>>
>>> University of California, Los Angeles
>>> B.A. Economics, Specialization in Computing 2014
>>>
>>> On Mon, Dec 29, 2014 at 12:04 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> Add this jar in the dependency
>>>> http://mvnrepository.com/artifact/com.yammer.metrics/metrics-core/2.2.0
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Mon, Dec 29, 2014 at 1:31 PM, Suhas Shekar <su...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello Akhil,
>>>>>
>>>>> I changed my Kafka dependency to 2.10 (which is the version of kafka
>>>>> that was on 10.0.1.232). I am getting a slightly different error, but at
>>>>> the same place as the previous error (pasted below).
>>>>>
>>>>> FYI, when I make these changes to the pom file, I run "mvn clean
>>>>> package", then cp the new jar files from the repository to my lib
>>>>> directory of jar files, which is passed as an argument in the
>>>>> spark-submit script from my original post.
>>>>>
>>>>> Thanks again for the time and help...much appreciated.
>>>>>
>>>>>
>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Starting receiver
>>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Starting Kafka Consumer Stream
>>>>> with group: c1
>>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Connecting to Zookeeper:
>>>>> 10.0.1.232:2181
>>>>> 14/12/29 07:56:00 INFO BlockGenerator: Started block pushing thread
>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Verifying properties
>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property group.id is
>>>>> overridden to c1
>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property
>>>>> zookeeper.connect is overridden to 10.0.1.232:2181
>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property
>>>>> zookeeper.connection.timeout.ms is overridden to 10000
>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Stopping receiver with
>>>>> message: Error starting receiver 0: java.lang.NoClassDefFoundError:
>>>>> com/yammer/metrics/Metrics
>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Called receiver onStop
>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Deregistering receiver 0
>>>>> 14/12/29 07:56:00 ERROR ReceiverTracker: Deregistered receiver for
>>>>> stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>>>> com/yammer/metrics/Metrics
>>>>>         at
>>>>> kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:51)
>>>>>         at
>>>>> kafka.consumer.ZookeeperConsumerConnector.newMeter(ZookeeperConsumerConnector.scala:83)
>>>>>         at
>>>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
>>>>>         at
>>>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
>>>>>         at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:97)
>>>>>         at
>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>         at
>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>         at
>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>         at
>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>         at
>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>         at
>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>         at
>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>         at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>>         at java.lang.Thread.run(Thread.java:722)
>>>>> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>>>         ... 18 more
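Both traces in this thread (scala/reflect/ClassManifest earlier, com/yammer/metrics/Metrics here) end in a ClassNotFoundException, which means the class was not on the classpath of the JVM that tried to load it. One quick way to check a set of jars before resubmitting is a small probe like the one below. This is a sketch under assumptions: the class name ClasspathCheck is made up, and it only reports what is visible to the JVM it runs in, so it would have to be run with the same jars that are passed via --jars.

```java
final class ClasspathCheck {
    /** Returns true if the named class can be located by this class's class loader. */
    static boolean isPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class names taken from the two stack traces in this thread
        String[] names = {
            "com.yammer.metrics.Metrics",
            "scala.reflect.ClassManifest"
        };
        for (String name : names) {
            System.out.println(name + " -> " + (isPresent(name) ? "found" : "MISSING"));
        }
    }
}
```

Any name reported as MISSING points at a jar that still has to be added to the lib directory.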
>>>>>
>>>>>
>>>>> Suhas Shekar
>>>>>
>>>>> University of California, Los Angeles
>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>
>>>>> On Sun, Dec 28, 2014 at 11:52 PM, Suhas Shekar <su...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I made both versions 1.1.1 and I got the same error. I then tried
>>>>>> making both 1.1.0 as that is the version of my Spark Core, but I got the
>>>>>> same error.
>>>>>>
>>>>>> I noticed my Kafka dependency is for Scala 2.9.2, while my Spark
>>>>>> streaming Kafka dependency is for 2.10.x. I will try changing that next,
>>>>>> but I don't think that will solve the error, as I don't think the
>>>>>> application had got to that level yet.
>>>>>>
>>>>>> Please let me know of any possible next steps.
>>>>>>
>>>>>> Thank you again for the time and the help!
>>>>>>
>>>>>>
>>>>>>
>>>>>> Suhas Shekar
>>>>>>
>>>>>> University of California, Los Angeles
>>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>>
>>>>>> On Sun, Dec 28, 2014 at 11:31 PM, Akhil Das <
>>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Just looked at the pom file that you are using. Why do you have
>>>>>>> different versions in it?
>>>>>>>
>>>>>>> <dependency>
>>>>>>> <groupId>org.apache.spark</groupId>
>>>>>>> <artifactId>spark-streaming-kafka_2.10</artifactId>
>>>>>>> <version>1.1.1</version>
>>>>>>> </dependency>
>>>>>>> <dependency>
>>>>>>> <groupId>org.apache.spark</groupId>
>>>>>>> <artifactId>spark-streaming_2.10</artifactId>
>>>>>>> <version>1.0.2</version>
>>>>>>> </dependency>
>>>>>>>
>>>>>>> Can you make both the versions the same?
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Mon, Dec 29, 2014 at 12:44 PM, Suhas Shekar <
>>>>>>> suhshekar52@gmail.com> wrote:
>>>>>>>
>>>>>>>> 1) Could you please clarify on what you mean by checking the Scala
>>>>>>>> version is correct? In my pom.xml file it is 2.10.4 (which is the same as
>>>>>>>> when I start spark-shell).
>>>>>>>>
>>>>>>>> 2) The spark master URL is definitely correct as I have run other
>>>>>>>> apps with the same script that use Spark (like a word count with a local
>>>>>>>> file)
>>>>>>>>
>>>>>>>> Thank you for the help!
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Suhas Shekar
>>>>>>>>
>>>>>>>> University of California, Los Angeles
>>>>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>>>>
>>>>>>>> On Sun, Dec 28, 2014 at 11:04 PM, Akhil Das <
>>>>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> Make sure you verify the following:
>>>>>>>>>
>>>>>>>>> - Scala version : I think the correct version would be 2.10.x
>>>>>>>>> - SparkMasterURL: Be sure that you copied the one displayed on the
>>>>>>>>> webui's top left corner (running on port 8080)
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> On Mon, Dec 29, 2014 at 12:26 PM, suhshekar52 <
>>>>>>>>> suhshekar52@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Everyone,
>>>>>>>>>>
>>>>>>>>>> Thank you for the time and the help :).
>>>>>>>>>>
>>>>>>>>>> My goal here is to get this program working:
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
>>>>>>>>>>
>>>>>>>>>> The only lines I do not have from the example are lines 62-67.
>>>>>>>>>> pom.xml
>>>>>>>>>> <
>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>> Background: Have ec2 instances running. The standalone spark is
>>>>>>>>>> running on
>>>>>>>>>> top of Cloudera Manager 5.2.
>>>>>>>>>>
>>>>>>>>>> Pom file is attached and the same for both clusters.
>>>>>>>>>> pom.xml
>>>>>>>>>> <
>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>> Here are a few different approaches I have taken and the issues I
>>>>>>>>>> run into:
>>>>>>>>>>
>>>>>>>>>> *Standalone Mode*
>>>>>>>>>>
>>>>>>>>>> 1) Use spark-submit script to run:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/bin/spark-submit
>>>>>>>>>> --class SimpleApp --master spark://10.0.1.230:7077  --jars $(echo
>>>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',')
>>>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
>>>>>>>>>>
>>>>>>>>>> Interesting...I was getting an error like this: Initial job has
>>>>>>>>>> not accepted
>>>>>>>>>> any resources; check your cluster UI
>>>>>>>>>>
>>>>>>>>>> Now, when I run, it prints out the 3 Hello world statements in my
>>>>>>>>>> code:
>>>>>>>>>> KafkaJavaConsumer.txt
>>>>>>>>>> <
>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/KafkaJavaConsumer.txt
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>> and then it seems to try to start the Kafka Stream, but fails:
>>>>>>>>>>
>>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Starting Kafka Consumer
>>>>>>>>>> Stream with
>>>>>>>>>> group: c1
>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverTracker: Registered receiver for
>>>>>>>>>> stream 0
>>>>>>>>>> from akka://sparkDriver
>>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Connecting to Zookeeper:
>>>>>>>>>> 10.0.1.232:2181
>>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Started block pushing
>>>>>>>>>> thread
>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopping receiver
>>>>>>>>>> with
>>>>>>>>>> message: Error starting receiver 0:
>>>>>>>>>> java.lang.NoClassDefFoundError:
>>>>>>>>>> scala/reflect/ClassManifest
>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Called receiver
>>>>>>>>>> onStop
>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Deregistering
>>>>>>>>>> receiver 0
>>>>>>>>>> ^C14/12/29 05:58:05 ERROR ReceiverTracker: Deregistered receiver
>>>>>>>>>> for stream
>>>>>>>>>> 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>>>>>>>>> scala/reflect/ClassManifest
>>>>>>>>>>         at
>>>>>>>>>> kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
>>>>>>>>>>         at
>>>>>>>>>> kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
>>>>>>>>>>         at kafka.utils.Logging$class.$init$(Logging.scala:29)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:26)
>>>>>>>>>>         at
>>>>>>>>>> kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:94)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>>>>>>>         at
>>>>>>>>>>
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>>>>>>>         at java.lang.Thread.run(Thread.java:722)
>>>>>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>>>>>> scala.reflect.ClassManifest
>>>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>>>>>         at java.security.AccessController.doPrivileged(Native
>>>>>>>>>> Method)
>>>>>>>>>>         at
>>>>>>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>>>>>>>>         ... 18 more
>>>>>>>>>>
>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopped receiver 0
>>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Stopping BlockGenerator
>>>>>>>>>>
>>>>>>>>>> I ran into a couple other Class not found errors, and was able to
>>>>>>>>>> solve them
>>>>>>>>>> by adding dependencies on the pom file, but have not found such a
>>>>>>>>>> solution
>>>>>>>>>> to this error.
>>>>>>>>>>
>>>>>>>>>> On the Kafka side of things, I am simply typing in messages as
>>>>>>>>>> soon as I
>>>>>>>>>> start the Java app on another console. Is this okay?
>>>>>>>>>>
>>>>>>>>>> I have not set up an advertised host on the kafka side as I was
>>>>>>>>>> able to
>>>>>>>>>> still receive messages from other consoles by setting up a
>>>>>>>>>> consumer to
>>>>>>>>>> listen to the private ip:port. Is this okay?
>>>>>>>>>>
>>>>>>>>>> Lastly, is there a command, like --from-beginning, for a consumer
>>>>>>>>>> in the Java
>>>>>>>>>> application to get messages from the beginning?
>>>>>>>>>>
>>>>>>>>>> Thanks a lot for the help and happy holidays!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> View this message in context:
>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Setting-up-Simple-Kafka-Consumer-via-Spark-Java-app-tp20879.html
>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>> Nabble.com.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>>>>>
>>>>>>>>>>

Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Suhas Shekar <su...@gmail.com>.
Hmm, so I passed 10000 (10,000 ms) to jssc.awaitTermination, but the app does
not stop. When I am not pushing in any data, it gives me this:

14/12/29 08:35:12 INFO ReceiverTracker: Stream 0 received 0 blocks
14/12/29 08:35:12 INFO JobScheduler: Added jobs for time 1419860112000 ms
14/12/29 08:35:14 INFO ReceiverTracker: Stream 0 received 0 blocks
14/12/29 08:35:14 INFO JobScheduler: Added jobs for time 1419860114000 ms
14/12/29 08:35:16 INFO ReceiverTracker: Stream 0 received 0 blocks

When I am pushing in data it does this:

14/12/29 08:35:08 WARN BlockManager: Block input-0-1419860108200 already
exists on this machine; not re-adding it
14/12/29 08:35:08 INFO BlockGenerator: Pushed block input-0-1419860108200
14/12/29 08:35:09 INFO MemoryStore: ensureFreeSpace(80) called with
curMem=6515, maxMem=277842493
14/12/29 08:35:09 INFO MemoryStore: Block input-0-1419860109200 stored as
bytes in memory (estimated size 80.0 B, free 265.0 MB)
14/12/29 08:35:09 INFO BlockManagerInfo: Added input-0-1419860109200 in
memory on ip-10-0-1-230.us-west-1.compute.internal:48171 (size: 80.0 B,
free: 265.0 MB)
14/12/29 08:35:09 INFO BlockManagerMaster: Updated info of block
input-0-1419860109200
14/12/29 08:35:09 WARN BlockManager: Block input-0-1419860109200 already
exists on this machine; not re-adding it
14/12/29 08:35:09 INFO BlockGenerator: Pushed block input-0-1419860109200
14/12/29 08:35:10 INFO ReceiverTracker: Stream 0 received 2 blocks

I know I am close, as every time I enter a message in my Kafka producer, the
console reacts as I showed above. Do I have to place the awaitTermination
somewhere else? Or is the warning saying there is an underlying problem?

Thank you for the help...hopefully I am as close as I think I am!



Suhas Shekar

University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014

On Mon, Dec 29, 2014 at 12:28 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> If you want to stop the streaming after 10 seconds, then use
> ssc.awaitTermination(10000). Make sure you push some data to kafka for the
> streaming to consume within the 10 seconds.
>
> Thanks
> Best Regards
>
> On Mon, Dec 29, 2014 at 1:53 PM, Suhas Shekar <su...@gmail.com>
> wrote:
>
>> I'm very close! So I added that and then I added this:
>> http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/0.8.2-beta
>>
>> and it seems as though the stream is working as it says Stream 0 received
>> 1 or 2 blocks as I enter in messages on my kafka producer. However, the
>> Receiver seems to keep trying every 2 seconds (as I've included 2000 in my
>> duration in my java app). How can I stop the Receiver from consuming
>> messages after 10 seconds and output the word count to the console?
>>
>> Thanks a lot for all the help! I'm excited to see this word count :)
>>
>> Suhas Shekar
>>
>> University of California, Los Angeles
>> B.A. Economics, Specialization in Computing 2014
>>
>> On Mon, Dec 29, 2014 at 12:04 AM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> Add this jar in the dependency
>>> http://mvnrepository.com/artifact/com.yammer.metrics/metrics-core/2.2.0
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Dec 29, 2014 at 1:31 PM, Suhas Shekar <su...@gmail.com>
>>> wrote:
>>>
>>>> Hello Akhil,
>>>>
>>>> I changed my Kafka dependency to 2.10 (which is the version of Kafka
>>>> that was on 10.0.1.232). I am getting a slightly different error, but at
>>>> the same place as the previous error (pasted below).
>>>>
>>>> FYI, when I make these changes to the pom file, I run "mvn clean
>>>> package", then cp the new jar files from the repository to my lib
>>>> directory of jar files, which is passed as an argument in the
>>>> spark-submit script from my original post.
>>>>
>>>> Thanks again for the time and help...much appreciated.
>>>>
>>>>
>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Starting receiver
>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Starting Kafka Consumer Stream
>>>> with group: c1
>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Connecting to Zookeeper:
>>>> 10.0.1.232:2181
>>>> 14/12/29 07:56:00 INFO BlockGenerator: Started block pushing thread
>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Verifying properties
>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property group.id is
>>>> overridden to c1
>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property zookeeper.connect
>>>> is overridden to 10.0.1.232:2181
>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property
>>>> zookeeper.connection.timeout.ms is overridden to 10000
>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Stopping receiver with
>>>> message: Error starting receiver 0: java.lang.NoClassDefFoundError:
>>>> com/yammer/metrics/Metrics
>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Called receiver onStop
>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Deregistering receiver 0
>>>> 14/12/29 07:56:00 ERROR ReceiverTracker: Deregistered receiver for
>>>> stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>>> com/yammer/metrics/Metrics
>>>>         at
>>>> kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:51)
>>>>         at
>>>> kafka.consumer.ZookeeperConsumerConnector.newMeter(ZookeeperConsumerConnector.scala:83)
>>>>         at
>>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
>>>>         at
>>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
>>>>         at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:97)
>>>>         at
>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>         at
>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>         at
>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>         at
>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>         at
>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>         at
>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>         at
>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>         at
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>         at java.lang.Thread.run(Thread.java:722)
>>>> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>>         ... 18 more
>>>>
>>>>
>>>> Suhas Shekar
>>>>
>>>> University of California, Los Angeles
>>>> B.A. Economics, Specialization in Computing 2014
>>>>
>>>> On Sun, Dec 28, 2014 at 11:52 PM, Suhas Shekar <su...@gmail.com>
>>>> wrote:
>>>>
>>>>> I made both versions 1.1.1 and I got the same error. I then tried
>>>>> making both 1.1.0 as that is the version of my Spark Core, but I got the
>>>>> same error.
>>>>>
>>>>> I noticed my Kafka dependency is for Scala 2.9.2, while my Spark
>>>>> streaming Kafka dependency is for 2.10.x. I will try changing that next,
>>>>> but I don't think that will solve the error, as I don't think the
>>>>> application had got to that level yet.
>>>>>
>>>>> Please let me know of any possible next steps.
>>>>>
>>>>> Thank you again for the time and the help!
>>>>>
>>>>>
>>>>>
>>>>> Suhas Shekar
>>>>>
>>>>> University of California, Los Angeles
>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>
>>>>> On Sun, Dec 28, 2014 at 11:31 PM, Akhil Das <
>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>
>>>>>> Just looked at the pom file that you are using, why are you having
>>>>>> different versions in it?
>>>>>>
>>>>>> <dependency>
>>>>>> <groupId>org.apache.spark</groupId>
>>>>>> <artifactId>spark-streaming-kafka_2.10</artifactId>
>>>>>> <version>1.1.1</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>> <groupId>org.apache.spark</groupId>
>>>>>> <artifactId>spark-streaming_2.10</artifactId>
>>>>>> <version>1.0.2</version>
>>>>>> </dependency>
>>>>>>
>>>>>> Can you make both the versions the same?
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Mon, Dec 29, 2014 at 12:44 PM, Suhas Shekar <suhshekar52@gmail.com
>>>>>> > wrote:
>>>>>>
>>>>>>> 1) Could you please clarify on what you mean by checking the Scala
>>>>>>> version is correct? In my pom.xml file it is 2.10.4 (which is the same as
>>>>>>> when I start spark-shell).
>>>>>>>
>>>>>>> 2) The spark master URL is definitely correct as I have run other
>>>>>>> apps with the same script that use Spark (like a word count with a local
>>>>>>> file)
>>>>>>>
>>>>>>> Thank you for the help!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Suhas Shekar
>>>>>>>
>>>>>>> University of California, Los Angeles
>>>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>>>
>>>>>>> On Sun, Dec 28, 2014 at 11:04 PM, Akhil Das <
>>>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>>>
>>>>>>>> Make sure you verify the following:
>>>>>>>>
>>>>>>>> - Scala version : I think the correct version would be 2.10.x
>>>>>>>> - SparkMasterURL: Be sure that you copied the one displayed on the
>>>>>>>> webui's top left corner (running on port 8080)
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> On Mon, Dec 29, 2014 at 12:26 PM, suhshekar52 <
>>>>>>>> suhshekar52@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello Everyone,
>>>>>>>>>
>>>>>>>>> Thank you for the time and the help :).
>>>>>>>>>
>>>>>>>>> My goal here is to get this program working:
>>>>>>>>>
>>>>>>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
>>>>>>>>>
>>>>>>>>> The only lines I do not have from the example are lines 62-67.
>>>>>>>>> pom.xml
>>>>>>>>> <
>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>> Background: Have ec2 instances running. The standalone spark is
>>>>>>>>> running on
>>>>>>>>> top of Cloudera Manager 5.2.
>>>>>>>>>
>>>>>>>>> Pom file is attached and the same for both clusters.
>>>>>>>>> pom.xml
>>>>>>>>> <
>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>> Here are a few different approaches I have taken and the issues I
>>>>>>>>> run into:
>>>>>>>>>
>>>>>>>>> *Standalone Mode*
>>>>>>>>>
>>>>>>>>> 1) Use spark-submit script to run:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/bin/spark-submit
>>>>>>>>> --class SimpleApp --master spark://10.0.1.230:7077  --jars $(echo
>>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',')
>>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
>>>>>>>>>
>>>>>>>>> Interesting...I was getting an error like this: Initial job has
>>>>>>>>> not accepted
>>>>>>>>> any resources; check your cluster UI
>>>>>>>>>
>>>>>>>>> Now, when I run, it prints out the 3 Hello world statements in my
>>>>>>>>> code:
>>>>>>>>> KafkaJavaConsumer.txt
>>>>>>>>> <
>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/KafkaJavaConsumer.txt
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>> and then it seems to try to start the Kafka Stream, but fails:
>>>>>>>>>
>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Starting Kafka Consumer
>>>>>>>>> Stream with
>>>>>>>>> group: c1
>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverTracker: Registered receiver for
>>>>>>>>> stream 0
>>>>>>>>> from akka://sparkDriver
>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Connecting to Zookeeper:
>>>>>>>>> 10.0.1.232:2181
>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Started block pushing thread
>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopping receiver
>>>>>>>>> with
>>>>>>>>> message: Error starting receiver 0: java.lang.NoClassDefFoundError:
>>>>>>>>> scala/reflect/ClassManifest
>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Called receiver
>>>>>>>>> onStop
>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Deregistering
>>>>>>>>> receiver 0
>>>>>>>>> ^C14/12/29 05:58:05 ERROR ReceiverTracker: Deregistered receiver
>>>>>>>>> for stream
>>>>>>>>> 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>>>>>>>> scala/reflect/ClassManifest
>>>>>>>>>         at
>>>>>>>>> kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
>>>>>>>>>         at
>>>>>>>>> kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
>>>>>>>>>         at kafka.utils.Logging$class.$init$(Logging.scala:29)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>> kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:26)
>>>>>>>>>         at
>>>>>>>>> kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:94)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>>>>>>         at java.lang.Thread.run(Thread.java:722)
>>>>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>>>>> scala.reflect.ClassManifest
>>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>>>>         at java.security.AccessController.doPrivileged(Native
>>>>>>>>> Method)
>>>>>>>>>         at
>>>>>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>>>>>>>         ... 18 more
>>>>>>>>>
>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopped receiver 0
>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Stopping BlockGenerator
>>>>>>>>>
>>>>>>>>> I ran into a couple other Class not found errors, and was able to
>>>>>>>>> solve them
>>>>>>>>> by adding dependencies on the pom file, but have not found such a
>>>>>>>>> solution
>>>>>>>>> to this error.
>>>>>>>>>
>>>>>>>>> On the Kafka side of things, I am simply typing in messages as
>>>>>>>>> soon as I
>>>>>>>>> start the Java app on another console. Is this okay?
>>>>>>>>>
>>>>>>>>> I have not set up an advertised host on the kafka side as I was
>>>>>>>>> able to
>>>>>>>>> still receive messages from other consoles by setting up a
>>>>>>>>> consumer to
>>>>>>>>> listen to the private ip:port. Is this okay?
>>>>>>>>>
>>>>>>>>> Lastly, is there a command, like --from-beginning, for a consumer
>>>>>>>>> in the Java
>>>>>>>>> application to get messages from the beginning?
>>>>>>>>>
>>>>>>>>> Thanks a lot for the help and happy holidays!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> View this message in context:
>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Setting-up-Simple-Kafka-Consumer-via-Spark-Java-app-tp20879.html
>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>> Nabble.com.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>>>>
>>>>>>>>>

Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
If you want to stop the streaming after 10 seconds, then use
ssc.awaitTermination(10000) (the timeout is in milliseconds). Make sure you
push some data to Kafka for the streaming to consume within those 10 seconds.
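
One caveat, as far as I understand the API: the timed awaitTermination only
blocks the calling thread for up to that long. It does not shut the streaming
context down by itself, so you would probably still need to call stop() on the
context afterwards. Below is a minimal sketch of that wait-then-stop shape
using only java.util.concurrent rather than Spark. The class, field and method
names are made up for illustration, and the short millisecond values in main
merely stand in for the 2 second batch interval and the 10 second wait.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a streaming job: not Spark code, just the same
// "start, wait with a timeout, then stop explicitly" pattern.
class TimedStopSketch {
    // Counts how many periodic "batches" have run so far.
    static final AtomicInteger BATCHES = new AtomicInteger();

    /**
     * Starts a periodic task, waits up to waitMillis, then stops the task.
     * Returns true if the task was still running when the timed wait
     * returned, i.e. the wait alone did not stop anything.
     */
    static boolean stillRunningAfterTimedWait(long batchMillis, long waitMillis) {
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                BATCHES.incrementAndGet(); // one "batch" per tick
            }
        }, 0, batchMillis, TimeUnit.MILLISECONDS);
        try {
            // Timed wait: returns false if the timeout elapses before the
            // scheduler has terminated. Nothing has asked it to stop yet.
            boolean terminated =
                    scheduler.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
            return !terminated;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // The explicit stop is what actually ends the periodic work.
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean stillRunning = stillRunningAfterTimedWait(20, 100);
        System.out.println("still running after timed wait: " + stillRunning);
        System.out.println("batches run: " + BATCHES.get());
    }
}
```

In the Spark app the equivalent would presumably be calling stop() on the
streaming context right after awaitTermination(10000) returns.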

Thanks
Best Regards

On Mon, Dec 29, 2014 at 1:53 PM, Suhas Shekar <su...@gmail.com> wrote:

> I'm very close! So I added that and then I added this:
> http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/0.8.2-beta
>
> and it seems as though the stream is working as it says Stream 0 received
> 1 or 2 blocks as I enter in messages on my kafka producer. However, the
> Receiver seems to keep trying every 2 seconds (as I've included 2000 in my
> duration in my java app). How can I stop the Receiver from consuming
> messages after 10 seconds and output the word count to the console?
>
> Thanks a lot for all the help! I'm excited to see this word count :)
>
> Suhas Shekar
>
> University of California, Los Angeles
> B.A. Economics, Specialization in Computing 2014
>
> On Mon, Dec 29, 2014 at 12:04 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Add this jar in the dependency
>> http://mvnrepository.com/artifact/com.yammer.metrics/metrics-core/2.2.0
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Dec 29, 2014 at 1:31 PM, Suhas Shekar <su...@gmail.com>
>> wrote:
>>
>>> Hello Akhil,
>>>
>>> I changed my Kafka dependency to 2.10 (which is the version of Kafka
>>> that was on 10.0.1.232). I am getting a slightly different error, but at
>>> the same place as the previous error (pasted below).
>>>
>>> FYI, when I make these changes to the pom file, I run "mvn clean package",
>>> then cp the new jar files from the repository to my lib directory of jar
>>> files, which is passed as an argument in the spark-submit script from my
>>> original post.
>>>
>>> Thanks again for the time and help...much appreciated.
>>>
>>>
>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Starting receiver
>>> 14/12/29 07:56:00 INFO KafkaReceiver: Starting Kafka Consumer Stream
>>> with group: c1
>>> 14/12/29 07:56:00 INFO KafkaReceiver: Connecting to Zookeeper:
>>> 10.0.1.232:2181
>>> 14/12/29 07:56:00 INFO BlockGenerator: Started block pushing thread
>>> 14/12/29 07:56:00 INFO VerifiableProperties: Verifying properties
>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property group.id is
>>> overridden to c1
>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property zookeeper.connect
>>> is overridden to 10.0.1.232:2181
>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property
>>> zookeeper.connection.timeout.ms is overridden to 10000
>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Stopping receiver with
>>> message: Error starting receiver 0: java.lang.NoClassDefFoundError:
>>> com/yammer/metrics/Metrics
>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Called receiver onStop
>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Deregistering receiver 0
>>> 14/12/29 07:56:00 ERROR ReceiverTracker: Deregistered receiver for
>>> stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>> com/yammer/metrics/Metrics
>>>         at
>>> kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:51)
>>>         at
>>> kafka.consumer.ZookeeperConsumerConnector.newMeter(ZookeeperConsumerConnector.scala:83)
>>>         at
>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
>>>         at
>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
>>>         at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:97)
>>>         at
>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>         at
>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>         at
>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>         at
>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>         at
>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>         at
>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>         at
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>         at java.lang.Thread.run(Thread.java:722)
>>> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>         ... 18 more
>>>
>>>
>>> Suhas Shekar
>>>
>>> University of California, Los Angeles
>>> B.A. Economics, Specialization in Computing 2014
>>>
>>> On Sun, Dec 28, 2014 at 11:52 PM, Suhas Shekar <su...@gmail.com>
>>> wrote:
>>>
>>>> I made both versions 1.1.1 and I got the same error. I then tried
>>>> making both 1.1.0 as that is the version of my Spark Core, but I got the
>>>> same error.
>>>>
>>>> I noticed my Kafka dependency is for Scala 2.9.2, while my Spark
>>>> streaming Kafka dependency is for 2.10.x. I will try changing that next,
>>>> but I don't think that will solve the error, as I don't think the
>>>> application had got to that level yet.
>>>>
>>>> Please let me know of any possible next steps.
>>>>
>>>> Thank you again for the time and the help!
>>>>
>>>>
>>>>
>>>> Suhas Shekar
>>>>
>>>> University of California, Los Angeles
>>>> B.A. Economics, Specialization in Computing 2014
>>>>
>>>> On Sun, Dec 28, 2014 at 11:31 PM, Akhil Das <akhil@sigmoidanalytics.com
>>>> > wrote:
>>>>
>>>>> Just looked at the pom file that you are using, why are you having
>>>>> different versions in it?
>>>>>
>>>>> <dependency>
>>>>> <groupId>org.apache.spark</groupId>
>>>>> <artifactId>spark-streaming-kafka_2.10</artifactId>
>>>>> <version>1.1.1</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.spark</groupId>
>>>>> <artifactId>spark-streaming_2.10</artifactId>
>>>>> <version>1.0.2</version>
>>>>> </dependency>
>>>>>
>>>>> Can you make both the versions the same?
>>>>>
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Mon, Dec 29, 2014 at 12:44 PM, Suhas Shekar <su...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> 1) Could you please clarify on what you mean by checking the Scala
>>>>>> version is correct? In my pom.xml file it is 2.10.4 (which is the same as
>>>>>> when I start spark-shell).
>>>>>>
>>>>>> 2) The spark master URL is definitely correct as I have run other
>>>>>> apps with the same script that use Spark (like a word count with a local
>>>>>> file)
>>>>>>
>>>>>> Thank you for the help!
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Suhas Shekar
>>>>>>
>>>>>> University of California, Los Angeles
>>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>>
>>>>>> On Sun, Dec 28, 2014 at 11:04 PM, Akhil Das <
>>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Make sure you verify the following:
>>>>>>>
>>>>>>> - Scala version : I think the correct version would be 2.10.x
>>>>>>> - SparkMasterURL: Be sure that you copied the one displayed on the
>>>>>>> webui's top left corner (running on port 8080)
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Mon, Dec 29, 2014 at 12:26 PM, suhshekar52 <suhshekar52@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Hello Everyone,
>>>>>>>>
>>>>>>>> Thank you for the time and the help :).
>>>>>>>>
>>>>>>>> My goal here is to get this program working:
>>>>>>>>
>>>>>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
>>>>>>>>
>>>>>>>> The only lines I do not have from the example are lines 62-67.
>>>>>>>> pom.xml
>>>>>>>> <
>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml
>>>>>>>> >
>>>>>>>>
>>>>>>>> Background: Have ec2 instances running. The standalone spark is
>>>>>>>> running on
>>>>>>>> top of Cloudera Manager 5.2.
>>>>>>>>
>>>>>>>> Pom file is attached and the same for both clusters.
>>>>>>>> pom.xml
>>>>>>>> <
>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml
>>>>>>>> >
>>>>>>>>
>>>>>>>> Here are a few different approaches I have taken and the issues I
>>>>>>>> run into:
>>>>>>>>
>>>>>>>> *Standalone Mode*
>>>>>>>>
>>>>>>>> 1) Use spark-submit script to run:
>>>>>>>>
>>>>>>>>
>>>>>>>> /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/bin/spark-submit
>>>>>>>> --class SimpleApp --master spark://10.0.1.230:7077  --jars $(echo
>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',')
>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
>>>>>>>>
>>>>>>>> Interesting...I was getting an error like this: Initial job has not
>>>>>>>> accepted
>>>>>>>> any resources; check your cluster UI
>>>>>>>>
>>>>>>>> Now, when I run, it prints out the 3 Hello world statements in my
>>>>>>>> code:
>>>>>>>> KafkaJavaConsumer.txt
>>>>>>>> <
>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/KafkaJavaConsumer.txt
>>>>>>>> >

Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Suhas Shekar <su...@gmail.com>.
I'm very close! I added that, and then I also added this:
http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/0.8.2-beta

The stream now seems to be working: the log says Stream 0 received 1 or 2
blocks as I enter messages on my Kafka producer. However, the Receiver
seems to keep trying every 2 seconds (I set the duration to 2000 in my
Java app). How can I stop the Receiver from consuming messages after 10
seconds and output the word count to the console?

Thanks a lot for all the help! I'm excited to see this word count :)

Suhas Shekar

University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014


Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Add this jar in the dependency
http://mvnrepository.com/artifact/com.yammer.metrics/metrics-core/2.2.0

Thanks
Best Regards


Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Suhas Shekar <su...@gmail.com>.
Hello Akhil,

I changed my Kafka dependency to 2.10 (which is the version of Kafka that
was on 10.0.1.232). I am getting a slightly different error, but at the
same place as the previous error (pasted below).
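
A NoClassDefFoundError for scala/reflect/ClassManifest is usually a sign
that jars built for different Scala versions are mixed on the classpath.
The sketch below is one way to spot such a mix by grouping jar file names
by their Scala suffix (the "_2.10" part of the artifact name). The class
name ScalaSuffixCheck and the jar names in main are hypothetical, chosen
only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ScalaSuffixCheck {
    // Matches the Scala suffix in artifact file names such as
    // "kafka_2.9.2-<version>.jar" or "spark-streaming-kafka_2.10-<version>.jar"
    // and captures only the binary version ("2.9", "2.10").
    private static final Pattern SUFFIX =
            Pattern.compile("_(2\\.\\d+)(?:\\.\\d+)?-");

    // Groups jar file names by Scala binary version; jars whose names
    // carry no Scala suffix (plain Java libraries) are ignored.
    static Map<String, List<String>> groupByScalaVersion(List<String> jarNames) {
        Map<String, List<String>> byVersion = new TreeMap<>();
        for (String name : jarNames) {
            Matcher m = SUFFIX.matcher(name);
            if (m.find()) {
                byVersion.computeIfAbsent(m.group(1), k -> new ArrayList<>()).add(name);
            }
        }
        return byVersion;
    }

    public static void main(String[] args) {
        // Hypothetical file names, for illustration only.
        List<String> jars = Arrays.asList(
                "kafka_2.9.2-0.8.1.1.jar",
                "spark-streaming-kafka_2.10-1.1.0.jar",
                "metrics-core-2.2.0.jar");
        Map<String, List<String>> grouped = groupByScalaVersion(jars);
        if (grouped.size() > 1) {
            System.out.println("Mixed Scala versions on the classpath: " + grouped);
        } else {
            System.out.println("Scala versions look consistent: " + grouped.keySet());
        }
    }
}
```

More than one key in the returned map means the lib directory mixes Scala
builds, which is worth fixing before chasing individual missing classes.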

FYI, whenever I change the pom file, I run "mvn clean package" and then cp
the new jar files from the repository into my lib directory of jars, which
is passed as an argument in the spark-submit script from my original post.
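
Since the jars are copied into the lib directory by hand, one quick check
is whether any jar there actually contains the class named in a
NoClassDefFoundError. The sketch below scans a directory of jars for a
given class using only the JDK. The class name JarClassFinder is
hypothetical, and the directory and class to look for are supplied on the
command line.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarFile;

class JarClassFinder {
    // Returns the file names of all jars directly under libDir that
    // contain the given fully qualified class name.
    static List<String> jarsContaining(Path libDir, String className) throws IOException {
        // Class files are stored in jars with '/' as the package separator.
        String entry = className.replace('.', '/') + ".class";
        List<String> hits = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jar : jars) {
                try (JarFile jf = new JarFile(jar.toFile())) {
                    if (jf.getJarEntry(entry) != null) {
                        hits.add(jar.getFileName().toString());
                    }
                }
            }
        }
        Collections.sort(hits);
        return hits;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: java JarClassFinder <lib-dir> <class-name>");
            return;
        }
        Path libDir = Paths.get(args[0]);
        String className = args[1];
        List<String> hits = jarsContaining(libDir, className);
        if (hits.isEmpty()) {
            System.out.println(className + " not found in any jar under " + libDir);
        } else {
            System.out.println(className + " found in " + hits);
        }
    }
}
```

Run against the lib directory with com.yammer.metrics.Metrics as the class
name, an empty result would suggest the jar providing that class never
made it into the directory passed to --jars.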

Thanks again for the time and help...much appreciated.


14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Starting receiver
14/12/29 07:56:00 INFO KafkaReceiver: Starting Kafka Consumer Stream with group: c1
14/12/29 07:56:00 INFO KafkaReceiver: Connecting to Zookeeper: 10.0.1.232:2181
14/12/29 07:56:00 INFO BlockGenerator: Started block pushing thread
14/12/29 07:56:00 INFO VerifiableProperties: Verifying properties
14/12/29 07:56:00 INFO VerifiableProperties: Property group.id is overridden to c1
14/12/29 07:56:00 INFO VerifiableProperties: Property zookeeper.connect is overridden to 10.0.1.232:2181
14/12/29 07:56:00 INFO VerifiableProperties: Property zookeeper.connection.timeout.ms is overridden to 10000
14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Stopping receiver with message: Error starting receiver 0: java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Called receiver onStop
14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Deregistering receiver 0
14/12/29 07:56:00 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
        at kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:51)
        at kafka.consumer.ZookeeperConsumerConnector.newMeter(ZookeeperConsumerConnector.scala:83)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
        at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
        at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:97)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
        ... 18 more
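
A note on this particular error: as far as I know, com.yammer.metrics.Metrics
lives in the metrics-core jar (group com.yammer.metrics), which the Kafka
0.8.x client depends on, so the trace suggests that jar is not among the ones
passed via --jars. Below is a minimal sketch for checking whether a class can
be loaded from a given classpath. ClasspathCheck and isPresent are made-up
names for illustration, and the check only covers the JVM it runs in, not the
Spark executors.

```java
public class ClasspathCheck {

    /** Returns true if the named class can be located by this class's classloader. */
    static boolean isPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default to the two classes named in the stack traces of this thread
        String[] names = args.length > 0
                ? args
                : new String[] {"com.yammer.metrics.Metrics", "scala.reflect.ClassManifest"};
        for (String name : names) {
            System.out.println(name + " -> " + (isPresent(name) ? "found" : "MISSING"));
        }
    }
}
```

Running it with the same jars that go into --jars, for example
java -cp "/home/ec2-user/sparkApps/SimpleApp/lib/*:." ClasspathCheck,
should show whether the lib directory actually contains the class.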


Suhas Shekar

University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014


Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Suhas Shekar <su...@gmail.com>.
I made both versions 1.1.1 and I got the same error. I then tried making
both 1.1.0 as that is the version of my Spark Core, but I got the same
error.

I noticed my Kafka dependency is for Scala 2.9.2, while my spark-streaming-kafka
dependency is for 2.10.x. I will try changing that next, but I don't think
that will solve the error, as I don't think the application had gotten to
that level yet.
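
For context on why the suffix mismatch may matter: as far as I know, in
Scala 2.10 ClassManifest is only a deprecated type alias, so scala-library
2.10 has no scala/reflect/ClassManifest class file, and a jar compiled for
2.9.x that references it would fail with the NoClassDefFoundError from the
original post. Below is a minimal sketch for spotting mixed Scala suffixes
in a lib directory. ScalaSuffixCheck and groupBySuffix are hypothetical
names, and the sample jar names in main are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ScalaSuffixCheck {

    // Matches the Scala suffix in names such as kafka_2.9.2-<version>.jar
    // and captures only the major.minor part (2.9, 2.10, ...).
    private static final Pattern SUFFIX = Pattern.compile("_(2\\.\\d+)(?:\\.\\d+)?-");

    /** Groups jar file names by Scala major.minor suffix; jars without a suffix are skipped. */
    static Map<String, List<String>> groupBySuffix(List<String> jarNames) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String jar : jarNames) {
            Matcher m = SUFFIX.matcher(jar);
            if (m.find()) {
                groups.computeIfAbsent(m.group(1), k -> new ArrayList<>()).add(jar);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        // Made-up sample names; pass the real file names from the lib directory as arguments
        List<String> jars = args.length > 0
                ? Arrays.asList(args)
                : Arrays.asList("kafka_2.9.2-0.8.0.jar", "spark-streaming-kafka_2.10-1.1.0.jar");
        Map<String, List<String>> groups = groupBySuffix(jars);
        groups.forEach((scala, names) -> System.out.println("Scala " + scala + ": " + names));
        if (groups.size() > 1) {
            System.out.println("WARNING: jars built for different Scala versions are mixed");
        }
    }
}
```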

Please let me know of any possible next steps.

Thank you again for the time and the help!



Suhas Shekar

University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014


Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
I just looked at the pom file that you are using. Why does it have
different versions in it?

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>*1.1.1*</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>*1.0.2*</version>
</dependency>

Can you make both versions the same?
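
If it helps, here is a rough sketch of a check that the Spark jars handed to
spark-submit all carry one version. SparkVersionCheck and sparkVersions are
hypothetical names, the pattern assumes plain file names of the form
spark-<module>_<scalaVersion>-<sparkVersion>.jar, and vendor-suffixed
versions (for example CDH builds) would need a looser pattern.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SparkVersionCheck {

    // spark-<module>_<scalaVersion>-<sparkVersion>.jar, capturing the Spark version
    private static final Pattern SPARK_JAR =
            Pattern.compile("^spark-[a-z-]+_[\\d.]+-(\\d+(?:\\.\\d+)*)\\.jar$");

    /** Collects the distinct Spark versions found among the given jar file names. */
    static Set<String> sparkVersions(List<String> jarNames) {
        Set<String> versions = new TreeSet<>();
        for (String jar : jarNames) {
            Matcher m = SPARK_JAR.matcher(jar);
            if (m.matches()) {
                versions.add(m.group(1));
            }
        }
        return versions;
    }

    public static void main(String[] args) {
        // Defaults mirror the two versions in the pom above; pass real file names as arguments
        List<String> jars = args.length > 0
                ? Arrays.asList(args)
                : Arrays.asList("spark-streaming-kafka_2.10-1.1.1.jar",
                                "spark-streaming_2.10-1.0.2.jar");
        Set<String> versions = sparkVersions(jars);
        System.out.println("Spark versions found: " + versions);
        if (versions.size() > 1) {
            System.out.println("WARNING: Spark artifacts with different versions are mixed");
        }
    }
}
```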


Thanks
Best Regards


Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Suhas Shekar <su...@gmail.com>.
1) Could you please clarify what you mean by checking that the Scala version
is correct? In my pom.xml file it is 2.10.4 (which is the same as when I
start spark-shell).

2) The Spark master URL is definitely correct, as I have run other apps that
use Spark with the same script (like a word count with a local file).

Thank you for the help!




Suhas Shekar

University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014

On Sun, Dec 28, 2014 at 11:04 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Make sure you verify the following:
>
> - Scala version : I think the correct version would be 2.10.x
> - SparkMasterURL: Be sure that you copied the one displayed on the webui's
> top left corner (running on port 8080)
>
> Thanks
> Best Regards
>
> On Mon, Dec 29, 2014 at 12:26 PM, suhshekar52 <su...@gmail.com>
> wrote:
>
>> Hello Everyone,
>>
>> Thank you for the time and the help :).
>>
>> My goal here is to get this program working:
>>
>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
>>
>> The only lines I do not have from the example are lines 62-67. pom.xml
>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml>
>>
>> Background: Have ec2 instances running. The standalone spark is running on
>> top of Cloudera Manager 5.2.
>>
>> Pom file is attached and the same for both clusters.
>> pom.xml
>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml>
>>
>> Here are a few different approaches I have taken and the issues I run
>> into:
>>
>> *Standalone Mode*
>>
>> 1) Use spark-submit script to run:
>>
>>
>> /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/bin/spark-submit
>> --class SimpleApp --master spark://10.0.1.230:7077  --jars $(echo
>> /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',')
>> /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
>>
>> Interesting...I was getting an error like this: Initial job has not
>> accepted
>> any resources; check your cluster UI
>>
>> Now, when I run, it prints out the 3 Hello world statements in my code:
>> KafkaJavaConsumer.txt
>> <
>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/KafkaJavaConsumer.txt
>> >
>>
>> and then it seems to try to start the Kafka Stream, but fails:
>>
>> 14/12/29 05:58:05 INFO KafkaReceiver: Starting Kafka Consumer Stream with group: c1
>> 14/12/29 05:58:05 INFO ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
>> 14/12/29 05:58:05 INFO KafkaReceiver: Connecting to Zookeeper: 10.0.1.232:2181
>> 14/12/29 05:58:05 INFO BlockGenerator: Started block pushing thread
>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopping receiver with message: Error starting receiver 0: java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Called receiver onStop
>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Deregistering receiver 0
>> ^C14/12/29 05:58:05 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
>>         at kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
>>         at kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
>>         at kafka.utils.Logging$class.$init$(Logging.scala:29)
>>         at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:26)
>>         at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:94)
>>         at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96)
>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>         at java.lang.Thread.run(Thread.java:722)
>> Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>         ... 18 more
>>
>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopped receiver 0
>> 14/12/29 05:58:05 INFO BlockGenerator: Stopping BlockGenerator
>>
>> I ran into a couple of other class-not-found errors and was able to solve
>> them by adding dependencies to the pom file, but have not found such a
>> solution to this error.
>>
>> On the Kafka side of things, I am simply typing in messages on another
>> console as soon as I start the Java app. Is this okay?
>>
>> I have not set up an advertised host on the Kafka side, as I was still able
>> to receive messages from other consoles by setting up a consumer to listen
>> on the private ip:port. Is this okay?
>>
>> Lastly, is there a command, like --from-beginning, for a consumer in the
>> Java application to get messages from the beginning?
>>
>> Thanks a lot for the help and happy holidays!

Re: Setting up Simple Kafka Consumer via Spark Java app

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Make sure you verify the following:

- Scala version: I think the correct version would be 2.10.x
- Spark master URL: be sure you copied the one displayed in the top-left
corner of the master web UI (running on port 8080)
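
One way to check the first point, as a rough sketch rather than anything from the original thread: a NoClassDefFoundError for scala/reflect/ClassManifest is what typically shows up when a jar compiled for Scala 2.9.x (for example, a 2.9.x build of Kafka) runs against a Scala 2.10 library, where ClassManifest is, as far as I know, only a deprecated type alias and no longer a class file of its own. The class name ScalaClasspathCheck and both helper methods below are hypothetical; the code uses only the JDK and probes whether two marker classes can be found on the current classpath.

```java
// Hypothetical helper (not part of Spark or Kafka): probes the classpath for marker classes.
class ScalaClasspathCheck {

    /** Returns true if the named class can be located on the current classpath. */
    public static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ScalaClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Turns the two probe results into a human-readable hint. */
    public static String describe(boolean hasScalaLibrary, boolean hasClassManifest) {
        if (!hasScalaLibrary) {
            return "scala-library is not on the classpath";
        }
        // Assumption: the ClassManifest class file exists in 2.9.x but not in 2.10+
        return hasClassManifest
                ? "scala.reflect.ClassManifest found: looks like Scala 2.9.x or older"
                : "scala.reflect.ClassManifest missing: looks like Scala 2.10 or newer";
    }

    public static void main(String[] args) {
        // scala.Option is used here as a marker that some scala-library jar is present
        boolean hasScalaLibrary = isLoadable("scala.Option");
        boolean hasClassManifest = isLoadable("scala.reflect.ClassManifest");
        System.out.println(describe(hasScalaLibrary, hasClassManifest));
    }
}
```

Compile it and run it with the same jars you pass to --jars on the classpath, for example `java -cp "lib/*:." ScalaClasspathCheck`. If it reports a 2.10-style library while the Kafka jar in lib/ was built for 2.9.x, switching the pom to the Kafka artifact built for Scala 2.10 is the likely fix.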

Thanks
Best Regards

On Mon, Dec 29, 2014 at 12:26 PM, suhshekar52 <su...@gmail.com> wrote:

> Hello Everyone,
>
> Thank you for the time and the help :).
>
> My goal here is to get this program working:
>
> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
>
> The only lines I do not have from the example are lines 62-67. pom.xml
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml>
>
> Background: Have ec2 instances running. The standalone spark is running on
> top of Cloudera Manager 5.2.
>
> Pom file is attached and the same for both clusters.
> pom.xml
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml>
>
> Here are a few different approaches I have taken and the issues I run into:
>
> *Standalone Mode*
>
> 1) Use spark-submit script to run:
>
> /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/bin/spark-submit \
>   --class SimpleApp --master spark://10.0.1.230:7077 \
>   --jars $(echo /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',') \
>   /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
>
> Interesting...I was getting an error like this: Initial job has not accepted
> any resources; check your cluster UI
>
> Now, when I run, it prints out the 3 Hello world statements in my code:
> KafkaJavaConsumer.txt
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/KafkaJavaConsumer.txt>
>
> and then it seems to try to start the Kafka Stream, but fails:
>
> 14/12/29 05:58:05 INFO KafkaReceiver: Starting Kafka Consumer Stream with group: c1
> 14/12/29 05:58:05 INFO ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
> 14/12/29 05:58:05 INFO KafkaReceiver: Connecting to Zookeeper: 10.0.1.232:2181
> 14/12/29 05:58:05 INFO BlockGenerator: Started block pushing thread
> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopping receiver with message: Error starting receiver 0: java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Called receiver onStop
> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Deregistering receiver 0
> ^C14/12/29 05:58:05 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
>         at kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
>         at kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
>         at kafka.utils.Logging$class.$init$(Logging.scala:29)
>         at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:26)
>         at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:94)
>         at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>         ... 18 more
>
> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopped receiver 0
> 14/12/29 05:58:05 INFO BlockGenerator: Stopping BlockGenerator
>
> I ran into a couple of other class-not-found errors and was able to solve
> them by adding dependencies to the pom file, but have not found such a
> solution to this error.
>
> On the Kafka side of things, I am simply typing in messages on another
> console as soon as I start the Java app. Is this okay?
>
> I have not set up an advertised host on the Kafka side, as I was still able
> to receive messages from other consoles by setting up a consumer to listen
> on the private ip:port. Is this okay?
>
> Lastly, is there a command, like --from-beginning, for a consumer in the
> Java application to get messages from the beginning?
>
> Thanks a lot for the help and happy holidays!