Posted to user@spark.apache.org by m....@accenture.com on 2014/12/01 11:12:40 UTC

Kafka+Spark-streaming issue: Stream 0 received 0 blocks

Hi,

I am integrating Kafka and Spark using Spark Streaming. I have created a Kafka topic:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test


I am publishing messages to Kafka and trying to read them with Spark Streaming Java code, displaying them on screen.
The daemons are all up: Spark master and worker, ZooKeeper, and Kafka.
I am writing Java code for this using KafkaUtils.createStream; the code is below:

package com.spark;

import scala.Tuple2;
import org.apache.spark.streaming.Duration;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.*;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import java.util.Map;
import java.util.HashMap;

public class SparkStream {
       public static void main(String args[])
       {
              if(args.length != 3)
              {
                     System.out.println("Usage: spark-submit -class com.spark.SparkStream target/SparkStream-with-dependencies.jar <zookeeper_ip> <group_name> <topic1,topic2,...>");
                     System.exit(1);
              }


              // Map each topic name to the number of receiver threads for it
              Map<String,Integer> topicMap = new HashMap<String,Integer>();
              String[] topic = args[2].split(",");
              for(String t: topic)
              {
                     topicMap.put(t, new Integer(1));
              }

              JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
              JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap); // (zkQuorum, groupId, topics)

              System.out.println("Connection done");
              JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
                                                {
                                                       public String call(Tuple2<String, String> message)
                                                       {
                                                              System.out.println("NewMessage: "+message._2()); //for debugging
                                                              return message._2();
                                                       }
                                                });

              data.print();

              jssc.start();
              jssc.awaitTermination();

       }
}


I am running the job, and at another terminal I am running the Kafka producer to publish messages:
#bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>    Hi kafka
>    second message
>    another message

But the output logs at the spark-streaming console don't show the messages; instead they show zero blocks received:


    -------------------------------------------
    Time: 1417107363000 ms
    -------------------------------------------

    14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for time 1417107363000 ms (execution: 0.000 s)
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time 1417107363000 ms
    14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
    14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
    14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000 ms
    14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
    14/11/27 11:56:06 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks


Why isn't the data block getting received? I have tried the Kafka producer and consumer on the console (bin/kafka-console-producer.... and bin/kafka-console-consumer...) and they work perfectly, but why not the code above? Please help me.


Regards,
Aiman Sarosh


________________________________

This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Where allowed by local law, electronic communications with Accenture and its affiliates, including e-mail and instant messaging (including content), may be scanned by our systems for the purposes of information security and assessment of internal compliance with Accenture policy.
______________________________________________________________________________________

www.accenture.com

RE: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

Posted by m....@accenture.com.
Hi Akhil,

The output is just perfect now ☺

-------------------------------------------
Time: 1417443060000 ms
-------------------------------------------

NewMessage: gd5++++++++++++++++++
-------------------------------------------
Time: 1417443061000 ms
-------------------------------------------
gd5

-------------------------------------------
Time: 1417443062000 ms
-------------------------------------------

NewMessage: bbgfbf++++++++++++++++++
NewMessage: bgbfdbg++++++++++++++++++
-------------------------------------------
Time: 1417443063000 ms
-------------------------------------------
bbgfbf
bgbfdbg

NewMessage: bfdgdfgd++++++++++++++++++
-------------------------------------------
Time: 1417443064000 ms
-------------------------------------------
bfdgdfgd

NewMessage: fgdfgdfg++++++++++++++++++
-------------------------------------------
Time: 1417443065000 ms
-------------------------------------------
fgdfgdfg

NewMessage: fgfdgdfgdf++++++++++++++++++
-------------------------------------------
Time: 1417443066000 ms
-------------------------------------------
fgfdgdfgdf

NewMessage: gfdgdfgfd++++++++++++++++++
-------------------------------------------
Time: 1417443067000 ms
-------------------------------------------
gfdgdfgfd

NewMessage: gfdgdfg++++++++++++++++++
-------------------------------------------
Time: 1417443068000 ms
-------------------------------------------
gfdgdfg

NewMessage: gdfgdfg++++++++++++++++++
-------------------------------------------
Time: 1417443069000 ms
-------------------------------------------
gdfgdfg

NewMessage: gfgfg++++++++++++++++++
NewMessage: fgfgdfg++++++++++++++++++
-------------------------------------------
Time: 1417443070000 ms
-------------------------------------------
gfgfg
fgfgdfg

NewMessage: gfdgdfg++++++++++++++++++
-------------------------------------------
Time: 1417443071000 ms
-------------------------------------------
gfdgdfg

NewMessage: gfdg++++++++++++++++++
NewMessage: 5345435435++++++++++++++++++
-------------------------------------------
Time: 1417443072000 ms
-------------------------------------------
gfdg
5345435435

NewMessage: 5345345345++++++++++++++++++
-------------------------------------------
Time: 1417443073000 ms
-------------------------------------------
5345345345

NewMessage: 534543534++++++++++++++++++
-------------------------------------------
Time: 1417443074000 ms
-------------------------------------------
534543534

NewMessage: 54354534++++++++++++++++++
-------------------------------------------
Time: 1417443075000 ms
-------------------------------------------
54354534

NewMessage: 543534++++++++++++++++++
-------------------------------------------
Time: 1417443076000 ms
-------------------------------------------
543534

NewMessage: fgbfg++++++++++++++++++
NewMessage: fvfvf++++++++++++++++++
-------------------------------------------
Time: 1417443077000 ms
-------------------------------------------
fgbfg
fvfvf

NewMessage: vcvcv++++++++++++++++++
-------------------------------------------
Time: 1417443078000 ms
-------------------------------------------
vcvcv

NewMessage: gfgdfg++++++++++++++++++
-------------------------------------------
Time: 1417443079000 ms
-------------------------------------------
gfgdfg

NewMessage: gfgfd++++++++++++++++++
-------------------------------------------
Time: 1417443080000 ms
-------------------------------------------
gfgfd


Thanks a ton ☺

Regards,
Aiman


From: Akhil Das [mailto:akhil@sigmoidanalytics.com]
Sent: Monday, December 01, 2014 6:33 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

Looks good.

Add these lines to the code if you want to get rid of those log4j info/warn messages:

import org.apache.log4j.Logger;
import org.apache.log4j.Level;

Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);

Thanks
Best Regards

On Mon, Dec 1, 2014 at 6:22 PM, <m....@accenture.com> wrote:
Hi,

I have now increased the core to 10.
[screenshot: Spark web UI]

This time the messages are coming through, but sometimes with warnings. For example, I entered 15-17 lines:
Hiii
Hiiiiii
Hiiiiiiiii
Jpjpjpjpj
…
….

But I got only 4 lines in return, and with some warnings as well.
The logs are :

2014-12-01 08:43:32,154 INFO  [Executor task launch worker-2] executor.Executor (Logging.scala:logInfo(59)) - Running task 0.0 in stage 10.0 (TID 18)
2014-12-01 08:43:32,163 INFO  [Executor task launch worker-2] storage.BlockManager (Logging.scala:logInfo(59)) - Found block input-0-1417441408800 locally
NewMessage: hiiii++++++++++++++++++
2014-12-01 08:43:32,164 INFO  [Executor task launch worker-2] executor.Executor (Logging.scala:logInfo(59)) - Finished task 0.0 in stage 10.0 (TID 18). 593 bytes result sent to driver
2014-12-01 08:43:32,167 INFO  [Result resolver thread-1] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 0.0 in stage 10.0 (TID 18) in 42 ms on localhost (1/3)
2014-12-01 08:43:32,166 INFO  [Executor task launch worker-3] executor.Executor (Logging.scala:logInfo(59)) - Running task 2.0 in stage 10.0 (TID 20)
2014-12-01 08:43:32,166 INFO  [Executor task launch worker-1] executor.Executor (Logging.scala:logInfo(59)) - Running task 1.0 in stage 10.0 (TID 19)
2014-12-01 08:43:32,181 INFO  [Executor task launch worker-1] storage.BlockManager (Logging.scala:logInfo(59)) - Found block input-0-1417441409800 locally
NewMessage: hiiii++++++++++++++++++
2014-12-01 08:43:32,183 INFO  [Executor task launch worker-3] storage.BlockManager (Logging.scala:logInfo(59)) - Found block input-0-1417441410800 locally
NewMessage: hiii++++++++++++++++++
2014-12-01 08:43:32,188 INFO  [Executor task launch worker-1] executor.Executor (Logging.scala:logInfo(59)) - Finished task 1.0 in stage 10.0 (TID 19). 593 bytes result sent to driver
2014-12-01 08:43:32,188 INFO  [Result resolver thread-0] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 1.0 in stage 10.0 (TID 19) in 46 ms on localhost (2/3)
2014-12-01 08:43:32,196 INFO  [Result resolver thread-2] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 2.0 in stage 10.0 (TID 20) in 51 ms on localhost (3/3)
2014-12-01 08:43:32,197 INFO  [Result resolver thread-2] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet 10.0, whose tasks have all completed, from pool
2014-12-01 08:43:32,197 INFO  [sparkDriver-akka.actor.default-dispatcher-16] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 10 (take at DStream.scala:608) finished in 0.058 s
2014-12-01 08:43:32,198 INFO  [pool-7-thread-1] spark.SparkContext (Logging.scala:logInfo(59)) - Job finished: take at DStream.scala:608, took 0.103179818 s
-------------------------------------------
Time: 1417441412000 ms
-------------------------------------------
hiiii
hiiii
hiiii
hiii

2014-12-01 08:43:32,199 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417441412000 ms.0 from job set of time 1417441412000 ms
2014-12-01 08:43:32,199 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.199 s for time 1417441412000 ms (execution: 0.169 s)
2014-12-01 08:43:32,199 INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 16 from persistence list
2014-12-01 08:43:32,201 INFO  [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 16
2014-12-01 08:43:32,202 INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 15 from persistence list
2014-12-01 08:43:32,203 INFO  [sparkDriver-akka.actor.default-dispatcher-3] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 15
2014-12-01 08:43:32,204 INFO  [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[15] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417441412000 ms
2014-12-01 08:43:32,205 INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager (Logging.scala:logInfo(59)) - Removing block input-0-1417441403800
2014-12-01 08:43:32,205 INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1417441403800 of size 78 dropped from memory (free 280228496)
2014-12-01 08:43:32,206 INFO  [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed input-0-1417441403800 on 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
2014-12-01 08:43:32,206 INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441403800
2014-12-01 08:43:32,207 INFO  [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager (Logging.scala:logInfo(59)) - Removing block input-0-1417441404800
2014-12-01 08:43:32,207 INFO  [sparkDriver-akka.actor.default-dispatcher-2] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1417441404800 of size 78 dropped from memory (free 280228574)
2014-12-01 08:43:32,208 INFO  [sparkDriver-akka.actor.default-dispatcher-13] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed input-0-1417441404800 on 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
2014-12-01 08:43:32,209 INFO  [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441404800
2014-12-01 08:43:32,212 INFO  [sparkDriver-akka.actor.default-dispatcher-16] storage.BlockManager (Logging.scala:logInfo(59)) - Removing block input-0-1417441405800
2014-12-01 08:43:32,212 INFO  [sparkDriver-akka.actor.default-dispatcher-16] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1417441405800 of size 78 dropped from memory (free 280228652)
2014-12-01 08:43:32,213 INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed input-0-1417441405800 on 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
2014-12-01 08:43:32,213 INFO  [sparkDriver-akka.actor.default-dispatcher-16] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441405800
2014-12-01 08:43:32,213 INFO  [Executor task launch worker-3] executor.Executor (Logging.scala:logInfo(59)) - Finished task 2.0 in stage 10.0 (TID 20). 592 bytes result sent to driver
2014-12-01 08:43:32,214 INFO  [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManager (Logging.scala:logInfo(59)) - Removing block input-0-1417441406800
2014-12-01 08:43:32,214 INFO  [sparkDriver-akka.actor.default-dispatcher-14] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1417441406800 of size 78 dropped from memory (free 280228730)
2014-12-01 08:43:32,215 INFO  [sparkDriver-akka.actor.default-dispatcher-3] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed input-0-1417441406800 on 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
2014-12-01 08:43:32,215 INFO  [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441406800
2014-12-01 08:43:33,002 INFO  [Thread-31] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(77) called with curMem=20245, maxMem=280248975
2014-12-01 08:43:33,003 INFO  [Thread-31] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1417441412800 stored as bytes in memory (estimated size 77.0 B, free 267.2 MB)
2014-12-01 08:43:33,005 INFO  [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added input-0-1417441412800 in memory on 192.168.88.130:34367 (size: 77.0 B, free: 267.3 MB)
2014-12-01 08:43:33,016 INFO  [Thread-31] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441412800
2014-12-01 08:43:33,023 WARN  [handle-message-executor-2] storage.BlockManager (Logging.scala:logWarning(71)) - Block input-0-1417441412800 already exists on this machine; not re-adding it
2014-12-01 08:43:33,029 INFO  [Thread-31] receiver.BlockGenerator (Logging.scala:logInfo(59)) - Pushed block input-0-1417441412800
2014-12-01 08:43:34,001 INFO  [Thread-31] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(77) called with curMem=20322, maxMem=280248975
2014-12-01 08:43:34,002 INFO  [Thread-31] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1417441413800 stored as bytes in memory (estimated size 77.0 B, free 267.2 MB)
2014-12-01 08:43:34,003 INFO  [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added input-0-1417441413800 in memory on 192.168.88.130:34367 (size: 77.0 B, free: 267.3 MB)
2014-12-01 08:43:34,007 INFO  [Thread-31] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441413800
2014-12-01 08:43:34,014 WARN  [handle-message-executor-4] storage.BlockManager (Logging.scala:logWarning(71)) - Block input-0-1417441413800 already exists on this machine; not re-adding it


Is there something more missing in the configuration?

Regards,
Aiman

From: Akhil Das [mailto:akhil@sigmoidanalytics.com]
Sent: Monday, December 01, 2014 6:09 PM

To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

You would need a multi-core machine (>= 2 cores) for spark-streaming to work in standalone mode, since the receiver permanently occupies one core and at least one more is needed to process the data. But it will work fine if you run it in local mode with the master set to local[4].

What are you getting after making this change

JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(3000));

and running the code? Are you still getting:

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

(Make sure you push some data to kafka producer while the code is running)
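
For reference, the same setting can be expressed through a SparkConf; a minimal sketch against the Spark 1.x Java API:

    SparkConf conf = new SparkConf()
                         .setMaster("local[4]")   // >= 2 cores: the receiver pins one, the rest process data
                         .setAppName("SparkStream");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(3000));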


Thanks
Best Regards

On Mon, Dec 1, 2014 at 5:38 PM, <m....@accenture.com> wrote:
Hi,
I have now configured Spark. I had a CDH5 installation, so I referred to the installation doc.
I have the worker up now:
[screenshot: Spark web UI showing the worker]
Now I tried using:
JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077<http://192.168.88.130:7077/>", "SparkStream", new Duration(3000));

Also,
JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(3000));

And,
JavaStreamingContext jssc = new JavaStreamingContext("local[*]", "SparkStream", new Duration(3000));


The logs are still the same:
-------------------------------------------
Time: 1417438988000 ms
-------------------------------------------

2014-12-01 08:03:08,008 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,008 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,009 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.008 s for time 1417438988000 ms (execution: 0.000 s)
2014-12-01 08:03:08,010 INFO  [sparkDriver-akka.actor.default-dispatcher-15] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417438988000 ms
2014-12-01 08:03:08,015 INFO  [sparkDriver-akka.actor.default-dispatcher-15] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 39 from persistence list
2014-12-01 08:03:08,024 INFO  [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 39
2014-12-01 08:03:08,027 INFO  [sparkDriver-akka.actor.default-dispatcher-15] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 38 from persistence list
2014-12-01 08:03:08,031 INFO  [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 38
2014-12-01 08:03:08,033 INFO  [sparkDriver-akka.actor.default-dispatcher-15] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[38] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417438988000 ms
2014-12-01 08:03:09,002 INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks



From: Akhil Das [mailto:akhil@sigmoidanalytics.com]
Sent: Monday, December 01, 2014 4:41 PM

To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

I see you have no worker machines to execute the job.

[screenshot: Spark master web UI showing no workers]

You haven't configured your spark cluster properly.

A quick fix to get it running would be to run it in local mode; for that, change this line

JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077<http://192.168.88.130:7077/>", "SparkStream", new Duration(3000));

to this

JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(3000));


Thanks
Best Regards

On Mon, Dec 1, 2014 at 4:18 PM, <m....@accenture.com> wrote:
Hi,

The Spark master is working, and I have given the same URL in the code:
[screenshot: Spark master web UI]

The warning is gone, and the new log is:
-------------------------------------------
Time: 1417427850000 ms
-------------------------------------------

INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417427850000 ms.0 from job set of time 1417427850000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417427850000 ms.0 from job set of time 1417427850000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 1417427850000 ms (execution: 0.001 s)
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417427850000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 25
INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 24
INFO  [sparkDriver-akka.actor.default-dispatcher-5] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[24] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417427850000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
-------------------------------------------
Time: 1417427853000 ms
-------------------------------------------

INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0 from job set of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0 from job set of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000 ms (execution: 0.001 s)
INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 27
INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 26
INFO  [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[26] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-6] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

What should my approach be now?
I need urgent help.

Regards,
Aiman

From: Akhil Das [mailto:akhil@sigmoidanalytics.com]
Sent: Monday, December 01, 2014 3:56 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

It says:

 14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

A quick guess would be that you are giving the wrong master URL (spark://192.168.88.130:7077). Open the web UI running on port 8080 and use the master URL listed at the top left corner of the page.
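
One way to keep that URL out of the source is to pass it in as an extra argument; a sketch only, since the fourth argument here is hypothetical and the args.length check would need to allow it:

    // Hypothetical fourth CLI argument carrying the master URL
    // copied from the web UI on port 8080, e.g. spark://<host>:7077
    JavaStreamingContext jssc = new JavaStreamingContext(args[3], "SparkStream", new Duration(3000));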

Thanks
Best Regards






Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Looks good.

Add these lines in the code if you want to get ride of those log4j
info/warn messages

import org.apache.log4j.Logger
import org.apache.log4j.Level

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)


Thanks
Best Regards

On Mon, Dec 1, 2014 at 6:22 PM, <m....@accenture.com> wrote:

>  Hi,
>
>
>
> I have now increased the core to 10.
>
>
>
> This time the messages are coming but sometimes warning. Like, 15-17 lines
> I entered:
>
> Hiii
>
> Hiiiiii
>
> Hiiiiiiiii
>
> Jpjpjpjpj
>
> …
>
> ….
>
>
>
> But I got only 4 lines in return, and with some warnings also
>
> The logs are :
>
>
>
> 2014-12-01 08:43:32,154 INFO  [Executor task launch worker-2]
> executor.Executor (Logging.scala:logInfo(59)) - Running task 0.0 in stage
> 10.0 (TID 18)
>
> 2014-12-01 08:43:32,163 INFO  [Executor task launch worker-2]
> storage.BlockManager (Logging.scala:logInfo(59)) - Found block
> input-0-1417441408800 locally
>
> NewMessage: hiiii++++++++++++++++++
>
> 2014-12-01 08:43:32,164 INFO  [Executor task launch worker-2]
> executor.Executor (Logging.scala:logInfo(59)) - Finished task 0.0 in stage
> 10.0 (TID 18). 593 bytes result sent to driver
>
> 2014-12-01 08:43:32,167 INFO  [Result resolver thread-1]
> scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 0.0 in
> stage 10.0 (TID 18) in 42 ms on localhost (1/3)
>
> 2014-12-01 08:43:32,166 INFO  [Executor task launch worker-3]
> executor.Executor (Logging.scala:logInfo(59)) - Running task 2.0 in stage
> 10.0 (TID 20)
>
> 2014-12-01 08:43:32,166 INFO  [Executor task launch worker-1]
> executor.Executor (Logging.scala:logInfo(59)) - Running task 1.0 in stage
> 10.0 (TID 19)
>
> 2014-12-01 08:43:32,181 INFO  [Executor task launch worker-1]
> storage.BlockManager (Logging.scala:logInfo(59)) - Found block
> input-0-1417441409800 locally
>
> NewMessage: hiiii++++++++++++++++++
>
> 2014-12-01 08:43:32,183 INFO  [Executor task launch worker-3]
> storage.BlockManager (Logging.scala:logInfo(59)) - Found block
> input-0-1417441410800 locally
>
> NewMessage: hiii++++++++++++++++++
>
> 2014-12-01 08:43:32,188 INFO  [Executor task launch worker-1]
> executor.Executor (Logging.scala:logInfo(59)) - Finished task 1.0 in stage
> 10.0 (TID 19). 593 bytes result sent to driver
>
> 2014-12-01 08:43:32,188 INFO  [Result resolver thread-0]
> scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 1.0 in
> stage 10.0 (TID 19) in 46 ms on localhost (2/3)
>
> 2014-12-01 08:43:32,196 INFO  [Result resolver thread-2]
> scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 2.0 in
> stage 10.0 (TID 20) in 51 ms on localhost (3/3)
>
> 2014-12-01 08:43:32,197 INFO  [Result resolver thread-2]
> scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet
> 10.0, whose tasks have all completed, from pool
>
> 2014-12-01 08:43:32,197 INFO
> [sparkDriver-akka.actor.default-dispatcher-16] scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Stage 10 (take at DStream.scala:608) finished
> in 0.058 s
>
> 2014-12-01 08:43:32,198 INFO  [pool-7-thread-1] spark.SparkContext
> (Logging.scala:logInfo(59)) - Job finished: take at DStream.scala:608, took
> 0.103179818 s
>
> -------------------------------------------
>
> Time: 1417441412000 ms
>
> -------------------------------------------
>
> hiiii
>
> hiiii
>
> hiiii
>
> hiii
>
>
>
> 2014-12-01 08:43:32,199 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Finished job streaming job 1417441412000 ms.0
> from job set of time 1417441412000 ms
>
> 2014-12-01 08:43:32,199 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Total delay: 0.199 s for time 1417441412000
> ms (execution: 0.169 s)
>
> 2014-12-01 08:43:32,199 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD
> (Logging.scala:logInfo(59)) - Removing RDD 16 from persistence list
>
> 2014-12-01 08:43:32,201 INFO
> [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 16
>
> 2014-12-01 08:43:32,202 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD
> (Logging.scala:logInfo(59)) - Removing RDD 15 from persistence list
>
> 2014-12-01 08:43:32,203 INFO
> [sparkDriver-akka.actor.default-dispatcher-3] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 15
>
> 2014-12-01 08:43:32,204 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream
> (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[15] at
> BlockRDD at ReceiverInputDStream.scala:69 of time 1417441412000 ms
>
> 2014-12-01 08:43:32,205 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing block input-0-1417441403800
>
> 2014-12-01 08:43:32,205 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] storage.MemoryStore
> (Logging.scala:logInfo(59)) - Block input-0-1417441403800 of size 78
> dropped from memory (free 280228496)
>
> 2014-12-01 08:43:32,206 INFO
> [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManagerInfo
> (Logging.scala:logInfo(59)) - Removed input-0-1417441403800 on
> 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
>
> 2014-12-01 08:43:32,206 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerMaster
> (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441403800
>
> 2014-12-01 08:43:32,207 INFO
> [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing block input-0-1417441404800
>
> 2014-12-01 08:43:32,207 INFO
> [sparkDriver-akka.actor.default-dispatcher-2] storage.MemoryStore
> (Logging.scala:logInfo(59)) - Block input-0-1417441404800 of size 78
> dropped from memory (free 280228574)
>
> 2014-12-01 08:43:32,208 INFO
> [sparkDriver-akka.actor.default-dispatcher-13] storage.BlockManagerInfo
> (Logging.scala:logInfo(59)) - Removed input-0-1417441404800 on
> 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
>
> 2014-12-01 08:43:32,209 INFO
> [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManagerMaster
> (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441404800
>
> 2014-12-01 08:43:32,212 INFO
> [sparkDriver-akka.actor.default-dispatcher-16] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing block input-0-1417441405800
>
> 2014-12-01 08:43:32,212 INFO
> [sparkDriver-akka.actor.default-dispatcher-16] storage.MemoryStore
> (Logging.scala:logInfo(59)) - Block input-0-1417441405800 of size 78
> dropped from memory (free 280228652)
>
> 2014-12-01 08:43:32,213 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo
> (Logging.scala:logInfo(59)) - Removed input-0-1417441405800 on
> 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
>
> 2014-12-01 08:43:32,213 INFO
> [sparkDriver-akka.actor.default-dispatcher-16] storage.BlockManagerMaster
> (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441405800
>
> 2014-12-01 08:43:32,213 INFO  [Executor task launch worker-3]
> executor.Executor (Logging.scala:logInfo(59)) - Finished task 2.0 in stage
> 10.0 (TID 20). 592 bytes result sent to driver
>
> 2014-12-01 08:43:32,214 INFO
> [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing block input-0-1417441406800
>
> 2014-12-01 08:43:32,214 INFO
> [sparkDriver-akka.actor.default-dispatcher-14] storage.MemoryStore
> (Logging.scala:logInfo(59)) - Block input-0-1417441406800 of size 78
> dropped from memory (free 280228730)
>
> 2014-12-01 08:43:32,215 INFO
> [sparkDriver-akka.actor.default-dispatcher-3] storage.BlockManagerInfo
> (Logging.scala:logInfo(59)) - Removed input-0-1417441406800 on
> 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
>
> 2014-12-01 08:43:32,215 INFO
> [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManagerMaster
> (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441406800
>
> 2014-12-01 08:43:33,002 INFO  [Thread-31] storage.MemoryStore
> (Logging.scala:logInfo(59)) - ensureFreeSpace(77) called with curMem=20245,
> maxMem=280248975
>
> 2014-12-01 08:43:33,003 INFO  [Thread-31] storage.MemoryStore
> (Logging.scala:logInfo(59)) - Block input-0-1417441412800 stored as bytes
> in memory (estimated size 77.0 B, free 267.2 MB)
>
> 2014-12-01 08:43:33,005 INFO
> [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManagerInfo
> (Logging.scala:logInfo(59)) - Added input-0-1417441412800 in memory on
> 192.168.88.130:34367 (size: 77.0 B, free: 267.3 MB)
>
> 2014-12-01 08:43:33,016 INFO  [Thread-31] storage.BlockManagerMaster
> (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441412800
>
> 2014-12-01 08:43:33,023 WARN  [handle-message-executor-2]
> storage.BlockManager (Logging.scala:logWarning(71)) - Block
> input-0-1417441412800 already exists on this machine; not re-adding it
>
> 2014-12-01 08:43:33,029 INFO  [Thread-31] receiver.BlockGenerator
> (Logging.scala:logInfo(59)) - Pushed block input-0-1417441412800
>
> 2014-12-01 08:43:34,001 INFO  [Thread-31] storage.MemoryStore
> (Logging.scala:logInfo(59)) - ensureFreeSpace(77) called with curMem=20322,
> maxMem=280248975
>
> 2014-12-01 08:43:34,002 INFO  [Thread-31] storage.MemoryStore
> (Logging.scala:logInfo(59)) - Block input-0-1417441413800 stored as bytes
> in memory (estimated size 77.0 B, free 267.2 MB)
>
> 2014-12-01 08:43:34,003 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManagerInfo
> (Logging.scala:logInfo(59)) - Added input-0-1417441413800 in memory on
> 192.168.88.130:34367 (size: 77.0 B, free: 267.3 MB)
>
> 2014-12-01 08:43:34,007 INFO  [Thread-31] storage.BlockManagerMaster
> (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441413800
>
> 2014-12-01 08:43:34,014 WARN  [handle-message-executor-4]
> storage.BlockManager (Logging.scala:logWarning(71)) - Block
> input-0-1417441413800 already exists on this machine; not re-adding it
>
>
>
>
>
> Is there something more missing in configuration ?
>
>
>
> Regargds,
>
> Aiman
>
>
>
> *From:* Akhil Das [mailto:akhil@sigmoidanalytics.com]
> *Sent:* Monday, December 01, 2014 6:09 PM
>
> *To:* Sarosh, M.
> *Cc:* user@spark.apache.org
> *Subject:* Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
>
>
>
> You would need a multi-core machine (>= 2 cores) for spark-streaming to
> work while running in standalone mode. But it will work fine if you run it
> in local mode with master as local[4].
>
>
>
> What are you getting after making this change
>
>
>
> JavaStreamingContext jssc = *new* JavaStreamingContext("local[4]",
> "SparkStream", *new* Duration(3000));
>
>
>
> and running the code? Are you still getting :
>
>
>
> Initial job has not accepted any resources; check your cluster UI to
> ensure that workers are registered and have sufficient memory
>
>
>
> (Make sure you push some data to kafka producer while the code is running)
>
>
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Mon, Dec 1, 2014 at 5:38 PM, <m....@accenture.com> wrote:
>
> Hi,
>
> I have now configured the Spark…I had CDH5 installation, so referred the
> installation doc.
>
> I have the worker up now:
>
>  Now I tried using:
>
> JavaStreamingContext jssc = *new* JavaStreamingContext("spark://
> 192.168.88.130:7077", "SparkStream", *new* Duration(3000));
>
>
>
> Also,
>
> JavaStreamingContext jssc = *new* JavaStreamingContext("local[4]",
> "SparkStream", *new* Duration(3000));
>
>
>
> And,
>
> JavaStreamingContext jssc = *new* JavaStreamingContext("local[*]",
> "SparkStream", *new* Duration(3000));
>
>
>
>
>
> Log are still the same:
>
> -------------------------------------------
>
> Time: 1417438988000 ms
>
> -------------------------------------------
>
>
>
> 2014-12-01 08:03:08,008 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Starting job streaming job 1417438988000 ms.0
> from job set of time 1417438988000 ms
>
> 2014-12-01 08:03:08,008 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Finished job streaming job 1417438988000 ms.0
> from job set of time 1417438988000 ms
>
> 2014-12-01 08:03:08,009 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Total delay: 0.008 s for time 1417438988000
> ms (execution: 0.000 s)
>
> 2014-12-01 08:03:08,010 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Added jobs for time 1417438988000 ms
>
> 2014-12-01 08:03:08,015 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] rdd.MappedRDD
> (Logging.scala:logInfo(59)) - Removing RDD 39 from persistence list
>
> 2014-12-01 08:03:08,024 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 39
>
> 2014-12-01 08:03:08,027 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] rdd.BlockRDD
> (Logging.scala:logInfo(59)) - Removing RDD 38 from persistence list
>
> 2014-12-01 08:03:08,031 INFO
> [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 38
>
> 2014-12-01 08:03:08,033 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] kafka.KafkaInputDStream
> (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[38] at
> BlockRDD at ReceiverInputDStream.scala:69 of time 1417438988000 ms
>
> 2014-12-01 08:03:09,002 INFO
> [sparkDriver-akka.actor.default-dispatcher-2] scheduler.ReceiverTracker
> (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
>
>
>
>
>
>
>
> *From:* Akhil Das [mailto:akhil@sigmoidanalytics.com]
> *Sent:* Monday, December 01, 2014 4:41 PM
>
>
> *To:* Sarosh, M.
> *Cc:* user@spark.apache.org
> *Subject:* Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
>
>
>
> I see you have no worker machines to execute the job
>
>
>
> [image: Inline image 1]
>
>
>
> You haven't configured your spark cluster properly.
>
>
>
> Quick fix to get it running would be run it on local mode, for that change
> this line
>
>
>
> JavaStreamingContext jssc = *new* JavaStreamingContext("spark://
> 192.168.88.130:7077", "SparkStream", *new* Duration(3000));
>
>
>
> to this
>
>
>
> JavaStreamingContext jssc = *new* JavaStreamingContext("local[4]",
> "SparkStream", *new* Duration(3000));
>
>
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Mon, Dec 1, 2014 at 4:18 PM, <m....@accenture.com> wrote:
>
> Hi,
>
>
>
> The spark master is working, and I have given the same url in the code:
>
>
>
> The warning is gone, and the new log is:
>
> -------------------------------------------
>
> Time: 1417427850000 ms
>
> -------------------------------------------
>
>
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Starting job streaming job 1417427850000 ms.0
> from job set of time 1417427850000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Finished job streaming job 1417427850000 ms.0
> from job set of time 1417427850000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 1417427850000
> ms (execution: 0.001 s)
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Added jobs for time 1417427850000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD
> (Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 25
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD
> (Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 24
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5]
> kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of
> RDD BlockRDD[24] at BlockRDD at ReceiverInputDStream.scala:69 of time
> 1417427850000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4]
> scheduler.ReceiverTracker (Logging.scala:logInfo(59)) *- Stream 0
> received 0 blocks*
>
> -------------------------------------------
>
> Time: 1417427853000 ms
>
> -------------------------------------------
>
>
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0
> from job set of time 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0
> from job set of time 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000
> ms (execution: 0.001 s)
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD
> (Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 27
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD
> (Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 26
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4]
> kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of
> RDD BlockRDD[26] at BlockRDD at ReceiverInputDStream.scala:69 of time
> 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-6]
> scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - *Stream 0
> received 0 blocks*
>
>
>
> What should be my approach now ?
>
> Need urgent help.
>
>
>
> Regards,
>
> Aiman
>
>
>
> *From:* Akhil Das [mailto:akhil@sigmoidanalytics.com]
> *Sent:* Monday, December 01, 2014 3:56 PM
> *To:* Sarosh, M.
> *Cc:* user@spark.apache.org
> *Subject:* Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
>
>
>
> It says:
>
>
>
>  14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not
> accepted any resources; check your cluster UI to ensure that workers are
> registered and have sufficient memory
>
>
>
> A quick guess would be, you are giving the wrong master url. ( spark://
> 192.168.88.130:7077 ) Open the webUI running on port 8080 and use the
> master url listed there on top left corner of the page.
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Mon, Dec 1, 2014 at 3:42 PM, <m....@accenture.com> wrote:
>
> Hi,
>
>
>
> I am integrating Kafka and Spark, using spark-streaming. I have created a
> topic as a kafka producer:
>
>
>
>     bin/kafka-topics.sh --create --zookeeper localhost:2181
> --replication-factor 1 --partitions 1 --topic test
>
>
>
>
>
> I am publishing messages in kafka and trying to read them using
> spark-streaming java code and displaying them on screen.
>
> The daemons are all up: Spark-master,worker; zookeeper; kafka.
>
> I am writing a java code for doing it, using KafkaUtils.createStream
>
> code is below:
>
>
>
> *package* *com.spark*;
>
>
>
> *import* scala.Tuple2;
>
> *import* *kafka*.serializer.Decoder;
>
> *import* *kafka*.serializer.Encoder;
>
> *import* org.apache.spark.streaming.Duration;
>
> *import* org.apache.spark.*;
>
> *import* org.apache.spark.api.java.function.*;
>
> *import* org.apache.spark.api.java.*;
>
> *import* *org.apache.spark.streaming.kafka*.KafkaUtils;
>
> *import* *org.apache.spark.streaming.kafka*.*;
>
> *import* org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> *import* org.apache.spark.streaming.api.java.JavaPairDStream;
>
> [remainder of the quoted original post trimmed; the full text appears at the top of this thread]

RE: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

Posted by m....@accenture.com.
Hi,

I have now increased the cores to 10.
[screenshot of the Spark master web UI omitted]

This time the messages are coming through, but sometimes with a warning. For example, I entered 15-17 lines:
Hiii
Hiiiiii
Hiiiiiiiii
Jpjpjpjpj
…
….

But I got back only 4 lines, along with some warnings.
The logs are:

2014-12-01 08:43:32,154 INFO  [Executor task launch worker-2] executor.Executor (Logging.scala:logInfo(59)) - Running task 0.0 in stage 10.0 (TID 18)
2014-12-01 08:43:32,163 INFO  [Executor task launch worker-2] storage.BlockManager (Logging.scala:logInfo(59)) - Found block input-0-1417441408800 locally
NewMessage: hiiii++++++++++++++++++
2014-12-01 08:43:32,164 INFO  [Executor task launch worker-2] executor.Executor (Logging.scala:logInfo(59)) - Finished task 0.0 in stage 10.0 (TID 18). 593 bytes result sent to driver
2014-12-01 08:43:32,167 INFO  [Result resolver thread-1] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 0.0 in stage 10.0 (TID 18) in 42 ms on localhost (1/3)
2014-12-01 08:43:32,166 INFO  [Executor task launch worker-3] executor.Executor (Logging.scala:logInfo(59)) - Running task 2.0 in stage 10.0 (TID 20)
2014-12-01 08:43:32,166 INFO  [Executor task launch worker-1] executor.Executor (Logging.scala:logInfo(59)) - Running task 1.0 in stage 10.0 (TID 19)
2014-12-01 08:43:32,181 INFO  [Executor task launch worker-1] storage.BlockManager (Logging.scala:logInfo(59)) - Found block input-0-1417441409800 locally
NewMessage: hiiii++++++++++++++++++
2014-12-01 08:43:32,183 INFO  [Executor task launch worker-3] storage.BlockManager (Logging.scala:logInfo(59)) - Found block input-0-1417441410800 locally
NewMessage: hiii++++++++++++++++++
2014-12-01 08:43:32,188 INFO  [Executor task launch worker-1] executor.Executor (Logging.scala:logInfo(59)) - Finished task 1.0 in stage 10.0 (TID 19). 593 bytes result sent to driver
2014-12-01 08:43:32,188 INFO  [Result resolver thread-0] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 1.0 in stage 10.0 (TID 19) in 46 ms on localhost (2/3)
2014-12-01 08:43:32,196 INFO  [Result resolver thread-2] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 2.0 in stage 10.0 (TID 20) in 51 ms on localhost (3/3)
2014-12-01 08:43:32,197 INFO  [Result resolver thread-2] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet 10.0, whose tasks have all completed, from pool
2014-12-01 08:43:32,197 INFO  [sparkDriver-akka.actor.default-dispatcher-16] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 10 (take at DStream.scala:608) finished in 0.058 s
2014-12-01 08:43:32,198 INFO  [pool-7-thread-1] spark.SparkContext (Logging.scala:logInfo(59)) - Job finished: take at DStream.scala:608, took 0.103179818 s
-------------------------------------------
Time: 1417441412000 ms
-------------------------------------------
hiiii
hiiii
hiiii
hiii

2014-12-01 08:43:32,199 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417441412000 ms.0 from job set of time 1417441412000 ms
2014-12-01 08:43:32,199 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.199 s for time 1417441412000 ms (execution: 0.169 s)
2014-12-01 08:43:32,199 INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 16 from persistence list
2014-12-01 08:43:32,201 INFO  [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 16
2014-12-01 08:43:32,202 INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 15 from persistence list
2014-12-01 08:43:32,203 INFO  [sparkDriver-akka.actor.default-dispatcher-3] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 15
2014-12-01 08:43:32,204 INFO  [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[15] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417441412000 ms
2014-12-01 08:43:32,205 INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager (Logging.scala:logInfo(59)) - Removing block input-0-1417441403800
2014-12-01 08:43:32,205 INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1417441403800 of size 78 dropped from memory (free 280228496)
2014-12-01 08:43:32,206 INFO  [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed input-0-1417441403800 on 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
2014-12-01 08:43:32,206 INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441403800
2014-12-01 08:43:32,207 INFO  [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager (Logging.scala:logInfo(59)) - Removing block input-0-1417441404800
2014-12-01 08:43:32,207 INFO  [sparkDriver-akka.actor.default-dispatcher-2] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1417441404800 of size 78 dropped from memory (free 280228574)
2014-12-01 08:43:32,208 INFO  [sparkDriver-akka.actor.default-dispatcher-13] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed input-0-1417441404800 on 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
2014-12-01 08:43:32,209 INFO  [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441404800
2014-12-01 08:43:32,212 INFO  [sparkDriver-akka.actor.default-dispatcher-16] storage.BlockManager (Logging.scala:logInfo(59)) - Removing block input-0-1417441405800
2014-12-01 08:43:32,212 INFO  [sparkDriver-akka.actor.default-dispatcher-16] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1417441405800 of size 78 dropped from memory (free 280228652)
2014-12-01 08:43:32,213 INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed input-0-1417441405800 on 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
2014-12-01 08:43:32,213 INFO  [sparkDriver-akka.actor.default-dispatcher-16] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441405800
2014-12-01 08:43:32,213 INFO  [Executor task launch worker-3] executor.Executor (Logging.scala:logInfo(59)) - Finished task 2.0 in stage 10.0 (TID 20). 592 bytes result sent to driver
2014-12-01 08:43:32,214 INFO  [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManager (Logging.scala:logInfo(59)) - Removing block input-0-1417441406800
2014-12-01 08:43:32,214 INFO  [sparkDriver-akka.actor.default-dispatcher-14] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1417441406800 of size 78 dropped from memory (free 280228730)
2014-12-01 08:43:32,215 INFO  [sparkDriver-akka.actor.default-dispatcher-3] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed input-0-1417441406800 on 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
2014-12-01 08:43:32,215 INFO  [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441406800
2014-12-01 08:43:33,002 INFO  [Thread-31] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(77) called with curMem=20245, maxMem=280248975
2014-12-01 08:43:33,003 INFO  [Thread-31] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1417441412800 stored as bytes in memory (estimated size 77.0 B, free 267.2 MB)
2014-12-01 08:43:33,005 INFO  [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added input-0-1417441412800 in memory on 192.168.88.130:34367 (size: 77.0 B, free: 267.3 MB)
2014-12-01 08:43:33,016 INFO  [Thread-31] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441412800
2014-12-01 08:43:33,023 WARN  [handle-message-executor-2] storage.BlockManager (Logging.scala:logWarning(71)) - Block input-0-1417441412800 already exists on this machine; not re-adding it
2014-12-01 08:43:33,029 INFO  [Thread-31] receiver.BlockGenerator (Logging.scala:logInfo(59)) - Pushed block input-0-1417441412800
2014-12-01 08:43:34,001 INFO  [Thread-31] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(77) called with curMem=20322, maxMem=280248975
2014-12-01 08:43:34,002 INFO  [Thread-31] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1417441413800 stored as bytes in memory (estimated size 77.0 B, free 267.2 MB)
2014-12-01 08:43:34,003 INFO  [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added input-0-1417441413800 in memory on 192.168.88.130:34367 (size: 77.0 B, free: 267.3 MB)
2014-12-01 08:43:34,007 INFO  [Thread-31] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441413800
2014-12-01 08:43:34,014 WARN  [handle-message-executor-4] storage.BlockManager (Logging.scala:logWarning(71)) - Block input-0-1417441413800 already exists on this machine; not re-adding it


Is there something still missing in the configuration?

Regards,
Aiman

From: Akhil Das [mailto:akhil@sigmoidanalytics.com]
Sent: Monday, December 01, 2014 6:09 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

You would need a multi-core machine (>= 2 cores) for spark-streaming to work while running in standalone mode. But it will work fine if you run it in local mode with master as local[4].

What are you getting after making this change

JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(3000));

and running the code? Are you still getting:

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

(Make sure you push some data to kafka producer while the code is running)


Thanks
Best Regards

On Mon, Dec 1, 2014 at 5:38 PM, <m....@accenture.com> wrote:
[earlier messages in the thread, quoted in full, trimmed; each appears as its own post in this archive]




Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
You would need a multi-core machine (>= 2 cores) for spark-streaming to
work while running in standalone mode. But it will work fine if you run it
in local mode with master as local[4].

What are you getting after making this change

JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(3000));

and running the code? Are you still getting:

Initial job has not accepted any resources; check your cluster UI to ensure
that workers are registered and have sufficient memory

(Make sure you push some data to kafka producer while the code is running)


Thanks
Best Regards
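
(For reference, a minimal self-contained sketch of the local-mode setup Akhil describes: local[4] gives the job four threads, one for the Kafka receiver and the rest for processing the received blocks. It assumes the Spark 1.x Java streaming API used throughout this thread; the ZooKeeper address, group id, and topic name below are illustrative placeholders, not values confirmed by the thread.)

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class LocalSparkStream {
    public static void main(String[] args) {
        // local[4]: one thread is taken by the Kafka receiver,
        // the remaining three run the batch-processing tasks.
        JavaStreamingContext jssc = new JavaStreamingContext(
                "local[4]", "SparkStream", new Duration(3000));

        // One receiver for the "test" topic (topic name assumed from the thread).
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put("test", 1);

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, "localhost:2181", "spark-group", topicMap);

        // Print the first elements of each 3-second batch.
        messages.print();

        jssc.start();
        jssc.awaitTermination();
    }
}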

On Mon, Dec 1, 2014 at 5:38 PM, <m....@accenture.com> wrote:
> [earlier quoted messages trimmed]

Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

Posted by Gerard Maas <ge...@gmail.com>.
You have only 1 core available - Spark Streaming will be able to consume
messages but not process them, as there are no additional resources left to
process the RDDs.
You need to tune your configuration further to add more cores at the
executor. Have a look at the configuration options in the docs:
http://spark.apache.org/docs/latest/spark-standalone.html

-kr, Gerard.
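
(To act on Gerard's advice in standalone mode, two settings are commonly involved: SPARK_WORKER_CORES in conf/spark-env.sh, which controls how many cores a worker offers, and the spark.cores.max property, which caps how many cores one application may take. A sketch, assuming the cluster from this thread; the values chosen are illustrative:)

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StandaloneSparkStream {
    public static void main(String[] args) {
        // Worker side (shell, not Java): before starting the worker, put
        //   export SPARK_WORKER_CORES=4
        // in conf/spark-env.sh so the standalone worker offers several cores.

        SparkConf conf = new SparkConf()
                .setMaster("spark://192.168.88.130:7077")
                .setAppName("SparkStream")
                // Ask the standalone scheduler for up to 4 cores: the Kafka
                // receiver pins one, the rest remain free for batch tasks.
                .set("spark.cores.max", "4");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(3000));

        // Create the Kafka stream, call print(), and start the context
        // exactly as in the code earlier in the thread.
    }
}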

On Mon, Dec 1, 2014 at 1:08 PM, <m....@accenture.com> wrote:
> [earlier quoted messages trimmed]

RE: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

Posted by m....@accenture.com.
Hi,
I have now configured Spark. I had a CDH5 installation, so I referred to the installation doc.
I have the worker up now:
[screenshot of the Spark master web UI omitted]
Now I tried using:
JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077<http://192.168.88.130:7077/>", "SparkStream", new Duration(3000));

Also,
JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(3000));

And,
JavaStreamingContext jssc = new JavaStreamingContext("local[*]", "SparkStream", new Duration(3000));


Logs are still the same:
-------------------------------------------
Time: 1417438988000 ms
-------------------------------------------

2014-12-01 08:03:08,008 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,008 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,009 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.008 s for time 1417438988000 ms (execution: 0.000 s)
2014-12-01 08:03:08,010 INFO  [sparkDriver-akka.actor.default-dispatcher-15] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417438988000 ms
2014-12-01 08:03:08,015 INFO  [sparkDriver-akka.actor.default-dispatcher-15] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 39 from persistence list
2014-12-01 08:03:08,024 INFO  [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 39
2014-12-01 08:03:08,027 INFO  [sparkDriver-akka.actor.default-dispatcher-15] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 38 from persistence list
2014-12-01 08:03:08,031 INFO  [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 38
2014-12-01 08:03:08,033 INFO  [sparkDriver-akka.actor.default-dispatcher-15] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[38] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417438988000 ms
2014-12-01 08:03:09,002 INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks



From: Akhil Das [mailto:akhil@sigmoidanalytics.com]
Sent: Monday, December 01, 2014 4:41 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

I see you have no worker machines to execute the job

[inline screenshot of the Spark master web UI omitted]

You haven't configured your spark cluster properly.

A quick fix to get it running would be to run it in local mode; for that, change this line

JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077<http://192.168.88.130:7077/>", "SparkStream", new Duration(3000));

to this

JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(3000));


Thanks
Best Regards

On Mon, Dec 1, 2014 at 4:18 PM, <m....@accenture.com> wrote:
Hi,

The spark master is working, and I have given the same url in the code:
[screenshot of the Spark master web UI omitted]

The warning is gone, and the new log is:
-------------------------------------------
Time: 1417427850000 ms
-------------------------------------------

INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417427850000 ms.0 from job set of time 1417427850000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417427850000 ms.0 from job set of time 1417427850000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 1417427850000 ms (execution: 0.001 s)
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417427850000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 25
INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 24
INFO  [sparkDriver-akka.actor.default-dispatcher-5] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[24] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417427850000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
-------------------------------------------
Time: 1417427853000 ms
-------------------------------------------

INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0 from job set of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0 from job set of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000 ms (execution: 0.001 s)
INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 27
INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 26
INFO  [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[26] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-6] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

What should my approach be now?
I need urgent help.

Regards,
Aiman

From: Akhil Das [mailto:akhil@sigmoidanalytics.com]
Sent: Monday, December 01, 2014 3:56 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

It says:

 14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

A quick guess would be that you are giving the wrong master URL ( spark://192.168.88.130:7077 ). Open the web UI running on port 8080 and use the master URL listed at the top left corner of the page.

Thanks
Best Regards

On Mon, Dec 1, 2014 at 3:42 PM, <m....@accenture.com> wrote:
Hi,

I am integrating Kafka and Spark using Spark Streaming. I have created a Kafka topic:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
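
The topic can also be sanity-checked with the describe command, assuming the same ZooKeeper address:

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test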


I am publishing messages to Kafka and trying to read them using Spark Streaming Java code, displaying them on screen.
The daemons are all up: Spark master, worker; ZooKeeper; Kafka.
I am writing Java code for this, using KafkaUtils.createStream; the code is below:

package com.spark;

import scala.Tuple2;
import org.apache.spark.streaming.Duration;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.*;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import java.util.Map;
import java.util.HashMap;

public class SparkStream {
       public static void main(String args[])
       {
              if(args.length != 3)
              {
                     System.out.println("Usage: spark-submit –class com.spark.SparkStream target/SparkStream-with-dependencies.jar <zookeeper_ip> <group_name> <topic1,topic2,...>");
                     System.exit(1);
              }


              Map<String,Integer> topicMap = new HashMap<String,Integer>();
              String[] topic = args[2].split(",");
              for(String t: topic)
              {
                     topicMap.put(t, Integer.valueOf(1)); // number of receiver threads for this topic
              }

              JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
              JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );

              System.out.println("Connection done");
              JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
                                                {
                                                       public String call(Tuple2<String, String> message)
                                                       {
                                                              System.out.println("NewMessage: "+message._2()); //for debugging
                                                              return message._2();
                                                       }
                                                });

              data.print();

              jssc.start();
              jssc.awaitTermination();

       }
}
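
Since the println inside map() runs on the executors (its output goes to the executor logs rather than the driver console), per-batch record counts can be surfaced on the driver instead. A sketch against the 1.x Java API, reusing the data stream from the code above:

data.foreachRDD(new Function<JavaRDD<String>, Void>() {
       public Void call(JavaRDD<String> rdd) {
              // count() triggers the job; this println runs on the driver
              System.out.println("Batch size: " + rdd.count());
              return null;
       }
});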


I am running the job, and at another terminal I am running the kafka-console-producer to publish messages:
#bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>    Hi kafka
>    second message
>    another message

But the output logs at the spark-streaming console don't show the messages; they show zero blocks received:


    -------------------------------------------
    Time: 1417107363000 ms
    -------------------------------------------

    14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for time 1417107363000 ms (execution: 0.000 s)
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time 1417107363000 ms
    14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
    14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
    14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000 ms
    14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
    14/11/27 11:56:06 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks


Why aren't the data blocks getting received? I have tried using the Kafka producer and consumer on the console (bin/kafka-console-producer....  and bin/kafka-console-consumer...) and they work perfectly, so why not with the code above? Please help me.
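
One more thing worth checking: with the simple createStream overload, a consumer group with no stored offsets starts from the latest offset by default, so anything published before the receiver connects is skipped. The explicit overload accepts Kafka consumer properties directly; a sketch in which the ZooKeeper address, group id, and offset policy are assumptions:

import kafka.serializer.StringDecoder;
import org.apache.spark.storage.StorageLevel;

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "localhost:2181"); // assumed local ZooKeeper
kafkaParams.put("group.id", "spark-group");             // hypothetical consumer group
kafkaParams.put("auto.offset.reset", "smallest");       // start from the earliest available offset

JavaPairReceiverInputDStream<String, String> messages =
       KafkaUtils.createStream(jssc, String.class, String.class,
              StringDecoder.class, StringDecoder.class,
              kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER());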


Regards,
Aiman Sarosh

