Posted to user@spark.apache.org by Mohit Anchlia <mo...@gmail.com> on 2015/03/26 22:38:34 UTC

WordCount example

I am trying to run the word count example, but it is not working as
expected. I start an "nc" server on port 9999 and then submit the Spark job
to the cluster. The job is submitted successfully, but I never see a
connection from Spark being established. I also tried typing words into the
console where "nc" is listening, but I don't see any output. I don't see
any errors either.

Here is the conf:

SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount");

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
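
One quick check for the setup described above, offered as a minimal sketch and not as anything taken from the thread: the class name PortProbe and its isListening helper are hypothetical, and the code uses only the JDK's java.net.Socket. It tests whether something is accepting TCP connections on a given host and port. Keep in mind that the "localhost" passed to socketTextStream is resolved on whichever machine runs the receiver, so the probe is most informative when run on that machine.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Spark: checks whether a TCP port accepts connections.
class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the host and port used in the post; both are overridable.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;
        boolean open = isListening(host, port, 2000);
        System.out.println(host + ":" + port + (open ? " is accepting connections" : " is not reachable"));
    }
}
```

If the probe reports the port as reachable while the streaming job still receives nothing, the listener itself is probably fine and the receiver task is the next thing to look at.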

Re: WordCount example

Posted by Tathagata Das <td...@databricks.com>.
There are no workers registered with the Spark Standalone master! That is
the crux of the problem. :)
Follow the cluster launch instructions carefully:
https://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
In particular, make sure the conf/slaves file lists the intended workers.

TD
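
The core requirement behind this diagnosis can be sketched in plain Java. This is an illustrative sketch under stated assumptions, not Spark code: the class CoreBudget and both of its methods are hypothetical names. It encodes the rule discussed in the thread that each receiver occupies one core for as long as it runs, so the application needs more cores than it has receivers before any batch can be processed.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spark: models the "cores > receivers" rule.
class CoreBudget {

    private static final Pattern LOCAL_N = Pattern.compile("local\\[(\\d+)\\]");

    /** A receiver pins one core, so at least one extra core is needed to process batches. */
    public static boolean hasEnoughCores(int availableCores, int receivers) {
        return availableCores > receivers;
    }

    /**
     * Thread count implied by a local master URL: 1 for "local", n for "local[n]".
     * Returns -1 for anything else (cluster URLs, "local[*]"), where the count
     * has to come from the cluster manager or the machine instead.
     */
    public static int localCores(String masterUrl) {
        if ("local".equals(masterUrl)) {
            return 1;
        }
        Matcher m = LOCAL_N.matcher(masterUrl);
        return m.matches() ? Integer.parseInt(m.group(1)) : -1;
    }

    public static void main(String[] args) {
        // A standalone master with no registered workers offers 0 cores.
        System.out.println(hasEnoughCores(0, 1));                       // prints "false"
        // local[2] leaves one thread for the receiver and one for processing.
        System.out.println(hasEnoughCores(localCores("local[2]"), 1));  // prints "true"
    }
}
```

With one socket receiver, 0 cores fails the check and local[2] passes it, which matches the behaviour reported in this thread.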

On Mon, Apr 6, 2015 at 9:55 AM, Mohit Anchlia <mo...@gmail.com>
wrote:

> Interesting, I see 0 cores in the UI?
>
>
>    - *Cores:* 0 Total, 0 Used
>
>
> On Fri, Apr 3, 2015 at 2:55 PM, Tathagata Das <td...@databricks.com> wrote:
>
>> What does the Spark Standalone UI at port 8080 say about number of cores?
>>
>> On Fri, Apr 3, 2015 at 2:53 PM, Mohit Anchlia <mo...@gmail.com>
>> wrote:
>>
>>> [ec2-user@ip-10-241-251-232 s_lib]$ cat /proc/cpuinfo |grep process
>>> processor       : 0
>>> processor       : 1
>>> processor       : 2
>>> processor       : 3
>>> processor       : 4
>>> processor       : 5
>>> processor       : 6
>>> processor       : 7
>>>
>>> On Fri, Apr 3, 2015 at 2:33 PM, Tathagata Das <td...@databricks.com>
>>> wrote:
>>>
>>>> How many cores are present in the workers allocated to the standalone
>>>> cluster spark://ip-10-241-251-232:7077 ?
>>>>
>>>>
>>>> On Fri, Apr 3, 2015 at 2:18 PM, Mohit Anchlia <mo...@gmail.com>
>>>> wrote:
>>>>
>>>>> If I use local[2] instead of spark://ip-10-241-251-232:7077, this
>>>>> seems to work. I don't understand why, though, because when I give
>>>>> spark://ip-10-241-251-232:7077 the application seems to bootstrap
>>>>> successfully; it just doesn't create a socket on port 9999.
>>>>>
>>>>>
>>>>> On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia <
>>>>> mohitanchlia@gmail.com> wrote:
>>>>>
>>>>>> I checked the ports using netstat and don't see any connections
>>>>>> established on that port. Logs show only this:
>>>>>>
>>>>>> 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount
>>>>>> 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with
>>>>>> ID app-20150327135048-0002
>>>>>>
>>>>>> Spark UI shows:
>>>>>>
>>>>>> Running Applications
>>>>>> ID:              app-20150327135048-0002
>>>>>>                  <http://54.69.225.94:8080/app?appId=app-20150327135048-0002>
>>>>>> Name:            NetworkWordCount
>>>>>>                  <http://ip-10-241-251-232.us-west-2.compute.internal:4040/>
>>>>>> Cores:           0
>>>>>> Memory per Node: 512.0 MB
>>>>>> Submitted Time:  2015/03/27 13:50:48
>>>>>> User:            ec2-user
>>>>>> State:           WAITING
>>>>>> Duration:        33 s
>>>>>>
>>>>>> The code appears to be executing:
>>>>>>
>>>>>> java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
>>>>>>
>>>>>> public static void doWork(String masterUrl) {
>>>>>>     SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount");
>>>>>>     JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
>>>>>>     JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
>>>>>>     System.out.println("Successfully created connection");
>>>>>>     mapAndReduce(lines);
>>>>>>     jssc.start();            // Start the computation
>>>>>>     jssc.awaitTermination(); // Wait for the computation to terminate
>>>>>> }
>>>>>>
>>>>>> public static void main(String... args) {
>>>>>>     doWork(args[0]);
>>>>>> }
>>>>>> And output of the java program after submitting the task:
>>>>>>
>>>>>> java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
>>>>>> Using Spark's default log4j profile:
>>>>>> org/apache/spark/log4j-defaults.properties
>>>>>> 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to:
>>>>>> ec2-user
>>>>>> 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to:
>>>>>> ec2-user
>>>>>> 15/03/27 13:50:46 INFO SecurityManager: SecurityManager:
>>>>>> authentication disabled; ui acls disabled; users with view permissions:
>>>>>> Set(ec2-user); users with modify permissions: Set(ec2-user)
>>>>>> 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started
>>>>>> 15/03/27 13:50:46 INFO Remoting: Starting remoting
>>>>>> 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on
>>>>>> addresses
>>>>>> :[akka.tcp://sparkDriver@ip-10-241-251-232.us-west-2.compute.internal
>>>>>> :60184]
>>>>>> 15/03/27 13:50:47 INFO Utils: Successfully started service
>>>>>> 'sparkDriver' on port 60184.
>>>>>> 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker
>>>>>> 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster
>>>>>> 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at
>>>>>> /tmp/spark-local-20150327135047-5399
>>>>>> 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity
>>>>>> 3.5 GB
>>>>>> 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop
>>>>>> library for your platform... using builtin-java classes where applicable
>>>>>> 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is
>>>>>> /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b
>>>>>> 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server
>>>>>> 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file
>>>>>> server' on port 57955.
>>>>>> 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI'
>>>>>> on port 4040.
>>>>>> 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at
>>>>>> http://ip-10-241-251-232.us-west-2.compute.internal:4040
>>>>>> 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master
>>>>>> spark://ip-10-241-251-232:7077...
>>>>>> 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to
>>>>>> Spark cluster with app ID app-20150327135048-0002
>>>>>> 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on
>>>>>> 58358
>>>>>> 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register
>>>>>> BlockManager
>>>>>> 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block
>>>>>> manager ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM,
>>>>>> BlockManagerId(<driver>, ip-10-241-251-232.us-west-2.compute.internal,
>>>>>> 58358)
>>>>>> 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager
>>>>>> 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend
>>>>>> is ready for scheduling beginning after reached
>>>>>> minRegisteredResourcesRatio: 0.0
>>>>>> 15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started
>>>>>> 15/03/27 13:50:48 INFO ForEachDStream: metadataCleanupDelay = -1
>>>>>> 15/03/27 13:50:48 INFO ShuffledDStream: metadataCleanupDelay = -1
>>>>>> 15/03/27 13:50:48 INFO MappedDStream: metadataCleanupDelay = -1
>>>>>> 15/03/27 13:50:48 INFO FlatMappedDStream: metadataCleanupDelay = -1
>>>>>> 15/03/27 13:50:48 INFO SocketInputDStream: metadataCleanupDelay = -1
>>>>>> 15/03/27 13:50:48 INFO SocketInputDStream: Slide time = 1000 ms
>>>>>> 15/03/27 13:50:48 INFO SocketInputDStream: Storage level =
>>>>>> StorageLevel(false, false, false, false, 1)
>>>>>> 15/03/27 13:50:48 INFO SocketInputDStream: Checkpoint interval = null
>>>>>> 15/03/27 13:50:48 INFO SocketInputDStream: Remember duration = 1000 ms
>>>>>> 15/03/27 13:50:48 INFO SocketInputDStream: Initialized and validated
>>>>>> org.apache.spark.streaming.dstream.SocketInputDStream@75efa13d
>>>>>> 15/03/27 13:50:48 INFO FlatMappedDStream: Slide time = 1000 ms
>>>>>> 15/03/27 13:50:48 INFO FlatMappedDStream: Storage level =
>>>>>> StorageLevel(false, false, false, false, 1)
>>>>>> 15/03/27 13:50:48 INFO FlatMappedDStream: Checkpoint interval = null
>>>>>> 15/03/27 13:50:48 INFO FlatMappedDStream: Remember duration = 1000 ms
>>>>>> 15/03/27 13:50:48 INFO FlatMappedDStream: Initialized and validated
>>>>>> org.apache.spark.streaming.dstream.FlatMappedDStream@65ce9dc5
>>>>>> 15/03/27 13:50:48 INFO MappedDStream: Slide time = 1000 ms
>>>>>> 15/03/27 13:50:48 INFO MappedDStream: Storage level =
>>>>>> StorageLevel(false, false, false, false, 1)
>>>>>> 15/03/27 13:50:48 INFO MappedDStream: Checkpoint interval = null
>>>>>> 15/03/27 13:50:48 INFO MappedDStream: Remember duration = 1000 ms
>>>>>> 15/03/27 13:50:48 INFO MappedDStream: Initialized and validated
>>>>>> org.apache.spark.streaming.dstream.MappedDStream@5ae2740f
>>>>>> 15/03/27 13:50:48 INFO ShuffledDStream: Slide time = 1000 ms
>>>>>> 15/03/27 13:50:48 INFO ShuffledDStream: Storage level =
>>>>>> StorageLevel(false, false, false, false, 1)
>>>>>> 15/03/27 13:50:48 INFO ShuffledDStream: Checkpoint interval = null
>>>>>> 15/03/27 13:50:48 INFO ShuffledDStream: Remember duration = 1000 ms
>>>>>> 15/03/27 13:50:48 INFO ShuffledDStream: Initialized and validated
>>>>>> org.apache.spark.streaming.dstream.ShuffledDStream@4931b366
>>>>>> 15/03/27 13:50:48 INFO ForEachDStream: Slide time = 1000 ms
>>>>>> 15/03/27 13:50:48 INFO ForEachDStream: Storage level =
>>>>>> StorageLevel(false, false, false, false, 1)
>>>>>> 15/03/27 13:50:48 INFO ForEachDStream: Checkpoint interval = null
>>>>>> 15/03/27 13:50:48 INFO ForEachDStream: Remember duration = 1000 ms
>>>>>> 15/03/27 13:50:48 INFO ForEachDStream: Initialized and validated
>>>>>> org.apache.spark.streaming.dstream.ForEachDStream@5df91314
>>>>>> 15/03/27 13:50:48 INFO SparkContext: Starting job: start at
>>>>>> WordCount.java:26
>>>>>> 15/03/27 13:50:48 INFO RecurringTimer: Started timer for JobGenerator
>>>>>> at time 1427478649000
>>>>>> 15/03/27 13:50:48 INFO JobGenerator: Started JobGenerator at
>>>>>> 1427478649000 ms
>>>>>> 15/03/27 13:50:48 INFO JobScheduler: Started JobScheduler
>>>>>> 15/03/27 13:50:48 INFO DAGScheduler: Registering RDD 2 (start at
>>>>>> WordCount.java:26)
>>>>>> 15/03/27 13:50:48 INFO DAGScheduler: Got job 0 (start at
>>>>>> WordCount.java:26) with 20 output partitions (allowLocal=false)
>>>>>> 15/03/27 13:50:48 INFO DAGScheduler: Final stage: Stage 1(start at
>>>>>> WordCount.java:26)
>>>>>> 15/03/27 13:50:48 INFO DAGScheduler: Parents of final stage:
>>>>>> List(Stage 0)
>>>>>> 15/03/27 13:50:48 INFO DAGScheduler: Missing parents: List(Stage 0)
>>>>>> 15/03/27 13:50:48 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2]
>>>>>> at start at WordCount.java:26), which has no missing parents
>>>>>> 15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(2720) called with
>>>>>> curMem=0, maxMem=3771948072
>>>>>> 15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0 stored as
>>>>>> values in memory (estimated size 2.7 KB, free 3.5 GB)
>>>>>> 15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(1943) called with
>>>>>> curMem=2720, maxMem=3771948072
>>>>>> 15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0_piece0 stored
>>>>>> as bytes in memory (estimated size 1943.0 B, free 3.5 GB)
>>>>>> 15/03/27 13:50:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in
>>>>>> memory on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1943.0
>>>>>> B, free: 3.5 GB)
>>>>>> 15/03/27 13:50:48 INFO BlockManagerMaster: Updated info of block
>>>>>> broadcast_0_piece0
>>>>>> 15/03/27 13:50:48 INFO SparkContext: Created broadcast 0 from
>>>>>> broadcast at DAGScheduler.scala:838
>>>>>> 15/03/27 13:50:48 INFO DAGScheduler: Submitting 50 missing tasks from
>>>>>> Stage 0 (MappedRDD[2] at start at WordCount.java:26)
>>>>>> 15/03/27 13:50:48 INFO TaskSchedulerImpl: Adding task set 0.0 with 50
>>>>>> tasks
>>>>>> 15/03/27 13:50:49 INFO JobScheduler: Added jobs for time
>>>>>> 1427478649000 ms
>>>>>> 15/03/27 13:50:49 INFO JobScheduler: Starting job streaming job
>>>>>> 1427478649000 ms.0 from job set of time 1427478649000 ms
>>>>>> 15/03/27 13:50:49 INFO SparkContext: Starting job: print at
>>>>>> WordCount.java:53
>>>>>> 15/03/27 13:50:49 INFO DAGScheduler: Registering RDD 6 (mapToPair at
>>>>>> WordCount.java:39)
>>>>>> 15/03/27 13:50:49 INFO DAGScheduler: Got job 1 (print at
>>>>>> WordCount.java:53) with 1 output partitions (allowLocal=true)
>>>>>> 15/03/27 13:50:49 INFO DAGScheduler: Final stage: Stage 3(print at
>>>>>> WordCount.java:53)
>>>>>> 15/03/27 13:50:49 INFO DAGScheduler: Parents of final stage:
>>>>>> List(Stage 2)
>>>>>> 15/03/27 13:50:49 INFO DAGScheduler: Missing parents: List()
>>>>>> 15/03/27 13:50:49 INFO DAGScheduler: Submitting Stage 3
>>>>>> (ShuffledRDD[7] at reduceByKey at WordCount.java:46), which has no missing
>>>>>> parents
>>>>>> 15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(2264) called with
>>>>>> curMem=4663, maxMem=3771948072
>>>>>> 15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1 stored as
>>>>>> values in memory (estimated size 2.2 KB, free 3.5 GB)
>>>>>> 15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(1688) called with
>>>>>> curMem=6927, maxMem=3771948072
>>>>>> 15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1_piece0 stored
>>>>>> as bytes in memory (estimated size 1688.0 B, free 3.5 GB)
>>>>>> 15/03/27 13:50:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in
>>>>>> memory on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1688.0
>>>>>> B, free: 3.5 GB)
>>>>>> 15/03/27 13:50:49 INFO BlockManagerMaster: Updated info of block
>>>>>> broadcast_1_piece0
>>>>>> 15/03/27 13:50:49 INFO SparkContext: Created broadcast 1 from
>>>>>> broadcast at DAGScheduler.scala:838
>>>>>> 15/03/27 13:50:49 INFO DAGScheduler: Submitting 1 missing tasks from
>>>>>> Stage 3 (ShuffledRDD[7] at reduceByKey at WordCount.java:46)
>>>>>> 15/03/27 13:50:49 INFO TaskSchedulerImpl: Adding task set 3.0 with 1
>>>>>> tasks
>>>>>> 15/03/27 13:50:50 INFO JobScheduler: Added jobs for time
>>>>>> 1427478650000 ms
>>>>>> 15/03/27 13:50:51 INFO JobScheduler: Added jobs for time
>>>>>> 1427478651000 ms
>>>>>> 15/03/27 13:50:52 INFO JobScheduler: Added jobs for time
>>>>>> 1427478652000 ms
>>>>>> 15/03/27 13:50:53 IN
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 26, 2015 at 6:50 PM, Saisai Shao <sa...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Did you run the word count example in Spark local mode or in another
>>>>>>> mode? In local mode you have to set local[n], where n >= 2. In other
>>>>>>> modes, make sure more than one core is available, because the receiver
>>>>>>> in Spark Streaming runs as a long-running task that occupies at least
>>>>>>> one core.
>>>>>>>
>>>>>>> Besides that, use lsof -p <pid> or netstat to make sure the Spark executor
>>>>>>> backend is connected to the nc process. Also grep the executor's log for
>>>>>>> lines like "Connecting to <host> <port>" and "Connected to <host>
>>>>>>> <port>", which show that the receiver is correctly connected to the nc
>>>>>>> process.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jerry
>>>>>>>
>>>>>>> 2015-03-27 8:45 GMT+08:00 Mohit Anchlia <mo...@gmail.com>:
>>>>>>>
>>>>>>>> What's the best way to troubleshoot inside spark to see why Spark
>>>>>>>> is not connecting to nc on port 9999? I don't see any errors either.
>>>>>>>>
>>>>>>>> On Thu, Mar 26, 2015 at 2:38 PM, Mohit Anchlia <
>>>>>>>> mohitanchlia@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am trying to run the word count example, but it is not working as
>>>>>>>>> expected. I start an "nc" server on port 9999 and then submit the Spark
>>>>>>>>> job to the cluster. The job is submitted successfully, but I never see
>>>>>>>>> a connection from Spark being established. I also tried typing words
>>>>>>>>> into the console where "nc" is listening, but I don't see any output.
>>>>>>>>> I don't see any errors either.
>>>>>>>>>
>>>>>>>>> Here is the conf:
>>>>>>>>>
>>>>>>>>> SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount");
>>>>>>>>>
>>>>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
>>>>>>>>>
>>>>>>>>> JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: WordCount example

Posted by Mohit Anchlia <mo...@gmail.com>.
Interesting, I see 0 cores in the UI?


   - *Cores:* 0 Total, 0 Used



Re: WordCount example

Posted by Tathagata Das <td...@databricks.com>.
What does the Spark Standalone UI at port 8080 say about the number of cores?

On Fri, Apr 3, 2015 at 2:53 PM, Mohit Anchlia <mo...@gmail.com>
wrote:

> [ec2-user@ip-10-241-251-232 s_lib]$ cat /proc/cpuinfo |grep process
> processor       : 0
> processor       : 1
> processor       : 2
> processor       : 3
> processor       : 4
> processor       : 5
> processor       : 6
> processor       : 7
>
> On Fri, Apr 3, 2015 at 2:33 PM, Tathagata Das <td...@databricks.com> wrote:
>
>> How many cores are present in the workers allocated to the standalone
>> cluster spark://ip-10-241-251-232:7077?
>>
>>
>> On Fri, Apr 3, 2015 at 2:18 PM, Mohit Anchlia <mo...@gmail.com>
>> wrote:
>>
>>> If I use local[2] instead of spark://ip-10-241-251-232:7077 this seems
>>> to work. I don't understand why, though, because when I give
>>> spark://ip-10-241-251-232:7077 the application seems to bootstrap
>>> successfully; it just doesn't create a socket on port 9999?
>>>
>>>
>>> On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia <mo...@gmail.com>
>>> wrote:
>>>
>>>> I checked the ports using netstat and don't see any connections
>>>> established on that port. Logs show only this:
>>>>
>>>> 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount
>>>> 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID
>>>> app-20150327135048-0002
>>>>
>>>> Spark ui shows:
>>>>
>>>> Running Applications
>>>>
>>>> ID:              app-20150327135048-0002
>>>>                  <http://54.69.225.94:8080/app?appId=app-20150327135048-0002>
>>>> Name:            NetworkWordCount
>>>>                  <http://ip-10-241-251-232.us-west-2.compute.internal:4040/>
>>>> Cores:           0
>>>> Memory per Node: 512.0 MB
>>>> Submitted Time:  2015/03/27 13:50:48
>>>> User:            ec2-user
>>>> State:           WAITING
>>>> Duration:        33 s
>>>>
>>>> The code looks like it is being executed:
>>>>
>>>> java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
>>>>
>>>> public static void doWork(String masterUrl) {
>>>>     SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName(
>>>>         "NetworkWordCount");
>>>>     JavaStreamingContext jssc = new JavaStreamingContext(conf,
>>>>         Durations.seconds(1));
>>>>     JavaReceiverInputDStream<String> lines = jssc.socketTextStream(
>>>>         "localhost", 9999);
>>>>     System.out.println("Successfully created connection");
>>>>     mapAndReduce(lines);
>>>>     jssc.start(); // Start the computation
>>>>     jssc.awaitTermination(); // Wait for the computation to terminate
>>>> }
>>>>
>>>> public static void main(String... args) {
>>>>     doWork(args[0]);
>>>> }
>>>>
>>>> And output of the java program after submitting the task:
>>>>
>>>> java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
>>>> Using Spark's default log4j profile:
>>>> org/apache/spark/log4j-defaults.properties
>>>> 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user
>>>> 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to:
>>>> ec2-user
>>>> 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication
>>>> disabled; ui acls disabled; users with view permissions: Set(ec2-user);
>>>> users with modify permissions: Set(ec2-user)
>>>> 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started
>>>> 15/03/27 13:50:46 INFO Remoting: Starting remoting
>>>> 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on
>>>> addresses
>>>> :[akka.tcp://sparkDriver@ip-10-241-251-232.us-west-2.compute.internal
>>>> :60184]
>>>> 15/03/27 13:50:47 INFO Utils: Successfully started service
>>>> 'sparkDriver' on port 60184.
>>>> 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker
>>>> 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster
>>>> 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at
>>>> /tmp/spark-local-20150327135047-5399
>>>> 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity
>>>> 3.5 GB
>>>> 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop
>>>> library for your platform... using builtin-java classes where applicable
>>>> 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is
>>>> /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b
>>>> 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server
>>>> 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file
>>>> server' on port 57955.
>>>> 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on
>>>> port 4040.
>>>> 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at
>>>> http://ip-10-241-251-232.us-west-2.compute.internal:4040
>>>> 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master
>>>> spark://ip-10-241-251-232:7077...
>>>> 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark
>>>> cluster with app ID app-20150327135048-0002
>>>> 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on
>>>> 58358
>>>> 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register
>>>> BlockManager
>>>> 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block
>>>> manager ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM,
>>>> BlockManagerId(<driver>, ip-10-241-251-232.us-west-2.compute.internal,
>>>> 58358)
>>>> 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager
>>>> 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is
>>>> ready for scheduling beginning after reached minRegisteredResourcesRatio:
>>>> 0.0
>>>> 15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started
>>>> 15/03/27 13:50:48 INFO ForEachDStream: metadataCleanupDelay = -1
>>>> 15/03/27 13:50:48 INFO ShuffledDStream: metadataCleanupDelay = -1
>>>> 15/03/27 13:50:48 INFO MappedDStream: metadataCleanupDelay = -1
>>>> 15/03/27 13:50:48 INFO FlatMappedDStream: metadataCleanupDelay = -1
>>>> 15/03/27 13:50:48 INFO SocketInputDStream: metadataCleanupDelay = -1
>>>> 15/03/27 13:50:48 INFO SocketInputDStream: Slide time = 1000 ms
>>>> 15/03/27 13:50:48 INFO SocketInputDStream: Storage level =
>>>> StorageLevel(false, false, false, false, 1)
>>>> 15/03/27 13:50:48 INFO SocketInputDStream: Checkpoint interval = null
>>>> 15/03/27 13:50:48 INFO SocketInputDStream: Remember duration = 1000 ms
>>>> 15/03/27 13:50:48 INFO SocketInputDStream: Initialized and validated
>>>> org.apache.spark.streaming.dstream.SocketInputDStream@75efa13d
>>>> 15/03/27 13:50:48 INFO FlatMappedDStream: Slide time = 1000 ms
>>>> 15/03/27 13:50:48 INFO FlatMappedDStream: Storage level =
>>>> StorageLevel(false, false, false, false, 1)
>>>> 15/03/27 13:50:48 INFO FlatMappedDStream: Checkpoint interval = null
>>>> 15/03/27 13:50:48 INFO FlatMappedDStream: Remember duration = 1000 ms
>>>> 15/03/27 13:50:48 INFO FlatMappedDStream: Initialized and validated
>>>> org.apache.spark.streaming.dstream.FlatMappedDStream@65ce9dc5
>>>> 15/03/27 13:50:48 INFO MappedDStream: Slide time = 1000 ms
>>>> 15/03/27 13:50:48 INFO MappedDStream: Storage level =
>>>> StorageLevel(false, false, false, false, 1)
>>>> 15/03/27 13:50:48 INFO MappedDStream: Checkpoint interval = null
>>>> 15/03/27 13:50:48 INFO MappedDStream: Remember duration = 1000 ms
>>>> 15/03/27 13:50:48 INFO MappedDStream: Initialized and validated
>>>> org.apache.spark.streaming.dstream.MappedDStream@5ae2740f
>>>> 15/03/27 13:50:48 INFO ShuffledDStream: Slide time = 1000 ms
>>>> 15/03/27 13:50:48 INFO ShuffledDStream: Storage level =
>>>> StorageLevel(false, false, false, false, 1)
>>>> 15/03/27 13:50:48 INFO ShuffledDStream: Checkpoint interval = null
>>>> 15/03/27 13:50:48 INFO ShuffledDStream: Remember duration = 1000 ms
>>>> 15/03/27 13:50:48 INFO ShuffledDStream: Initialized and validated
>>>> org.apache.spark.streaming.dstream.ShuffledDStream@4931b366
>>>> 15/03/27 13:50:48 INFO ForEachDStream: Slide time = 1000 ms
>>>> 15/03/27 13:50:48 INFO ForEachDStream: Storage level =
>>>> StorageLevel(false, false, false, false, 1)
>>>> 15/03/27 13:50:48 INFO ForEachDStream: Checkpoint interval = null
>>>> 15/03/27 13:50:48 INFO ForEachDStream: Remember duration = 1000 ms
>>>> 15/03/27 13:50:48 INFO ForEachDStream: Initialized and validated
>>>> org.apache.spark.streaming.dstream.ForEachDStream@5df91314
>>>> 15/03/27 13:50:48 INFO SparkContext: Starting job: start at
>>>> WordCount.java:26
>>>> 15/03/27 13:50:48 INFO RecurringTimer: Started timer for JobGenerator
>>>> at time 1427478649000
>>>> 15/03/27 13:50:48 INFO JobGenerator: Started JobGenerator at
>>>> 1427478649000 ms
>>>> 15/03/27 13:50:48 INFO JobScheduler: Started JobScheduler
>>>> 15/03/27 13:50:48 INFO DAGScheduler: Registering RDD 2 (start at
>>>> WordCount.java:26)
>>>> 15/03/27 13:50:48 INFO DAGScheduler: Got job 0 (start at
>>>> WordCount.java:26) with 20 output partitions (allowLocal=false)
>>>> 15/03/27 13:50:48 INFO DAGScheduler: Final stage: Stage 1(start at
>>>> WordCount.java:26)
>>>> 15/03/27 13:50:48 INFO DAGScheduler: Parents of final stage: List(Stage
>>>> 0)
>>>> 15/03/27 13:50:48 INFO DAGScheduler: Missing parents: List(Stage 0)
>>>> 15/03/27 13:50:48 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2]
>>>> at start at WordCount.java:26), which has no missing parents
>>>> 15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(2720) called with
>>>> curMem=0, maxMem=3771948072
>>>> 15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0 stored as values
>>>> in memory (estimated size 2.7 KB, free 3.5 GB)
>>>> 15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(1943) called with
>>>> curMem=2720, maxMem=3771948072
>>>> 15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0_piece0 stored as
>>>> bytes in memory (estimated size 1943.0 B, free 3.5 GB)
>>>> 15/03/27 13:50:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in
>>>> memory on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1943.0
>>>> B, free: 3.5 GB)
>>>> 15/03/27 13:50:48 INFO BlockManagerMaster: Updated info of block
>>>> broadcast_0_piece0
>>>> 15/03/27 13:50:48 INFO SparkContext: Created broadcast 0 from broadcast
>>>> at DAGScheduler.scala:838
>>>> 15/03/27 13:50:48 INFO DAGScheduler: Submitting 50 missing tasks from
>>>> Stage 0 (MappedRDD[2] at start at WordCount.java:26)
>>>> 15/03/27 13:50:48 INFO TaskSchedulerImpl: Adding task set 0.0 with 50
>>>> tasks
>>>> 15/03/27 13:50:49 INFO JobScheduler: Added jobs for time 1427478649000
>>>> ms
>>>> 15/03/27 13:50:49 INFO JobScheduler: Starting job streaming job
>>>> 1427478649000 ms.0 from job set of time 1427478649000 ms
>>>> 15/03/27 13:50:49 INFO SparkContext: Starting job: print at
>>>> WordCount.java:53
>>>> 15/03/27 13:50:49 INFO DAGScheduler: Registering RDD 6 (mapToPair at
>>>> WordCount.java:39)
>>>> 15/03/27 13:50:49 INFO DAGScheduler: Got job 1 (print at
>>>> WordCount.java:53) with 1 output partitions (allowLocal=true)
>>>> 15/03/27 13:50:49 INFO DAGScheduler: Final stage: Stage 3(print at
>>>> WordCount.java:53)
>>>> 15/03/27 13:50:49 INFO DAGScheduler: Parents of final stage: List(Stage
>>>> 2)
>>>> 15/03/27 13:50:49 INFO DAGScheduler: Missing parents: List()
>>>> 15/03/27 13:50:49 INFO DAGScheduler: Submitting Stage 3 (ShuffledRDD[7]
>>>> at reduceByKey at WordCount.java:46), which has no missing parents
>>>> 15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(2264) called with
>>>> curMem=4663, maxMem=3771948072
>>>> 15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1 stored as values
>>>> in memory (estimated size 2.2 KB, free 3.5 GB)
>>>> 15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(1688) called with
>>>> curMem=6927, maxMem=3771948072
>>>> 15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1_piece0 stored as
>>>> bytes in memory (estimated size 1688.0 B, free 3.5 GB)
>>>> 15/03/27 13:50:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in
>>>> memory on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1688.0
>>>> B, free: 3.5 GB)
>>>> 15/03/27 13:50:49 INFO BlockManagerMaster: Updated info of block
>>>> broadcast_1_piece0
>>>> 15/03/27 13:50:49 INFO SparkContext: Created broadcast 1 from broadcast
>>>> at DAGScheduler.scala:838
>>>> 15/03/27 13:50:49 INFO DAGScheduler: Submitting 1 missing tasks from
>>>> Stage 3 (ShuffledRDD[7] at reduceByKey at WordCount.java:46)
>>>> 15/03/27 13:50:49 INFO TaskSchedulerImpl: Adding task set 3.0 with 1
>>>> tasks
>>>> 15/03/27 13:50:50 INFO JobScheduler: Added jobs for time 1427478650000
>>>> ms
>>>> 15/03/27 13:50:51 INFO JobScheduler: Added jobs for time 1427478651000
>>>> ms
>>>> 15/03/27 13:50:52 INFO JobScheduler: Added jobs for time 1427478652000
>>>> ms
>>>> 15/03/27 13:50:53 IN
>>>>
>>>>
>>>>
>>>> On Thu, Mar 26, 2015 at 6:50 PM, Saisai Shao <sa...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Did you run the word count example in Spark local mode or in another
>>>>> mode? In local mode you have to set local[n], where n >= 2. For other
>>>>> modes, make sure more than one core is available, because the receiver
>>>>> inside Spark Streaming runs as a long-running task, which occupies at
>>>>> least one core.
>>>>>
>>>>> Besides, use lsof -p <pid> or netstat to make sure the Spark executor
>>>>> backend is connected to the nc process. Also grep the executor's log for
>>>>> lines like "Connecting to <host> <port>" and "Connected to <host>
>>>>> <port>", which show that the receiver is correctly connected to the nc
>>>>> process.
>>>>>
>>>>> Thanks
>>>>> Jerry
>>>>>
>>>>> 2015-03-27 8:45 GMT+08:00 Mohit Anchlia <mo...@gmail.com>:
>>>>>
>>>>>> What's the best way to troubleshoot inside spark to see why Spark is
>>>>>> not connecting to nc on port 9999? I don't see any errors either.
>>>>>>
>>>>>> On Thu, Mar 26, 2015 at 2:38 PM, Mohit Anchlia <
>>>>>> mohitanchlia@gmail.com> wrote:
>>>>>>
>>>>>>> I am trying to run the word count example but for some reason it's
>>>>>>> not working as expected. I start an "nc" server on port 9999 and then
>>>>>>> submit the Spark job to the cluster. The Spark job gets submitted
>>>>>>> successfully, but I never see any connection from Spark getting
>>>>>>> established. I also tried to type words on the console where "nc" is
>>>>>>> listening and waiting on the prompt, however I don't see any output. I
>>>>>>> also don't see any errors.
>>>>>>>
>>>>>>> Here is the conf:
>>>>>>>
>>>>>>> SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName(
>>>>>>>     "NetworkWordCount");
>>>>>>>
>>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(conf,
>>>>>>>     Durations.seconds(1));
>>>>>>>
>>>>>>> JavaReceiverInputDStream<String> lines = jssc.socketTextStream(
>>>>>>>     "localhost", 9999);
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: WordCount example

Posted by Mohit Anchlia <mo...@gmail.com>.
[ec2-user@ip-10-241-251-232 s_lib]$ cat /proc/cpuinfo |grep process
processor       : 0
processor       : 1
processor       : 2
processor       : 3
processor       : 4
processor       : 5
processor       : 6
processor       : 7
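
The listing above describes the machine itself. The core count on the
standalone master UI reflects only the workers that have registered with the
master, so the two numbers can differ. As a rough illustration, the same
machine-level count can be taken in Java. The class name below is
hypothetical, and the sketch assumes the /proc/cpuinfo layout shown above,
where each CPU entry starts with a "processor" line.

```java
import java.util.Arrays;

// Hypothetical helper: counts CPU entries in /proc/cpuinfo-style text.
final class CpuInfoCount {

    // Counts the lines that begin with "processor", one per logical CPU
    // in the layout shown above.
    static long countProcessors(String cpuinfo) {
        return Arrays.stream(cpuinfo.split("\n"))
                .filter(line -> line.startsWith("processor"))
                .count();
    }

    public static void main(String[] args) {
        // What the JVM on this machine sees. This says nothing about how
        // many cores a Spark master has available from registered workers.
        int seenByJvm = Runtime.getRuntime().availableProcessors();
        System.out.println("JVM sees " + seenByJvm + " processors");
    }
}
```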

On Fri, Apr 3, 2015 at 2:33 PM, Tathagata Das <td...@databricks.com> wrote:

> How many cores are present in the workers allocated to the standalone
> cluster spark://ip-10-241-251-232:7077?
>
>
> On Fri, Apr 3, 2015 at 2:18 PM, Mohit Anchlia <mo...@gmail.com>
> wrote:
>
>> If I use local[2] instead of spark://ip-10-241-251-232:7077 this seems
>> to work. I don't understand why, though, because when I give
>> spark://ip-10-241-251-232:7077 the application seems to bootstrap
>> successfully; it just doesn't create a socket on port 9999?
>>
>>
>> On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia <mo...@gmail.com>
>> wrote:
>>
>>> I checked the ports using netstat and don't see any connections
>>> established on that port. Logs show only this:
>>>
>>> 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount
>>> 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID
>>> app-20150327135048-0002
>>>
>>> Spark ui shows:
>>>
>>> Running Applications
>>>
>>> ID:              app-20150327135048-0002
>>>                  <http://54.69.225.94:8080/app?appId=app-20150327135048-0002>
>>> Name:            NetworkWordCount
>>>                  <http://ip-10-241-251-232.us-west-2.compute.internal:4040/>
>>> Cores:           0
>>> Memory per Node: 512.0 MB
>>> Submitted Time:  2015/03/27 13:50:48
>>> User:            ec2-user
>>> State:           WAITING
>>> Duration:        33 s
>>>
>>> The code looks like it is being executed:
>>>
>>> java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
>>>
>>> public static void doWork(String masterUrl) {
>>>     SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName(
>>>         "NetworkWordCount");
>>>     JavaStreamingContext jssc = new JavaStreamingContext(conf,
>>>         Durations.seconds(1));
>>>     JavaReceiverInputDStream<String> lines = jssc.socketTextStream(
>>>         "localhost", 9999);
>>>     System.out.println("Successfully created connection");
>>>     mapAndReduce(lines);
>>>     jssc.start(); // Start the computation
>>>     jssc.awaitTermination(); // Wait for the computation to terminate
>>> }
>>>
>>> public static void main(String... args) {
>>>     doWork(args[0]);
>>> }
>>>
>>> And output of the java program after submitting the task:
>>>
>>> java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
>>> Using Spark's default log4j profile:
>>> org/apache/spark/log4j-defaults.properties
>>> 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user
>>> 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user
>>> 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication
>>> disabled; ui acls disabled; users with view permissions: Set(ec2-user);
>>> users with modify permissions: Set(ec2-user)
>>> 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started
>>> 15/03/27 13:50:46 INFO Remoting: Starting remoting
>>> 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on
>>> addresses
>>> :[akka.tcp://sparkDriver@ip-10-241-251-232.us-west-2.compute.internal
>>> :60184]
>>> 15/03/27 13:50:47 INFO Utils: Successfully started service 'sparkDriver'
>>> on port 60184.
>>> 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker
>>> 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster
>>> 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at
>>> /tmp/spark-local-20150327135047-5399
>>> 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity
>>> 3.5 GB
>>> 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where applicable
>>> 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is
>>> /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b
>>> 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server
>>> 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file
>>> server' on port 57955.
>>> 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on
>>> port 4040.
>>> 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at
>>> http://ip-10-241-251-232.us-west-2.compute.internal:4040
>>> 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master
>>> spark://ip-10-241-251-232:7077...
>>> 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark
>>> cluster with app ID app-20150327135048-0002
>>> 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358
>>> 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register
>>> BlockManager
>>> 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block
>>> manager ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM,
>>> BlockManagerId(<driver>, ip-10-241-251-232.us-west-2.compute.internal,
>>> 58358)
>>> 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager
>>> 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is
>>> ready for scheduling beginning after reached minRegisteredResourcesRatio:
>>> 0.0
>>> 15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started
>>> 15/03/27 13:50:48 INFO ForEachDStream: metadataCleanupDelay = -1
>>> 15/03/27 13:50:48 INFO ShuffledDStream: metadataCleanupDelay = -1
>>> 15/03/27 13:50:48 INFO MappedDStream: metadataCleanupDelay = -1
>>> 15/03/27 13:50:48 INFO FlatMappedDStream: metadataCleanupDelay = -1
>>> 15/03/27 13:50:48 INFO SocketInputDStream: metadataCleanupDelay = -1
>>> 15/03/27 13:50:48 INFO SocketInputDStream: Slide time = 1000 ms
>>> 15/03/27 13:50:48 INFO SocketInputDStream: Storage level =
>>> StorageLevel(false, false, false, false, 1)
>>> 15/03/27 13:50:48 INFO SocketInputDStream: Checkpoint interval = null
>>> 15/03/27 13:50:48 INFO SocketInputDStream: Remember duration = 1000 ms
>>> 15/03/27 13:50:48 INFO SocketInputDStream: Initialized and validated
>>> org.apache.spark.streaming.dstream.SocketInputDStream@75efa13d
>>> 15/03/27 13:50:48 INFO FlatMappedDStream: Slide time = 1000 ms
>>> 15/03/27 13:50:48 INFO FlatMappedDStream: Storage level =
>>> StorageLevel(false, false, false, false, 1)
>>> 15/03/27 13:50:48 INFO FlatMappedDStream: Checkpoint interval = null
>>> 15/03/27 13:50:48 INFO FlatMappedDStream: Remember duration = 1000 ms
>>> 15/03/27 13:50:48 INFO FlatMappedDStream: Initialized and validated
>>> org.apache.spark.streaming.dstream.FlatMappedDStream@65ce9dc5
>>> 15/03/27 13:50:48 INFO MappedDStream: Slide time = 1000 ms
>>> 15/03/27 13:50:48 INFO MappedDStream: Storage level =
>>> StorageLevel(false, false, false, false, 1)
>>> 15/03/27 13:50:48 INFO MappedDStream: Checkpoint interval = null
>>> 15/03/27 13:50:48 INFO MappedDStream: Remember duration = 1000 ms
>>> 15/03/27 13:50:48 INFO MappedDStream: Initialized and validated
>>> org.apache.spark.streaming.dstream.MappedDStream@5ae2740f
>>> 15/03/27 13:50:48 INFO ShuffledDStream: Slide time = 1000 ms
>>> 15/03/27 13:50:48 INFO ShuffledDStream: Storage level =
>>> StorageLevel(false, false, false, false, 1)
>>> 15/03/27 13:50:48 INFO ShuffledDStream: Checkpoint interval = null
>>> 15/03/27 13:50:48 INFO ShuffledDStream: Remember duration = 1000 ms
>>> 15/03/27 13:50:48 INFO ShuffledDStream: Initialized and validated
>>> org.apache.spark.streaming.dstream.ShuffledDStream@4931b366
>>> 15/03/27 13:50:48 INFO ForEachDStream: Slide time = 1000 ms
>>> 15/03/27 13:50:48 INFO ForEachDStream: Storage level =
>>> StorageLevel(false, false, false, false, 1)
>>> 15/03/27 13:50:48 INFO ForEachDStream: Checkpoint interval = null
>>> 15/03/27 13:50:48 INFO ForEachDStream: Remember duration = 1000 ms
>>> 15/03/27 13:50:48 INFO ForEachDStream: Initialized and validated
>>> org.apache.spark.streaming.dstream.ForEachDStream@5df91314
>>> 15/03/27 13:50:48 INFO SparkContext: Starting job: start at
>>> WordCount.java:26
>>> 15/03/27 13:50:48 INFO RecurringTimer: Started timer for JobGenerator at
>>> time 1427478649000
>>> 15/03/27 13:50:48 INFO JobGenerator: Started JobGenerator at
>>> 1427478649000 ms
>>> 15/03/27 13:50:48 INFO JobScheduler: Started JobScheduler
>>> 15/03/27 13:50:48 INFO DAGScheduler: Registering RDD 2 (start at
>>> WordCount.java:26)
>>> 15/03/27 13:50:48 INFO DAGScheduler: Got job 0 (start at
>>> WordCount.java:26) with 20 output partitions (allowLocal=false)
>>> 15/03/27 13:50:48 INFO DAGScheduler: Final stage: Stage 1(start at
>>> WordCount.java:26)
>>> 15/03/27 13:50:48 INFO DAGScheduler: Parents of final stage: List(Stage
>>> 0)
>>> 15/03/27 13:50:48 INFO DAGScheduler: Missing parents: List(Stage 0)
>>> 15/03/27 13:50:48 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2] at
>>> start at WordCount.java:26), which has no missing parents
>>> 15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(2720) called with
>>> curMem=0, maxMem=3771948072
>>> 15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0 stored as values
>>> in memory (estimated size 2.7 KB, free 3.5 GB)
>>> 15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(1943) called with
>>> curMem=2720, maxMem=3771948072
>>> 15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0_piece0 stored as
>>> bytes in memory (estimated size 1943.0 B, free 3.5 GB)
>>> 15/03/27 13:50:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in
>>> memory on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1943.0
>>> B, free: 3.5 GB)
>>> 15/03/27 13:50:48 INFO BlockManagerMaster: Updated info of block
>>> broadcast_0_piece0
>>> 15/03/27 13:50:48 INFO SparkContext: Created broadcast 0 from broadcast
>>> at DAGScheduler.scala:838
>>> 15/03/27 13:50:48 INFO DAGScheduler: Submitting 50 missing tasks from
>>> Stage 0 (MappedRDD[2] at start at WordCount.java:26)
>>> 15/03/27 13:50:48 INFO TaskSchedulerImpl: Adding task set 0.0 with 50
>>> tasks
>>> 15/03/27 13:50:49 INFO JobScheduler: Added jobs for time 1427478649000 ms
>>> 15/03/27 13:50:49 INFO JobScheduler: Starting job streaming job
>>> 1427478649000 ms.0 from job set of time 1427478649000 ms
>>> 15/03/27 13:50:49 INFO SparkContext: Starting job: print at
>>> WordCount.java:53
>>> 15/03/27 13:50:49 INFO DAGScheduler: Registering RDD 6 (mapToPair at
>>> WordCount.java:39)
>>> 15/03/27 13:50:49 INFO DAGScheduler: Got job 1 (print at
>>> WordCount.java:53) with 1 output partitions (allowLocal=true)
>>> 15/03/27 13:50:49 INFO DAGScheduler: Final stage: Stage 3(print at
>>> WordCount.java:53)
>>> 15/03/27 13:50:49 INFO DAGScheduler: Parents of final stage: List(Stage
>>> 2)
>>> 15/03/27 13:50:49 INFO DAGScheduler: Missing parents: List()
>>> 15/03/27 13:50:49 INFO DAGScheduler: Submitting Stage 3 (ShuffledRDD[7]
>>> at reduceByKey at WordCount.java:46), which has no missing parents
>>> 15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(2264) called with
>>> curMem=4663, maxMem=3771948072
>>> 15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1 stored as values
>>> in memory (estimated size 2.2 KB, free 3.5 GB)
>>> 15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(1688) called with
>>> curMem=6927, maxMem=3771948072
>>> 15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1_piece0 stored as
>>> bytes in memory (estimated size 1688.0 B, free 3.5 GB)
>>> 15/03/27 13:50:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in
>>> memory on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1688.0
>>> B, free: 3.5 GB)
>>> 15/03/27 13:50:49 INFO BlockManagerMaster: Updated info of block
>>> broadcast_1_piece0
>>> 15/03/27 13:50:49 INFO SparkContext: Created broadcast 1 from broadcast
>>> at DAGScheduler.scala:838
>>> 15/03/27 13:50:49 INFO DAGScheduler: Submitting 1 missing tasks from
>>> Stage 3 (ShuffledRDD[7] at reduceByKey at WordCount.java:46)
>>> 15/03/27 13:50:49 INFO TaskSchedulerImpl: Adding task set 3.0 with 1
>>> tasks
>>> 15/03/27 13:50:50 INFO JobScheduler: Added jobs for time 1427478650000 ms
>>> 15/03/27 13:50:51 INFO JobScheduler: Added jobs for time 1427478651000 ms
>>> 15/03/27 13:50:52 INFO JobScheduler: Added jobs for time 1427478652000 ms
>>> 15/03/27 13:50:53 IN
>>>
>>>
>>>
>>> On Thu, Mar 26, 2015 at 6:50 PM, Saisai Shao <sa...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Did you run the word count example in Spark local mode or in another
>>>> mode? In local mode you have to set local[n], where n >= 2. For other
>>>> modes, make sure more than one core is available, because the receiver
>>>> inside Spark Streaming runs as a long-running task, which occupies at
>>>> least one core.
>>>>
>>>> Besides, use lsof -p <pid> or netstat to make sure the Spark executor
>>>> backend is connected to the nc process. Also grep the executor's log for
>>>> lines like "Connecting to <host> <port>" and "Connected to <host>
>>>> <port>", which show that the receiver is correctly connected to the nc
>>>> process.
>>>>
>>>> Thanks
>>>> Jerry
>>>>
>>>> 2015-03-27 8:45 GMT+08:00 Mohit Anchlia <mo...@gmail.com>:
>>>>
>>>>> What's the best way to troubleshoot inside spark to see why Spark is
>>>>> not connecting to nc on port 9999? I don't see any errors either.
>>>>>
>>>>> On Thu, Mar 26, 2015 at 2:38 PM, Mohit Anchlia <mohitanchlia@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> I am trying to run the word count example but for some reason it's
>>>>>> not working as expected. I start an "nc" server on port 9999 and then
>>>>>> submit the Spark job to the cluster. The Spark job gets submitted
>>>>>> successfully, but I never see any connection from Spark getting
>>>>>> established. I also tried to type words on the console where "nc" is
>>>>>> listening and waiting on the prompt, however I don't see any output. I
>>>>>> also don't see any errors.
>>>>>>
>>>>>> Here is the conf:
>>>>>>
>>>>>> SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName(
>>>>>>     "NetworkWordCount");
>>>>>>
>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(conf,
>>>>>>     Durations.seconds(1));
>>>>>>
>>>>>> JavaReceiverInputDStream<String> lines = jssc.socketTextStream(
>>>>>>     "localhost", 9999);
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: WordCount example

Posted by Tathagata Das <td...@databricks.com>.
How many cores are present in the workers allocated to the standalone cluster
spark://ip-10-241-251-232:7077?
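
The question matters because the two kinds of master string get their
parallelism from different places. A local[n] string carries the thread count
in the string itself, while a spark:// URL only names the master, and the
cores come from whichever workers have registered with it. The Java sketch
below makes that distinction explicit. The class and method names are
hypothetical, and it covers only the "local", "local[n]" and "local[*]" forms.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not a Spark API: reads the thread count out of a
// local-mode master string and returns empty for anything else.
final class MasterUrlThreads {

    private static final Pattern LOCAL_N = Pattern.compile("local\\[(\\d+|\\*)\\]");

    static OptionalInt localThreads(String master) {
        if (master.equals("local")) {
            return OptionalInt.of(1); // plain "local" means a single thread
        }
        Matcher m = LOCAL_N.matcher(master);
        if (!m.matches()) {
            // For example spark://host:port, where the core count depends on
            // the workers registered with that master, not on the URL.
            return OptionalInt.empty();
        }
        String n = m.group(1);
        return OptionalInt.of(n.equals("*")
                ? Runtime.getRuntime().availableProcessors()
                : Integer.parseInt(n));
    }

    public static void main(String[] args) {
        System.out.println(localThreads("local[2]"));
        System.out.println(localThreads("spark://ip-10-241-251-232:7077"));
    }
}
```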


On Fri, Apr 3, 2015 at 2:18 PM, Mohit Anchlia <mo...@gmail.com>
wrote:

> If I use local[2] instead of spark://ip-10-241-251-232:7077 this seems
> to work. I don't understand why, though, because when I give
> spark://ip-10-241-251-232:7077 the application seems to bootstrap
> successfully; it just doesn't create a socket on port 9999?
>
>
> On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia <mo...@gmail.com>
> wrote:
>
>> I checked the ports using netstat and don't see any connections
>> established on that port. Logs show only this:
>>
>> 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount
>> 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID
>> app-20150327135048-0002
>>
>> Spark ui shows:
>>
>> Running Applications
>>
>> ID:              app-20150327135048-0002
>>                  <http://54.69.225.94:8080/app?appId=app-20150327135048-0002>
>> Name:            NetworkWordCount
>>                  <http://ip-10-241-251-232.us-west-2.compute.internal:4040/>
>> Cores:           0
>> Memory per Node: 512.0 MB
>> Submitted Time:  2015/03/27 13:50:48
>> User:            ec2-user
>> State:           WAITING
>> Duration:        33 s
>>
>> The code looks like it is being executed:
>>
>> java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
>>
>> public static void doWork(String masterUrl) {
>>     SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName(
>>         "NetworkWordCount");
>>     JavaStreamingContext jssc = new JavaStreamingContext(conf,
>>         Durations.seconds(1));
>>     JavaReceiverInputDStream<String> lines = jssc.socketTextStream(
>>         "localhost", 9999);
>>     System.out.println("Successfully created connection");
>>     mapAndReduce(lines);
>>     jssc.start(); // Start the computation
>>     jssc.awaitTermination(); // Wait for the computation to terminate
>> }
>>
>> public static void main(String... args) {
>>     doWork(args[0]);
>> }
>>
>> And output of the java program after submitting the task:
>>
>> java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
>> Using Spark's default log4j profile:
>> org/apache/spark/log4j-defaults.properties
>> 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user
>> 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user
>> 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication
>> disabled; ui acls disabled; users with view permissions: Set(ec2-user);
>> users with modify permissions: Set(ec2-user)
>> 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started
>> 15/03/27 13:50:46 INFO Remoting: Starting remoting
>> 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses
>> :[akka.tcp://sparkDriver@ip-10-241-251-232.us-west-2.compute.internal
>> :60184]
>> 15/03/27 13:50:47 INFO Utils: Successfully started service 'sparkDriver'
>> on port 60184.
>> 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker
>> 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster
>> 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at
>> /tmp/spark-local-20150327135047-5399
>> 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5
>> GB
>> 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is
>> /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b
>> 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server
>> 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file
>> server' on port 57955.
>> 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on
>> port 4040.
>> 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at
>> http://ip-10-241-251-232.us-west-2.compute.internal:4040
>> 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master
>> spark://ip-10-241-251-232:7077...
>> 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark
>> cluster with app ID app-20150327135048-0002
>> 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358
>> 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register BlockManager
>> 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block manager
>> ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM,
>> BlockManagerId(<driver>, ip-10-241-251-232.us-west-2.compute.internal,
>> 58358)
>> 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager
>> 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is
>> ready for scheduling beginning after reached minRegisteredResourcesRatio:
>> 0.0
>> 15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started
>> 15/03/27 13:50:48 INFO ForEachDStream: metadataCleanupDelay = -1
>> 15/03/27 13:50:48 INFO ShuffledDStream: metadataCleanupDelay = -1
>> 15/03/27 13:50:48 INFO MappedDStream: metadataCleanupDelay = -1
>> 15/03/27 13:50:48 INFO FlatMappedDStream: metadataCleanupDelay = -1
>> 15/03/27 13:50:48 INFO SocketInputDStream: metadataCleanupDelay = -1
>> 15/03/27 13:50:48 INFO SocketInputDStream: Slide time = 1000 ms
>> 15/03/27 13:50:48 INFO SocketInputDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>> 15/03/27 13:50:48 INFO SocketInputDStream: Checkpoint interval = null
>> 15/03/27 13:50:48 INFO SocketInputDStream: Remember duration = 1000 ms
>> 15/03/27 13:50:48 INFO SocketInputDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.SocketInputDStream@75efa13d
>> 15/03/27 13:50:48 INFO FlatMappedDStream: Slide time = 1000 ms
>> 15/03/27 13:50:48 INFO FlatMappedDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>> 15/03/27 13:50:48 INFO FlatMappedDStream: Checkpoint interval = null
>> 15/03/27 13:50:48 INFO FlatMappedDStream: Remember duration = 1000 ms
>> 15/03/27 13:50:48 INFO FlatMappedDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.FlatMappedDStream@65ce9dc5
>> 15/03/27 13:50:48 INFO MappedDStream: Slide time = 1000 ms
>> 15/03/27 13:50:48 INFO MappedDStream: Storage level = StorageLevel(false,
>> false, false, false, 1)
>> 15/03/27 13:50:48 INFO MappedDStream: Checkpoint interval = null
>> 15/03/27 13:50:48 INFO MappedDStream: Remember duration = 1000 ms
>> 15/03/27 13:50:48 INFO MappedDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.MappedDStream@5ae2740f
>> 15/03/27 13:50:48 INFO ShuffledDStream: Slide time = 1000 ms
>> 15/03/27 13:50:48 INFO ShuffledDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>> 15/03/27 13:50:48 INFO ShuffledDStream: Checkpoint interval = null
>> 15/03/27 13:50:48 INFO ShuffledDStream: Remember duration = 1000 ms
>> 15/03/27 13:50:48 INFO ShuffledDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.ShuffledDStream@4931b366
>> 15/03/27 13:50:48 INFO ForEachDStream: Slide time = 1000 ms
>> 15/03/27 13:50:48 INFO ForEachDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>> 15/03/27 13:50:48 INFO ForEachDStream: Checkpoint interval = null
>> 15/03/27 13:50:48 INFO ForEachDStream: Remember duration = 1000 ms
>> 15/03/27 13:50:48 INFO ForEachDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.ForEachDStream@5df91314
>> 15/03/27 13:50:48 INFO SparkContext: Starting job: start at
>> WordCount.java:26
>> 15/03/27 13:50:48 INFO RecurringTimer: Started timer for JobGenerator at
>> time 1427478649000
>> 15/03/27 13:50:48 INFO JobGenerator: Started JobGenerator at
>> 1427478649000 ms
>> 15/03/27 13:50:48 INFO JobScheduler: Started JobScheduler
>> 15/03/27 13:50:48 INFO DAGScheduler: Registering RDD 2 (start at
>> WordCount.java:26)
>> 15/03/27 13:50:48 INFO DAGScheduler: Got job 0 (start at
>> WordCount.java:26) with 20 output partitions (allowLocal=false)
>> 15/03/27 13:50:48 INFO DAGScheduler: Final stage: Stage 1(start at
>> WordCount.java:26)
>> 15/03/27 13:50:48 INFO DAGScheduler: Parents of final stage: List(Stage 0)
>> 15/03/27 13:50:48 INFO DAGScheduler: Missing parents: List(Stage 0)
>> 15/03/27 13:50:48 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2] at
>> start at WordCount.java:26), which has no missing parents
>> 15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(2720) called with
>> curMem=0, maxMem=3771948072
>> 15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0 stored as values in
>> memory (estimated size 2.7 KB, free 3.5 GB)
>> 15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(1943) called with
>> curMem=2720, maxMem=3771948072
>> 15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0_piece0 stored as
>> bytes in memory (estimated size 1943.0 B, free 3.5 GB)
>> 15/03/27 13:50:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in
>> memory on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1943.0
>> B, free: 3.5 GB)
>> 15/03/27 13:50:48 INFO BlockManagerMaster: Updated info of block
>> broadcast_0_piece0
>> 15/03/27 13:50:48 INFO SparkContext: Created broadcast 0 from broadcast
>> at DAGScheduler.scala:838
>> 15/03/27 13:50:48 INFO DAGScheduler: Submitting 50 missing tasks from
>> Stage 0 (MappedRDD[2] at start at WordCount.java:26)
>> 15/03/27 13:50:48 INFO TaskSchedulerImpl: Adding task set 0.0 with 50
>> tasks
>> 15/03/27 13:50:49 INFO JobScheduler: Added jobs for time 1427478649000 ms
>> 15/03/27 13:50:49 INFO JobScheduler: Starting job streaming job
>> 1427478649000 ms.0 from job set of time 1427478649000 ms
>> 15/03/27 13:50:49 INFO SparkContext: Starting job: print at
>> WordCount.java:53
>> 15/03/27 13:50:49 INFO DAGScheduler: Registering RDD 6 (mapToPair at
>> WordCount.java:39)
>> 15/03/27 13:50:49 INFO DAGScheduler: Got job 1 (print at
>> WordCount.java:53) with 1 output partitions (allowLocal=true)
>> 15/03/27 13:50:49 INFO DAGScheduler: Final stage: Stage 3(print at
>> WordCount.java:53)
>> 15/03/27 13:50:49 INFO DAGScheduler: Parents of final stage: List(Stage 2)
>> 15/03/27 13:50:49 INFO DAGScheduler: Missing parents: List()
>> 15/03/27 13:50:49 INFO DAGScheduler: Submitting Stage 3 (ShuffledRDD[7]
>> at reduceByKey at WordCount.java:46), which has no missing parents
>> 15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(2264) called with
>> curMem=4663, maxMem=3771948072
>> 15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1 stored as values in
>> memory (estimated size 2.2 KB, free 3.5 GB)
>> 15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(1688) called with
>> curMem=6927, maxMem=3771948072
>> 15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1_piece0 stored as
>> bytes in memory (estimated size 1688.0 B, free 3.5 GB)
>> 15/03/27 13:50:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in
>> memory on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1688.0
>> B, free: 3.5 GB)
>> 15/03/27 13:50:49 INFO BlockManagerMaster: Updated info of block
>> broadcast_1_piece0
>> 15/03/27 13:50:49 INFO SparkContext: Created broadcast 1 from broadcast
>> at DAGScheduler.scala:838
>> 15/03/27 13:50:49 INFO DAGScheduler: Submitting 1 missing tasks from
>> Stage 3 (ShuffledRDD[7] at reduceByKey at WordCount.java:46)
>> 15/03/27 13:50:49 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
>> 15/03/27 13:50:50 INFO JobScheduler: Added jobs for time 1427478650000 ms
>> 15/03/27 13:50:51 INFO JobScheduler: Added jobs for time 1427478651000 ms
>> 15/03/27 13:50:52 INFO JobScheduler: Added jobs for time 1427478652000 ms
>> 15/03/27 13:50:53 IN
>>
>>
>>
>> On Thu, Mar 26, 2015 at 6:50 PM, Saisai Shao <sa...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Did you run the word count example in Spark local mode or in another mode?
>>> In local mode you have to set local[n], where n >= 2. For other modes, make
>>> sure the number of available cores is larger than 1, because the receiver
>>> inside Spark Streaming is wrapped as a long-running task, which occupies
>>> at least one core.
>>>
>>> Besides that, use lsof -p <pid> or netstat to make sure the Spark executor
>>> backend is connected to the nc process. Also grep the executor's log for
>>> lines like "Connecting to <host> <port>" and "Connected to <host> <port>",
>>> which show that the receiver is correctly connected to the nc process.
>>>
>>> Thanks
>>> Jerry
>>>
>>> 2015-03-27 8:45 GMT+08:00 Mohit Anchlia <mo...@gmail.com>:
>>>
>>>> What's the best way to troubleshoot inside spark to see why Spark is
>>>> not connecting to nc on port 9999? I don't see any errors either.
>>>>
>>>> On Thu, Mar 26, 2015 at 2:38 PM, Mohit Anchlia <mo...@gmail.com>
>>>> wrote:
>>>>
>>>>> I am trying to run the word count example but for some reason it's not
>>>>> working as expected. I start an "nc" server on port 9999 and then submit
>>>>> the Spark job to the cluster. The Spark job gets submitted successfully,
>>>>> but I never see any connection from Spark getting established. I also
>>>>> tried to type words on the console where "nc" is listening and waiting on
>>>>> the prompt; however, I don't see any output. I also don't see any errors.
>>>>>
>>>>> Here is the conf:
>>>>>
>>>>> SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName(
>>>>> "NetworkWordCount");
>>>>>
>>>>> JavaStreamingContext jssc = new JavaStreamingContext(conf,
>>>>> Durations.seconds(1));
>>>>>
>>>>> JavaReceiverInputDStream<String> lines = jssc.socketTextStream(
>>>>> "localhost", 9999);
>>>>>
>>>>
>>>>
>>>
>>
>

Re: WordCount example

Posted by Mohit Anchlia <mo...@gmail.com>.
If I use local[2] instead of spark://ip-10-241-251-232:7077, this seems to
work. I don't understand why, though, because when I give
spark://ip-10-241-251-232:7077 the application seems to bootstrap
successfully; it just doesn't create a socket on port 9999.
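
A minimal sketch of the core-count rule from the quoted reply below (a
receiver occupies one core, so local mode needs local[n] with n >= 2).
LocalMasterCheck, localThreads and canProcess are hypothetical names, not
part of Spark, and the parsing only covers the plain local, local[N] and
local[*] forms; treat it as an illustration under those assumptions. For a
spark:// master the sketch cannot tell how many cores exist; the Cores
column of the standalone UI is the place to look.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Spark API): sanity-checks a local master URL. */
final class LocalMasterCheck {
    // Matches local[N] and local[*]; other master URL forms are not handled here.
    private static final Pattern LOCAL_N = Pattern.compile("local\\[(\\d+|\\*)\\]");

    /** Returns the thread count implied by a local master URL, or -1 if it is not one. */
    static int localThreads(String masterUrl) {
        if (masterUrl.equals("local")) {
            return 1;
        }
        Matcher m = LOCAL_N.matcher(masterUrl);
        if (!m.matches()) {
            return -1;
        }
        String n = m.group(1);
        return n.equals("*")
                ? Runtime.getRuntime().availableProcessors()
                : Integer.parseInt(n);
    }

    /** True when at least one core is left for processing after each receiver takes one. */
    static boolean canProcess(int availableCores, int receivers) {
        return availableCores > receivers;
    }

    public static void main(String[] args) {
        String master = args.length > 0 ? args[0] : "local[2]";
        int threads = localThreads(master);
        if (threads < 0) {
            System.out.println(master + " is not a plain local master; check the cluster UI for cores");
        } else {
            System.out.println(master + " -> " + threads
                    + " thread(s), enough for 1 receiver: " + canProcess(threads, 1));
        }
    }
}
```

With one receiver, canProcess(1, 1) and canProcess(0, 1) are both false,
which is the situation where batches get queued but nothing is processed.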


On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia <mo...@gmail.com>
wrote:

> I checked the ports using netstat and don't see any connections
> established on that port. Logs show only this:
>
> 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount
> 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID
> app-20150327135048-0002
>
> Spark ui shows:
>
> Running Applications
> ID: app-20150327135048-0002
> <http://54.69.225.94:8080/app?appId=app-20150327135048-0002>
> Name: NetworkWordCount
> <http://ip-10-241-251-232.us-west-2.compute.internal:4040/>
> Cores: 0
> Memory per Node: 512.0 MB
> Submitted Time: 2015/03/27 13:50:48
> User: ec2-user
> State: WAITING
> Duration: 33 s
>
> The code looks like it is being executed:
>
> java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
>
> public static void doWork(String masterUrl) {
>
>     SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName(
>         "NetworkWordCount");
>
>     JavaStreamingContext jssc = new JavaStreamingContext(conf,
>         Durations.seconds(1));
>
>     JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost",
>         9999);
>
>     System.out.println("Successfully created connection");
>
>     mapAndReduce(lines);
>
>     jssc.start(); // Start the computation
>
>     jssc.awaitTermination(); // Wait for the computation to terminate
>
> }
>
> public static void main(String... args) {
>
>     doWork(args[0]);
>
> }
> And here is the output of the Java program after submitting the task:
>
> java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
> Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
> 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user
> 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user
> 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(ec2-user);
> users with modify permissions: Set(ec2-user)
> 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started
> 15/03/27 13:50:46 INFO Remoting: Starting remoting
> 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@ip-10-241-251-232.us-west-2.compute.internal
> :60184]
> 15/03/27 13:50:47 INFO Utils: Successfully started service 'sparkDriver'
> on port 60184.
> 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker
> 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster
> 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at
> /tmp/spark-local-20150327135047-5399
> 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5
> GB
> 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b
> 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server
> 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file
> server' on port 57955.
> 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
> 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at
> http://ip-10-241-251-232.us-west-2.compute.internal:4040
> 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master
> spark://ip-10-241-251-232:7077...
> 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark
> cluster with app ID app-20150327135048-0002
> 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358
> 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register BlockManager
> 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block manager
> ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM,
> BlockManagerId(<driver>, ip-10-241-251-232.us-west-2.compute.internal,
> 58358)
> 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager
> 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is
> ready for scheduling beginning after reached minRegisteredResourcesRatio:
> 0.0
> 15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started
> 15/03/27 13:50:48 INFO ForEachDStream: metadataCleanupDelay = -1
> 15/03/27 13:50:48 INFO ShuffledDStream: metadataCleanupDelay = -1
> 15/03/27 13:50:48 INFO MappedDStream: metadataCleanupDelay = -1
> 15/03/27 13:50:48 INFO FlatMappedDStream: metadataCleanupDelay = -1
> 15/03/27 13:50:48 INFO SocketInputDStream: metadataCleanupDelay = -1
> 15/03/27 13:50:48 INFO SocketInputDStream: Slide time = 1000 ms
> 15/03/27 13:50:48 INFO SocketInputDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 15/03/27 13:50:48 INFO SocketInputDStream: Checkpoint interval = null
> 15/03/27 13:50:48 INFO SocketInputDStream: Remember duration = 1000 ms
> 15/03/27 13:50:48 INFO SocketInputDStream: Initialized and validated
> org.apache.spark.streaming.dstream.SocketInputDStream@75efa13d
> 15/03/27 13:50:48 INFO FlatMappedDStream: Slide time = 1000 ms
> 15/03/27 13:50:48 INFO FlatMappedDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 15/03/27 13:50:48 INFO FlatMappedDStream: Checkpoint interval = null
> 15/03/27 13:50:48 INFO FlatMappedDStream: Remember duration = 1000 ms
> 15/03/27 13:50:48 INFO FlatMappedDStream: Initialized and validated
> org.apache.spark.streaming.dstream.FlatMappedDStream@65ce9dc5
> 15/03/27 13:50:48 INFO MappedDStream: Slide time = 1000 ms
> 15/03/27 13:50:48 INFO MappedDStream: Storage level = StorageLevel(false,
> false, false, false, 1)
> 15/03/27 13:50:48 INFO MappedDStream: Checkpoint interval = null
> 15/03/27 13:50:48 INFO MappedDStream: Remember duration = 1000 ms
> 15/03/27 13:50:48 INFO MappedDStream: Initialized and validated
> org.apache.spark.streaming.dstream.MappedDStream@5ae2740f
> 15/03/27 13:50:48 INFO ShuffledDStream: Slide time = 1000 ms
> 15/03/27 13:50:48 INFO ShuffledDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 15/03/27 13:50:48 INFO ShuffledDStream: Checkpoint interval = null
> 15/03/27 13:50:48 INFO ShuffledDStream: Remember duration = 1000 ms
> 15/03/27 13:50:48 INFO ShuffledDStream: Initialized and validated
> org.apache.spark.streaming.dstream.ShuffledDStream@4931b366
> 15/03/27 13:50:48 INFO ForEachDStream: Slide time = 1000 ms
> 15/03/27 13:50:48 INFO ForEachDStream: Storage level = StorageLevel(false,
> false, false, false, 1)
> 15/03/27 13:50:48 INFO ForEachDStream: Checkpoint interval = null
> 15/03/27 13:50:48 INFO ForEachDStream: Remember duration = 1000 ms
> 15/03/27 13:50:48 INFO ForEachDStream: Initialized and validated
> org.apache.spark.streaming.dstream.ForEachDStream@5df91314
> 15/03/27 13:50:48 INFO SparkContext: Starting job: start at
> WordCount.java:26
> 15/03/27 13:50:48 INFO RecurringTimer: Started timer for JobGenerator at
> time 1427478649000
> 15/03/27 13:50:48 INFO JobGenerator: Started JobGenerator at 1427478649000
> ms
> 15/03/27 13:50:48 INFO JobScheduler: Started JobScheduler
> 15/03/27 13:50:48 INFO DAGScheduler: Registering RDD 2 (start at
> WordCount.java:26)
> 15/03/27 13:50:48 INFO DAGScheduler: Got job 0 (start at
> WordCount.java:26) with 20 output partitions (allowLocal=false)
> 15/03/27 13:50:48 INFO DAGScheduler: Final stage: Stage 1(start at
> WordCount.java:26)
> 15/03/27 13:50:48 INFO DAGScheduler: Parents of final stage: List(Stage 0)
> 15/03/27 13:50:48 INFO DAGScheduler: Missing parents: List(Stage 0)
> 15/03/27 13:50:48 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2] at
> start at WordCount.java:26), which has no missing parents
> 15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(2720) called with
> curMem=0, maxMem=3771948072
> 15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0 stored as values in
> memory (estimated size 2.7 KB, free 3.5 GB)
> 15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(1943) called with
> curMem=2720, maxMem=3771948072
> 15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0_piece0 stored as
> bytes in memory (estimated size 1943.0 B, free 3.5 GB)
> 15/03/27 13:50:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in
> memory on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1943.0
> B, free: 3.5 GB)
> 15/03/27 13:50:48 INFO BlockManagerMaster: Updated info of block
> broadcast_0_piece0
> 15/03/27 13:50:48 INFO SparkContext: Created broadcast 0 from broadcast at
> DAGScheduler.scala:838
> 15/03/27 13:50:48 INFO DAGScheduler: Submitting 50 missing tasks from
> Stage 0 (MappedRDD[2] at start at WordCount.java:26)
> 15/03/27 13:50:48 INFO TaskSchedulerImpl: Adding task set 0.0 with 50 tasks
> 15/03/27 13:50:49 INFO JobScheduler: Added jobs for time 1427478649000 ms
> 15/03/27 13:50:49 INFO JobScheduler: Starting job streaming job
> 1427478649000 ms.0 from job set of time 1427478649000 ms
> 15/03/27 13:50:49 INFO SparkContext: Starting job: print at
> WordCount.java:53
> 15/03/27 13:50:49 INFO DAGScheduler: Registering RDD 6 (mapToPair at
> WordCount.java:39)
> 15/03/27 13:50:49 INFO DAGScheduler: Got job 1 (print at
> WordCount.java:53) with 1 output partitions (allowLocal=true)
> 15/03/27 13:50:49 INFO DAGScheduler: Final stage: Stage 3(print at
> WordCount.java:53)
> 15/03/27 13:50:49 INFO DAGScheduler: Parents of final stage: List(Stage 2)
> 15/03/27 13:50:49 INFO DAGScheduler: Missing parents: List()
> 15/03/27 13:50:49 INFO DAGScheduler: Submitting Stage 3 (ShuffledRDD[7] at
> reduceByKey at WordCount.java:46), which has no missing parents
> 15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(2264) called with
> curMem=4663, maxMem=3771948072
> 15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1 stored as values in
> memory (estimated size 2.2 KB, free 3.5 GB)
> 15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(1688) called with
> curMem=6927, maxMem=3771948072
> 15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1_piece0 stored as
> bytes in memory (estimated size 1688.0 B, free 3.5 GB)
> 15/03/27 13:50:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1688.0
> B, free: 3.5 GB)
> 15/03/27 13:50:49 INFO BlockManagerMaster: Updated info of block
> broadcast_1_piece0
> 15/03/27 13:50:49 INFO SparkContext: Created broadcast 1 from broadcast at
> DAGScheduler.scala:838
> 15/03/27 13:50:49 INFO DAGScheduler: Submitting 1 missing tasks from Stage
> 3 (ShuffledRDD[7] at reduceByKey at WordCount.java:46)
> 15/03/27 13:50:49 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
> 15/03/27 13:50:50 INFO JobScheduler: Added jobs for time 1427478650000 ms
> 15/03/27 13:50:51 INFO JobScheduler: Added jobs for time 1427478651000 ms
> 15/03/27 13:50:52 INFO JobScheduler: Added jobs for time 1427478652000 ms
> 15/03/27 13:50:53 IN
>
>
>
> On Thu, Mar 26, 2015 at 6:50 PM, Saisai Shao <sa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Did you run the word count example in Spark local mode or in another mode?
>> In local mode you have to set local[n], where n >= 2. For other modes, make
>> sure the number of available cores is larger than 1, because the receiver
>> inside Spark Streaming is wrapped as a long-running task, which occupies
>> at least one core.
>>
>> Besides that, use lsof -p <pid> or netstat to make sure the Spark executor
>> backend is connected to the nc process. Also grep the executor's log for
>> lines like "Connecting to <host> <port>" and "Connected to <host> <port>",
>> which show that the receiver is correctly connected to the nc process.
>>
>> Thanks
>> Jerry
>>
>> 2015-03-27 8:45 GMT+08:00 Mohit Anchlia <mo...@gmail.com>:
>>
>>> What's the best way to troubleshoot inside spark to see why Spark is not
>>> connecting to nc on port 9999? I don't see any errors either.
>>>
>>> On Thu, Mar 26, 2015 at 2:38 PM, Mohit Anchlia <mo...@gmail.com>
>>> wrote:
>>>
>>>> I am trying to run the word count example but for some reason it's not
>>>> working as expected. I start an "nc" server on port 9999 and then submit
>>>> the Spark job to the cluster. The Spark job gets submitted successfully,
>>>> but I never see any connection from Spark getting established. I also
>>>> tried to type words on the console where "nc" is listening and waiting on
>>>> the prompt; however, I don't see any output. I also don't see any errors.
>>>>
>>>> Here is the conf:
>>>>
>>>> SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName(
>>>> "NetworkWordCount");
>>>>
>>>> JavaStreamingContext jssc = new JavaStreamingContext(conf,
>>>> Durations.seconds(1));
>>>>
>>>> JavaReceiverInputDStream<String> lines = jssc.socketTextStream(
>>>> "localhost", 9999);
>>>>
>>>
>>>
>>
>
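
The netstat/lsof check mentioned in the quoted messages can be approximated
from Java. This is a minimal sketch, assuming nc listens on localhost:9999
as in the thread; PortProbe and canConnect are hypothetical names. Because
the receiver runs inside an executor, "localhost" is resolved on the worker
that hosts the receiver, so the probe is most useful when run on that
machine. It only shows whether something accepts TCP connections on the
port, not whether Spark itself has connected.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP listener accepts connections. */
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts and unresolved host names.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;
        System.out.println(host + ":" + port + " reachable: "
                + canConnect(host, port, 2000));
    }
}
```

Depending on the nc variant, a listener started without a keep-listening
option may exit once the probe disconnects, so it may need to be restarted
before the Spark job is submitted again.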

Re: WordCount example

Posted by Mohit Anchlia <mo...@gmail.com>.
I tried to file a bug in the git repo; however, I don't see a link to "open
issues".

On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia <mo...@gmail.com>
wrote:

> I checked the ports using netstat and don't see any connections
> established on that port. Logs show only this:
>
> 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount
> 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID
> app-20150327135048-0002
>
> Spark ui shows:
>
> Running Applications
> ID: app-20150327135048-0002
> <http://54.69.225.94:8080/app?appId=app-20150327135048-0002>
> Name: NetworkWordCount
> <http://ip-10-241-251-232.us-west-2.compute.internal:4040/>
> Cores: 0
> Memory per Node: 512.0 MB
> Submitted Time: 2015/03/27 13:50:48
> User: ec2-user
> State: WAITING
> Duration: 33 s
>
> The code looks like it is being executed:
>
> java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
>
> public static void doWork(String masterUrl) {
>
>     SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName(
>         "NetworkWordCount");
>
>     JavaStreamingContext jssc = new JavaStreamingContext(conf,
>         Durations.seconds(1));
>
>     JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost",
>         9999);
>
>     System.out.println("Successfully created connection");
>
>     mapAndReduce(lines);
>
>     jssc.start(); // Start the computation
>
>     jssc.awaitTermination(); // Wait for the computation to terminate
>
> }
>
> public static void main(String... args) {
>
>     doWork(args[0]);
>
> }
> And here is the output of the Java program after submitting the task:
>
> java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
> Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
> 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user
> 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user
> 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(ec2-user);
> users with modify permissions: Set(ec2-user)
> 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started
> 15/03/27 13:50:46 INFO Remoting: Starting remoting
> 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@ip-10-241-251-232.us-west-2.compute.internal
> :60184]
> 15/03/27 13:50:47 INFO Utils: Successfully started service 'sparkDriver'
> on port 60184.
> 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker
> 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster
> 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at
> /tmp/spark-local-20150327135047-5399
> 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5
> GB
> 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b
> 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server
> 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file
> server' on port 57955.
> 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
> 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at
> http://ip-10-241-251-232.us-west-2.compute.internal:4040
> 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master
> spark://ip-10-241-251-232:7077...
> 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark
> cluster with app ID app-20150327135048-0002
> 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358
> 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register BlockManager
> 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block manager
> ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM,
> BlockManagerId(<driver>, ip-10-241-251-232.us-west-2.compute.internal,
> 58358)
> 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager
> 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is
> ready for scheduling beginning after reached minRegisteredResourcesRatio:
> 0.0
> 15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started
> 15/03/27 13:50:48 INFO ForEachDStream: metadataCleanupDelay = -1
> 15/03/27 13:50:48 INFO ShuffledDStream: metadataCleanupDelay = -1
> 15/03/27 13:50:48 INFO MappedDStream: metadataCleanupDelay = -1
> 15/03/27 13:50:48 INFO FlatMappedDStream: metadataCleanupDelay = -1
> 15/03/27 13:50:48 INFO SocketInputDStream: metadataCleanupDelay = -1
> 15/03/27 13:50:48 INFO SocketInputDStream: Slide time = 1000 ms
> 15/03/27 13:50:48 INFO SocketInputDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 15/03/27 13:50:48 INFO SocketInputDStream: Checkpoint interval = null
> 15/03/27 13:50:48 INFO SocketInputDStream: Remember duration = 1000 ms
> 15/03/27 13:50:48 INFO SocketInputDStream: Initialized and validated
> org.apache.spark.streaming.dstream.SocketInputDStream@75efa13d
> 15/03/27 13:50:48 INFO FlatMappedDStream: Slide time = 1000 ms
> 15/03/27 13:50:48 INFO FlatMappedDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 15/03/27 13:50:48 INFO FlatMappedDStream: Checkpoint interval = null
> 15/03/27 13:50:48 INFO FlatMappedDStream: Remember duration = 1000 ms
> 15/03/27 13:50:48 INFO FlatMappedDStream: Initialized and validated
> org.apache.spark.streaming.dstream.FlatMappedDStream@65ce9dc5
> 15/03/27 13:50:48 INFO MappedDStream: Slide time = 1000 ms
> 15/03/27 13:50:48 INFO MappedDStream: Storage level = StorageLevel(false,
> false, false, false, 1)
> 15/03/27 13:50:48 INFO MappedDStream: Checkpoint interval = null
> 15/03/27 13:50:48 INFO MappedDStream: Remember duration = 1000 ms
> 15/03/27 13:50:48 INFO MappedDStream: Initialized and validated
> org.apache.spark.streaming.dstream.MappedDStream@5ae2740f
> 15/03/27 13:50:48 INFO ShuffledDStream: Slide time = 1000 ms
> 15/03/27 13:50:48 INFO ShuffledDStream: Storage level =
> StorageLevel(false, false, false, false, 1)
> 15/03/27 13:50:48 INFO ShuffledDStream: Checkpoint interval = null
> 15/03/27 13:50:48 INFO ShuffledDStream: Remember duration = 1000 ms
> 15/03/27 13:50:48 INFO ShuffledDStream: Initialized and validated
> org.apache.spark.streaming.dstream.ShuffledDStream@4931b366
> 15/03/27 13:50:48 INFO ForEachDStream: Slide time = 1000 ms
> 15/03/27 13:50:48 INFO ForEachDStream: Storage level = StorageLevel(false,
> false, false, false, 1)
> 15/03/27 13:50:48 INFO ForEachDStream: Checkpoint interval = null
> 15/03/27 13:50:48 INFO ForEachDStream: Remember duration = 1000 ms
> 15/03/27 13:50:48 INFO ForEachDStream: Initialized and validated
> org.apache.spark.streaming.dstream.ForEachDStream@5df91314
> 15/03/27 13:50:48 INFO SparkContext: Starting job: start at
> WordCount.java:26
> 15/03/27 13:50:48 INFO RecurringTimer: Started timer for JobGenerator at
> time 1427478649000
> 15/03/27 13:50:48 INFO JobGenerator: Started JobGenerator at 1427478649000
> ms
> 15/03/27 13:50:48 INFO JobScheduler: Started JobScheduler
> 15/03/27 13:50:48 INFO DAGScheduler: Registering RDD 2 (start at
> WordCount.java:26)
> 15/03/27 13:50:48 INFO DAGScheduler: Got job 0 (start at
> WordCount.java:26) with 20 output partitions (allowLocal=false)
> 15/03/27 13:50:48 INFO DAGScheduler: Final stage: Stage 1(start at
> WordCount.java:26)
> 15/03/27 13:50:48 INFO DAGScheduler: Parents of final stage: List(Stage 0)
> 15/03/27 13:50:48 INFO DAGScheduler: Missing parents: List(Stage 0)
> 15/03/27 13:50:48 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2] at
> start at WordCount.java:26), which has no missing parents
> 15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(2720) called with
> curMem=0, maxMem=3771948072
> 15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0 stored as values in
> memory (estimated size 2.7 KB, free 3.5 GB)
> 15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(1943) called with
> curMem=2720, maxMem=3771948072
> 15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0_piece0 stored as
> bytes in memory (estimated size 1943.0 B, free 3.5 GB)
> 15/03/27 13:50:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in
> memory on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1943.0
> B, free: 3.5 GB)
> 15/03/27 13:50:48 INFO BlockManagerMaster: Updated info of block
> broadcast_0_piece0
> 15/03/27 13:50:48 INFO SparkContext: Created broadcast 0 from broadcast at
> DAGScheduler.scala:838
> 15/03/27 13:50:48 INFO DAGScheduler: Submitting 50 missing tasks from
> Stage 0 (MappedRDD[2] at start at WordCount.java:26)
> 15/03/27 13:50:48 INFO TaskSchedulerImpl: Adding task set 0.0 with 50 tasks
> 15/03/27 13:50:49 INFO JobScheduler: Added jobs for time 1427478649000 ms
> 15/03/27 13:50:49 INFO JobScheduler: Starting job streaming job
> 1427478649000 ms.0 from job set of time 1427478649000 ms
> 15/03/27 13:50:49 INFO SparkContext: Starting job: print at
> WordCount.java:53
> 15/03/27 13:50:49 INFO DAGScheduler: Registering RDD 6 (mapToPair at
> WordCount.java:39)
> 15/03/27 13:50:49 INFO DAGScheduler: Got job 1 (print at
> WordCount.java:53) with 1 output partitions (allowLocal=true)
> 15/03/27 13:50:49 INFO DAGScheduler: Final stage: Stage 3(print at
> WordCount.java:53)
> 15/03/27 13:50:49 INFO DAGScheduler: Parents of final stage: List(Stage 2)
> 15/03/27 13:50:49 INFO DAGScheduler: Missing parents: List()
> 15/03/27 13:50:49 INFO DAGScheduler: Submitting Stage 3 (ShuffledRDD[7] at
> reduceByKey at WordCount.java:46), which has no missing parents
> 15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(2264) called with
> curMem=4663, maxMem=3771948072
> 15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1 stored as values in
> memory (estimated size 2.2 KB, free 3.5 GB)
> 15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(1688) called with
> curMem=6927, maxMem=3771948072
> 15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1_piece0 stored as
> bytes in memory (estimated size 1688.0 B, free 3.5 GB)
> 15/03/27 13:50:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1688.0
> B, free: 3.5 GB)
> 15/03/27 13:50:49 INFO BlockManagerMaster: Updated info of block
> broadcast_1_piece0
> 15/03/27 13:50:49 INFO SparkContext: Created broadcast 1 from broadcast at
> DAGScheduler.scala:838
> 15/03/27 13:50:49 INFO DAGScheduler: Submitting 1 missing tasks from Stage
> 3 (ShuffledRDD[7] at reduceByKey at WordCount.java:46)
> 15/03/27 13:50:49 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
> 15/03/27 13:50:50 INFO JobScheduler: Added jobs for time 1427478650000 ms
> 15/03/27 13:50:51 INFO JobScheduler: Added jobs for time 1427478651000 ms
> 15/03/27 13:50:52 INFO JobScheduler: Added jobs for time 1427478652000 ms
> 15/03/27 13:50:53 IN

Re: WordCount example

Posted by Mohit Anchlia <mo...@gmail.com>.
I checked the ports using netstat and don't see any connections established
on that port. Logs show only this:

15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount
15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID
app-20150327135048-0002

The Spark UI shows:

Running Applications

ID:               app-20150327135048-0002
                  <http://54.69.225.94:8080/app?appId=app-20150327135048-0002>
Name:             NetworkWordCount
                  <http://ip-10-241-251-232.us-west-2.compute.internal:4040/>
Cores:            0
Memory per Node:  512.0 MB
Submitted Time:   2015/03/27 13:50:48
User:             ec2-user
State:            WAITING
Duration:         33 s

The code looks like it is being executed:

java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077

public static void doWork(String masterUrl) {
    SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

    // This only defines the input stream; the receiver connects to the
    // socket after jssc.start(), once it has been scheduled on an executor.
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
    System.out.println("Successfully created connection");

    mapAndReduce(lines);

    jssc.start(); // Start the computation
    jssc.awaitTermination(); // Wait for the computation to terminate
}

public static void main(String... args) {
    doWork(args[0]);
}

And here is the output of the Java program after submitting the job:

java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user
15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user
15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(ec2-user);
users with modify permissions: Set(ec2-user)
15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started
15/03/27 13:50:46 INFO Remoting: Starting remoting
15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@ip-10-241-251-232.us-west-2.compute.internal:60184]
15/03/27 13:50:47 INFO Utils: Successfully started service 'sparkDriver' on
port 60184.
15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker
15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster
15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at
/tmp/spark-local-20150327135047-5399
15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5 GB
15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b
15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server
15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file
server' on port 57955.
15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
15/03/27 13:50:47 INFO SparkUI: Started SparkUI at
http://ip-10-241-251-232.us-west-2.compute.internal:4040
15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master
spark://ip-10-241-251-232:7077...
15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark
cluster with app ID app-20150327135048-0002
15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358
15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register BlockManager
15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block manager
ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM,
BlockManagerId(<driver>, ip-10-241-251-232.us-west-2.compute.internal,
58358)
15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager
15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is
ready for scheduling beginning after reached minRegisteredResourcesRatio:
0.0
15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started
15/03/27 13:50:48 INFO ForEachDStream: metadataCleanupDelay = -1
15/03/27 13:50:48 INFO ShuffledDStream: metadataCleanupDelay = -1
15/03/27 13:50:48 INFO MappedDStream: metadataCleanupDelay = -1
15/03/27 13:50:48 INFO FlatMappedDStream: metadataCleanupDelay = -1
15/03/27 13:50:48 INFO SocketInputDStream: metadataCleanupDelay = -1
15/03/27 13:50:48 INFO SocketInputDStream: Slide time = 1000 ms
15/03/27 13:50:48 INFO SocketInputDStream: Storage level =
StorageLevel(false, false, false, false, 1)
15/03/27 13:50:48 INFO SocketInputDStream: Checkpoint interval = null
15/03/27 13:50:48 INFO SocketInputDStream: Remember duration = 1000 ms
15/03/27 13:50:48 INFO SocketInputDStream: Initialized and validated
org.apache.spark.streaming.dstream.SocketInputDStream@75efa13d
15/03/27 13:50:48 INFO FlatMappedDStream: Slide time = 1000 ms
15/03/27 13:50:48 INFO FlatMappedDStream: Storage level =
StorageLevel(false, false, false, false, 1)
15/03/27 13:50:48 INFO FlatMappedDStream: Checkpoint interval = null
15/03/27 13:50:48 INFO FlatMappedDStream: Remember duration = 1000 ms
15/03/27 13:50:48 INFO FlatMappedDStream: Initialized and validated
org.apache.spark.streaming.dstream.FlatMappedDStream@65ce9dc5
15/03/27 13:50:48 INFO MappedDStream: Slide time = 1000 ms
15/03/27 13:50:48 INFO MappedDStream: Storage level = StorageLevel(false,
false, false, false, 1)
15/03/27 13:50:48 INFO MappedDStream: Checkpoint interval = null
15/03/27 13:50:48 INFO MappedDStream: Remember duration = 1000 ms
15/03/27 13:50:48 INFO MappedDStream: Initialized and validated
org.apache.spark.streaming.dstream.MappedDStream@5ae2740f
15/03/27 13:50:48 INFO ShuffledDStream: Slide time = 1000 ms
15/03/27 13:50:48 INFO ShuffledDStream: Storage level = StorageLevel(false,
false, false, false, 1)
15/03/27 13:50:48 INFO ShuffledDStream: Checkpoint interval = null
15/03/27 13:50:48 INFO ShuffledDStream: Remember duration = 1000 ms
15/03/27 13:50:48 INFO ShuffledDStream: Initialized and validated
org.apache.spark.streaming.dstream.ShuffledDStream@4931b366
15/03/27 13:50:48 INFO ForEachDStream: Slide time = 1000 ms
15/03/27 13:50:48 INFO ForEachDStream: Storage level = StorageLevel(false,
false, false, false, 1)
15/03/27 13:50:48 INFO ForEachDStream: Checkpoint interval = null
15/03/27 13:50:48 INFO ForEachDStream: Remember duration = 1000 ms
15/03/27 13:50:48 INFO ForEachDStream: Initialized and validated
org.apache.spark.streaming.dstream.ForEachDStream@5df91314
15/03/27 13:50:48 INFO SparkContext: Starting job: start at
WordCount.java:26
15/03/27 13:50:48 INFO RecurringTimer: Started timer for JobGenerator at
time 1427478649000
15/03/27 13:50:48 INFO JobGenerator: Started JobGenerator at 1427478649000
ms
15/03/27 13:50:48 INFO JobScheduler: Started JobScheduler
15/03/27 13:50:48 INFO DAGScheduler: Registering RDD 2 (start at
WordCount.java:26)
15/03/27 13:50:48 INFO DAGScheduler: Got job 0 (start at WordCount.java:26)
with 20 output partitions (allowLocal=false)
15/03/27 13:50:48 INFO DAGScheduler: Final stage: Stage 1(start at
WordCount.java:26)
15/03/27 13:50:48 INFO DAGScheduler: Parents of final stage: List(Stage 0)
15/03/27 13:50:48 INFO DAGScheduler: Missing parents: List(Stage 0)
15/03/27 13:50:48 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2] at
start at WordCount.java:26), which has no missing parents
15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(2720) called with
curMem=0, maxMem=3771948072
15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 2.7 KB, free 3.5 GB)
15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(1943) called with
curMem=2720, maxMem=3771948072
15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0_piece0 stored as
bytes in memory (estimated size 1943.0 B, free 3.5 GB)
15/03/27 13:50:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory
on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1943.0 B,
free: 3.5 GB)
15/03/27 13:50:48 INFO BlockManagerMaster: Updated info of block
broadcast_0_piece0
15/03/27 13:50:48 INFO SparkContext: Created broadcast 0 from broadcast at
DAGScheduler.scala:838
15/03/27 13:50:48 INFO DAGScheduler: Submitting 50 missing tasks from Stage
0 (MappedRDD[2] at start at WordCount.java:26)
15/03/27 13:50:48 INFO TaskSchedulerImpl: Adding task set 0.0 with 50 tasks
15/03/27 13:50:49 INFO JobScheduler: Added jobs for time 1427478649000 ms
15/03/27 13:50:49 INFO JobScheduler: Starting job streaming job
1427478649000 ms.0 from job set of time 1427478649000 ms
15/03/27 13:50:49 INFO SparkContext: Starting job: print at
WordCount.java:53
15/03/27 13:50:49 INFO DAGScheduler: Registering RDD 6 (mapToPair at
WordCount.java:39)
15/03/27 13:50:49 INFO DAGScheduler: Got job 1 (print at WordCount.java:53)
with 1 output partitions (allowLocal=true)
15/03/27 13:50:49 INFO DAGScheduler: Final stage: Stage 3(print at
WordCount.java:53)
15/03/27 13:50:49 INFO DAGScheduler: Parents of final stage: List(Stage 2)
15/03/27 13:50:49 INFO DAGScheduler: Missing parents: List()
15/03/27 13:50:49 INFO DAGScheduler: Submitting Stage 3 (ShuffledRDD[7] at
reduceByKey at WordCount.java:46), which has no missing parents
15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(2264) called with
curMem=4663, maxMem=3771948072
15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1 stored as values in
memory (estimated size 2.2 KB, free 3.5 GB)
15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(1688) called with
curMem=6927, maxMem=3771948072
15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1_piece0 stored as
bytes in memory (estimated size 1688.0 B, free 3.5 GB)
15/03/27 13:50:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1688.0 B,
free: 3.5 GB)
15/03/27 13:50:49 INFO BlockManagerMaster: Updated info of block
broadcast_1_piece0
15/03/27 13:50:49 INFO SparkContext: Created broadcast 1 from broadcast at
DAGScheduler.scala:838
15/03/27 13:50:49 INFO DAGScheduler: Submitting 1 missing tasks from Stage
3 (ShuffledRDD[7] at reduceByKey at WordCount.java:46)
15/03/27 13:50:49 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
15/03/27 13:50:50 INFO JobScheduler: Added jobs for time 1427478650000 ms
15/03/27 13:50:51 INFO JobScheduler: Added jobs for time 1427478651000 ms
15/03/27 13:50:52 INFO JobScheduler: Added jobs for time 1427478652000 ms
15/03/27 13:50:53 IN



On Thu, Mar 26, 2015 at 6:50 PM, Saisai Shao <sa...@gmail.com> wrote:

> Hi,
>
> Did you run the word count example in Spark local mode or in another mode?
> In local mode you have to set the master to local[n], where n >= 2. For
> other modes, make sure the number of available cores is larger than 1,
> because the receiver inside Spark Streaming runs as a long-running task
> that occupies at least one core.
>
> Besides that, use lsof -p <pid> or netstat to check that the Spark executor
> backend is connected to the nc process. Also grep the executor's log for
> lines like "Connecting to <host> <port>" and "Connected to <host> <port>",
> which show that the receiver is correctly connected to the nc process.
>
> Thanks
> Jerry
>

Re: WordCount example

Posted by Saisai Shao <sa...@gmail.com>.
Hi,

Did you run the word count example in Spark local mode or in another mode?
In local mode you have to set the master to local[n], where n >= 2. For
other modes, make sure the number of available cores is larger than 1,
because the receiver inside Spark Streaming runs as a long-running task
that occupies at least one core.
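
As a rough illustration of the local-mode rule above, here is a small sketch
in plain Java that inspects a master URL before it is passed to setMaster.
The class MasterUrlCheck and its methods are hypothetical helpers made up for
this example; they are not part of Spark. It only understands "local",
"local[n]" and "local[*]"; for anything else (such as a spark:// URL) the
number of cores depends on the cluster and has to be checked in the master UI.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MasterUrlCheck {
    // Matches local[n] or local[*]. Hypothetical helper, not a Spark API.
    private static final Pattern LOCAL_N = Pattern.compile("local\\[(\\d+|\\*)\\]");

    /** Threads requested by a local master URL, or -1 if the URL is not a local one. */
    public static int localThreads(String masterUrl) {
        if (masterUrl.equals("local")) {
            return 1; // plain "local" runs with a single thread
        }
        Matcher m = LOCAL_N.matcher(masterUrl);
        if (!m.matches()) {
            return -1; // e.g. spark://host:7077, decided by the cluster instead
        }
        String n = m.group(1);
        return n.equals("*") ? Runtime.getRuntime().availableProcessors() : Integer.parseInt(n);
    }

    /** One thread is taken by the receiver, so at least two are needed in local mode. */
    public static boolean enoughForOneReceiver(String masterUrl) {
        int threads = localThreads(masterUrl);
        // -1 means "cannot tell from the URL"; this sketch does not reject it.
        return threads == -1 || threads >= 2;
    }

    public static void main(String[] args) {
        String[] urls = {"local", "local[1]", "local[2]", "spark://ip-10-241-251-232:7077"};
        for (String url : urls) {
            System.out.println(url + ": threads=" + localThreads(url)
                    + ", enough=" + enoughForOneReceiver(url));
        }
    }
}
```

A check like this only covers local mode. With a standalone master the URL
says nothing about cores, so the "Cores" column in the UI on port 8080 is the
place to look.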

Besides that, use lsof -p <pid> or netstat to check that the Spark executor
backend is connected to the nc process. Also grep the executor's log for
lines like "Connecting to <host> <port>" and "Connected to <host> <port>",
which show that the receiver is correctly connected to the nc process.
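
If grep is not handy, the same log check can be sketched in a few lines of
Java. This is only an illustration: ReceiverLogCheck is a made-up class, and
it assumes the log contains the substrings "Connecting to" and "Connected to"
as described above. The two sample lines in main are copied from the driver
output posted in this thread, which contains neither message.

```java
import java.util.Arrays;
import java.util.List;

public class ReceiverLogCheck {
    public enum State { NOT_STARTED, CONNECTING, CONNECTED }

    /** Scans log lines for the receiver's connection messages. */
    public static State receiverState(List<String> logLines) {
        State state = State.NOT_STARTED;
        for (String line : logLines) {
            if (line.contains("Connected to")) {
                return State.CONNECTED; // the receiver reached the socket
            }
            if (line.contains("Connecting to")) {
                state = State.CONNECTING; // attempt seen, success not yet
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> driverLog = Arrays.asList(
                "15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started",
                "15/03/27 13:50:48 INFO TaskSchedulerImpl: Adding task set 0.0 with 50 tasks");
        System.out.println(receiverState(driverLog)); // prints NOT_STARTED
    }
}
```

NOT_STARTED here means the receiver task never logged a connection attempt in
the lines that were scanned, which is what you would expect if it was never
scheduled on an executor.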

Thanks
Jerry

2015-03-27 8:45 GMT+08:00 Mohit Anchlia <mo...@gmail.com>:

> What's the best way to troubleshoot inside spark to see why Spark is not
> connecting to nc on port 9999? I don't see any errors either.
>

Re: WordCount example

Posted by Mohit Anchlia <mo...@gmail.com>.
What's the best way to troubleshoot inside spark to see why Spark is not
connecting to nc on port 9999? I don't see any errors either.

On Thu, Mar 26, 2015 at 2:38 PM, Mohit Anchlia <mo...@gmail.com>
wrote:

> I am trying to run the word count example, but for some reason it's not
> working as expected. I start an "nc" server on port 9999 and then submit the
> Spark job to the cluster. The job is submitted successfully, but I never see
> any connection from Spark being established. I also tried typing words on
> the console where "nc" is listening, but I don't see any output. I also
> don't see any errors.
>
> Here is the conf:
>
> SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount");
>
> JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
>
> JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
>