Posted to user@spark.apache.org by anoldbrain <an...@gmail.com> on 2014/02/17 08:31:18 UTC

How to use FlumeInputDStream in spark cluster?

Hello all.

I am trying to get events from Flume and process them in a Spark cluster
using Spark Streaming, following the Spark Streaming Programming Guide
<https://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html>.

If I understand correctly, the Flume sink is configured to send to a fixed
address:port, where the FlumeReceiver must be listening. If the receiver moves
to another worker, that address changes and the Flume flow is broken. For
example, if I set up a Flume agent to send to 192.168.1.11:4141, then the
FlumeReceiver has to stay on 192.168.1.11 for this to work.
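
For reference, this is roughly the wiring I have in mind: a minimal sketch in
which the host, port, batch interval, and application name are placeholders,
and FlumeUtils.createStream is the push-based helper from the Flume
integration. The Flume Avro sink is pointed at the same fixed host:port that
the receiver below must bind to.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePushSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePushSketch")
    val ssc = new StreamingContext(conf, Seconds(2))

    // The receiver binds to this host:port; the Flume Avro sink must be
    // configured to send to the very same address. If the receiver task is
    // scheduled on a different worker, the bind (or the Flume flow) breaks.
    val stream = FlumeUtils.createStream(ssc, "192.168.1.11", 4141)

    stream.count().map(cnt => s"Received $cnt flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}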

I have tried the FlumeEventCount example. It works in local mode, but fails
to run in cluster mode.

How can I make FlumeInputDStream work in cluster mode?

Thank you.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use FlumeInputDStream in spark cluster?

Posted by Ravi Hemnani <ra...@gmail.com>.
On 03/21/2014 06:17 PM, anoldbrain [via Apache Spark User List] wrote:
> ...the actual <address>, which in turn causes the 'Fail to bind to ...' 
> error. This comes naturally because the slave that is running the code 
> to bind to <address>:<port> has a different ip. 
So if we run the code on the slave that the Flume agent is sending the data 
to, it should work. Let me give this a shot and check what happens.

Thank you for the immediate reply. I'll keep you posted.




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p2992.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use FlumeInputDStream in spark cluster?

Posted by Ravi Hemnani <ra...@gmail.com>.
I'll start with the Kafka implementation.

Thanks for all the help.
On Mar 21, 2014 7:00 PM, "anoldbrain [via Apache Spark User List]" <
ml-node+s1001560n2994h47@n3.nabble.com> wrote:

> It is my understanding that there is no way to make FlumeInputDStream work
> in a cluster environment with the current release. Switching to Kafka, if you
> can, would be my suggestion, although I have not used KafkaInputDStream.
> There is a big difference between the Kafka and Flume input DStreams:
> KafkaInputDStreams are consumers (clients), whereas a FlumeInputDStream needs
> to listen on a specific address:port so that other Flume agents can send
> messages to it. This may also give Kafka an advantage in performance.
>




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p2997.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use FlumeInputDStream in spark cluster?

Posted by anoldbrain <an...@gmail.com>.
It is my understanding that there is no way to make FlumeInputDStream work in
a cluster environment with the current release. Switching to Kafka, if you
can, would be my suggestion, although I have not used KafkaInputDStream. There
is a big difference between the Kafka and Flume input DStreams:
KafkaInputDStreams are consumers (clients), whereas a FlumeInputDStream needs
to listen on a specific address:port so that other Flume agents can send
messages to it. This may also give Kafka an advantage in performance.
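
Roughly, in code: a minimal sketch only, since I have not used the Kafka API
myself. Treat the Kafka createStream parameters (ZooKeeper quorum, group id,
topic map) and the hostnames as assumptions; the point is only the direction
of the connection.

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.kafka.KafkaUtils

object StreamComparisonSketch {
  // Sketch only: assumes an existing StreamingContext `ssc` and placeholder
  // hostnames, topic, and consumer group.
  def buildStreams(ssc: StreamingContext) = {
    // Kafka: the receiver is a consumer (client). It connects out to
    // ZooKeeper and the brokers, so it does not matter which worker it
    // lands on.
    val kafkaStream = KafkaUtils.createStream(
      ssc, "zk-host:2181", "my-consumer-group", Map("my-topic" -> 1))

    // Flume (push model): the receiver is a server. It must bind to this
    // exact host:port, so it has to run on the worker that owns "node-005".
    val flumeStream = FlumeUtils.createStream(ssc, "node-005", 4141)

    (kafkaStream, flumeStream)
  }
}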



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p2994.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use FlumeInputDStream in spark cluster?

Posted by Ping Tang <pt...@aerohive.com>.
Thank you very much for your reply.

I have a cluster of 8 nodes: m1, m2, m3 .. m8. m1 is configured as the Spark master node, and the rest of the nodes are all worker nodes. I also configured m3 as the History Server, but the History Server fails to start. I ran FlumeEventCount on m1 using the right hostname and a port that is not used by any application. Here is the script I used to run FlumeEventCount:


#!/bin/bash


spark-submit --verbose --class org.apache.spark.examples.streaming.FlumeEventCount --deploy-mode client --master yarn-client --jars lib/spark-streaming-flume_2.10-1.1.0-cdh5.2.2-20141112.193826-1.jar /opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/examples/lib/spark-examples-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar m1.ptang.aerohive.com 1234


The same issue is observed after adding "spark.ui.port=4321" to /etc/spark/conf/spark-defaults.conf. The following are the exceptions from the job run:


14/12/01 11:51:45 INFO JobScheduler: Finished job streaming job 1417463504000 ms.0 from job set of time 1417463504000 ms
14/12/01 11:51:45 INFO JobScheduler: Total delay: 1.465 s for time 1417463504000 ms (execution: 1.415 s)
14/12/01 11:51:45 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.jboss.netty.channel.ChannelException: Failed to bind to: m1.ptang.aerohive.com/192.168.10.22:1234
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
at org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)
at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.BindException: Cannot assign requested address
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:444)
at sun.nio.ch.Net.bind(Net.java:436)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
... 3 more

14/12/01 11:51:45 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 72, m8.ptang.aerohive.com): org.jboss.netty.channel.ChannelException: Failed to bind to: m1.ptang.aerohive.com/192.168.10.22:1234
        org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
        org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
        org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
        org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
        org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
        org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)
        org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)
        org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
        org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
        org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
        org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

Thanks

Ping

From: Prannoy <pr...@sigmoidanalytics.com>
Date: Friday, November 28, 2014 at 12:56 AM
To: "user@spark.incubator.apache.org" <us...@spark.incubator.apache.org>
Subject: Re: How to use FlumeInputDStream in spark cluster?

Hi,

A BindException comes when two processes are using the same port. In your Spark configuration just set ("spark.ui.port","xxxxx")
to some other port; xxxxx can be any number, say 12345. The BindException will not break your job in either case. Just change the port number to fix it.

Thanks.

On Fri, Nov 28, 2014 at 1:30 PM, pamtang [via Apache Spark User List] <[hidden email]> wrote:
I'm seeing the same issue on CDH 5.2 with Spark 1.1. FlumeEventCount works fine on a standalone cluster but throws a BindException in YARN mode. Is there a solution to this problem, or will FlumeInputDStream not work in a cluster environment?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p19999.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use FlumeInputDStream in spark cluster?

Posted by Prannoy <pr...@sigmoidanalytics.com>.
Hi,

A BindException comes when two processes are using the same port. In your
Spark configuration just set ("spark.ui.port","xxxxx")
to some other port; xxxxx can be any number, say 12345. The BindException will
not break your job in either case. Just change the port number to fix it.
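
As a rough sketch (the application name and port number here are just
placeholders), the same setting can be made programmatically on the SparkConf
before the context is created:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UiPortExample {
  def main(args: Array[String]): Unit = {
    // Move the Spark web UI to an (assumed) unused port so it does not
    // collide with another application already bound to the default UI port.
    val conf = new SparkConf()
      .setAppName("FlumeEventCount")
      .set("spark.ui.port", "12345")

    val ssc = new StreamingContext(conf, Seconds(2))

    // ... define the input DStreams and output operations here,
    // then call ssc.start() and ssc.awaitTermination() as usual.
  }
}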

Thanks.

On Fri, Nov 28, 2014 at 1:30 PM, pamtang [via Apache Spark User List] <
ml-node+s1001560n19997h9@n3.nabble.com> wrote:

> I'm seeing the same issue on CDH 5.2 with Spark 1.1. FlumeEventCount works
> fine on a standalone cluster but throws a BindException in YARN mode. Is
> there a solution to this problem, or will FlumeInputDStream not work in a
> cluster environment?
>




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p19999.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use FlumeInputDStream in spark cluster?

Posted by BigDataUser <su...@yahoo.com>.
I am running the FlumeEventCount program on CDH 5.0.1, which has Spark 0.9.0.
The program runs fine in local mode as well as in standalone cluster mode.
However, the program fails in YARN mode. I see the following error:
INFO scheduler.DAGScheduler: Stage 2 (runJob at
NetworkInputTracker.scala:182) finished in 0.215 s
INFO spark.SparkContext: Job finished: runJob at
NetworkInputTracker.scala:182, took 0.224696381 s
ERROR scheduler.NetworkInputTracker: De-registered receiver for network
stream 0 with message org.jboss.netty.channel.ChannelException: Failed to
bind to: xxxxx/xx.xxx.x.xx:41415
Is there a workaround for this?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p17226.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: How to use FlumeInputDStream in spark cluster?

Posted by julyfire <he...@gmail.com>.
I have tested the FlumeEventCount example code on a standalone cluster, and this
is still a problem in Spark 1.1.0, the latest version as of now. Have you
solved this issue on your side?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p15102.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: How to use FlumeInputDStream in spark cluster?

Posted by Ravi Hemnani <ra...@gmail.com>.
On 03/21/2014 06:17 PM, anoldbrain [via Apache Spark User List] wrote:
> ...the actual <address>, which in turn causes the 'Fail to bind to ...' 
> error. This comes naturally because the slave that is running the code 
> to bind to <address>:<port> has a different ip. 
I ran sudo ./run-example 
org.apache.spark.streaming.examples.FlumeEventCount 
spark://<spark_master_hostname>:7077 <worker_hostname> 7781 on 
<worker_hostname> and still it shows

14/03/21 13:12:12 ERROR scheduler.NetworkInputTracker: De-registered 
receiver for network stream 0 with message 
org.jboss.netty.channel.ChannelException: Failed to bind 
to:<worker_hostname> /<worker_ipaddress>:7781
14/03/21 13:12:12 INFO spark.SparkContext: Job finished: runJob at 
NetworkInputTracker.scala:182, took 0.530447982 s
14/03/21 13:12:14 INFO scheduler.NetworkInputTracker: Stream 0 received 
0 blocks

Weird issue. I need to set up Spark Streaming and make it run. I am 
thinking of switching to Kafka. I haven't checked it yet, but I don't see a 
workaround for this. Any help would be good. I am making changes in the 
flume.conf and checking different settings.

Thank you.





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p2993.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use FlumeInputDStream in spark cluster?

Posted by anoldbrain <an...@gmail.com>.
Hi,

This is my summary of the gap between expected behavior and actual behavior.

FlumeEventCount spark://<spark_master_hostname>:7077 <address> <port>

Expected: an 'agent' listening on (bound to) <address>:<port>. In the context
of Spark, this agent should be running on one of the slaves, specifically the
slave whose IP/hostname is <address>.

Observed: a random slave is chosen from the pool of available slaves.
Therefore, in a cluster environment, it is likely not the slave that has the
actual <address>, which in turn causes the 'Fail to bind to ...' error. This
follows naturally, because the slave that is running the code to bind to
<address>:<port> has a different IP.
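
To make the failure mode concrete, here is a minimal standalone sketch
(hostname and port are placeholders) of the bind the chosen slave ends up
attempting. Run on a machine whose IP is not <address>, it fails with
'java.net.BindException: Cannot assign requested address'.

import java.net.{InetSocketAddress, ServerSocket}

object BindCheck {
  def main(args: Array[String]): Unit = {
    // Placeholders: the address the Flume agent is configured to send to.
    val host = "node-005"
    val port = 4141

    val socket = new ServerSocket()
    try {
      // Binding to host:port only succeeds if `host` resolves to an IP owned
      // by this machine; on any other slave the bind throws an exception
      // such as "Cannot assign requested address".
      socket.bind(new InetSocketAddress(host, port))
      println(s"Successfully bound to $host:$port on this machine.")
    } catch {
      case e: java.io.IOException =>
        println(s"Could not bind to $host:$port here: ${e.getMessage}")
    } finally {
      socket.close()
    }
  }
}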



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p2990.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use FlumeInputDStream in spark cluster?

Posted by Ravi Hemnani <ra...@gmail.com>.
Hey,


Even I am getting the same error. 

I am running, 

sudo ./run-example org.apache.spark.streaming.examples.FlumeEventCount
spark://<spark_master_hostname>:7077 <spark_master_hostname> 7781

and I am getting no events in Spark Streaming. 

-------------------------------------------
Time: 1395395676000 ms
-------------------------------------------
Received 0 flume events.

14/03/21 09:54:36 INFO JobScheduler: Finished job streaming job
1395395676000 ms.0 from job set of time 1395395676000 ms
14/03/21 09:54:36 INFO JobScheduler: Total delay: 0.196 s for time
1395395676000 ms (execution: 0.111 s)
14/03/21 09:54:38 INFO NetworkInputTracker: Stream 0 received 0 blocks
14/03/21 09:54:38 INFO SparkContext: Starting job: take at DStream.scala:586
14/03/21 09:54:38 INFO JobScheduler: Starting job streaming job
1395395678000 ms.0 from job set of time 1395395678000 ms
14/03/21 09:54:38 INFO DAGScheduler: Registering RDD 73 (combineByKey at
ShuffledDStream.scala:42)
14/03/21 09:54:38 INFO DAGScheduler: Got job 16 (take at DStream.scala:586)
with 1 output partitions (allowLocal=true)
14/03/21 09:54:38 INFO DAGScheduler: Final stage: Stage 31 (take at
DStream.scala:586)
14/03/21 09:54:38 INFO DAGScheduler: Parents of final stage: List(Stage 32)
14/03/21 09:54:38 INFO JobScheduler: Added jobs for time 1395395678000 ms
14/03/21 09:54:38 INFO DAGScheduler: Missing parents: List(Stage 32)
14/03/21 09:54:38 INFO DAGScheduler: Submitting Stage 32
(MapPartitionsRDD[73] at combineByKey at ShuffledDStream.scala:42), which
has no missing parents
14/03/21 09:54:38 INFO DAGScheduler: Submitting 1 missing tasks from Stage
32 (MapPartitionsRDD[73] at combineByKey at ShuffledDStream.scala:42)
14/03/21 09:54:38 INFO TaskSchedulerImpl: Adding task set 32.0 with 1 tasks
14/03/21 09:54:38 INFO TaskSetManager: Starting task 32.0:0 as TID 92 on
executor 2: c8-data-store-4.srv.media.net (PROCESS_LOCAL)
14/03/21 09:54:38 INFO TaskSetManager: Serialized task 32.0:0 as 2971 bytes
in 1 ms
14/03/21 09:54:38 INFO TaskSetManager: Finished TID 92 in 41 ms on
c8-data-store-4.srv.media.net (progress: 0/1)
14/03/21 09:54:38 INFO TaskSchedulerImpl: Remove TaskSet 32.0 from pool 



Also, on closer look, I got 

INFO SparkContext: Job finished: runJob at NetworkInputTracker.scala:182,
took 0.523621327 s
14/03/21 09:54:35 ERROR NetworkInputTracker: De-registered receiver for
network stream 0 with message org.jboss.netty.channel.ChannelException:
Failed to bind to: c8-data-store-1.srv.media.net/172.16.200.124:7781


I couldn't understand the NetworkInputTracker part that you mentioned. Can you
elaborate on that? 

I only understood that the master picks any one of the worker nodes for
the connection and stays on it while the program runs. Why is it not using
the <host> and <port> I am providing? Also, does <host>:<port> necessarily
have to be a worker node? 




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p2987.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use FlumeInputDStream in spark cluster?

Posted by anoldbrain <an...@gmail.com>.
I tracked through more of the code and here are my findings.

1. TaskLocation(Some("node-005"), None) is not incorrect.
2. The problem is caused by a weird 'malfunction' of the .contains call on the
HashMaps used to store tasks in  TaskSetManager.scala
<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala>  
and  TaskSchedulerImpl.scala
<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala> 
(pendingTasksForExecutor, pendingTasksForHost, and executorByHost in my
particular case): it always returns false, even though the intended host is in
the HashMap. Below are some logs I added to explain the situation:

> 14/02/24 17:36:03 INFO spark-akka.actor.default-dispatcher-4
> TaskSetManager: addPendingTask: ArrayBuffer(TaskLocation(Some(node-004),
> None))
> 14/02/24 17:36:03 INFO spark-akka.actor.default-dispatcher-4
> TaskSetManager: addPendingTask: TaskLocation(Some(node-004), None)
> 14/02/24 17:36:03 DEBUG spark-akka.actor.default-dispatcher-4
> TaskSchedulerImpl: getExecutorsAliveOnHost: Map(node-003 -> Set(4),
> node-009 -> Set(6), node-002 -> Set(2), node-005 -> Set(0), node-008 ->
> Set(7), node-007 -> Set(1), node-004 -> Set(3), node-010 -> Set(5))
> 14/02/24 17:36:03 INFO spark-akka.actor.default-dispatcher-4
> TaskSetManager: getExecutorsAliveOnHost: Some(node-004) None
> 14/02/24 17:36:03 DEBUG spark-akka.actor.default-dispatcher-4
> TaskSchedulerImpl: hasExecutorsAliveOnHost: Some(node-004) false


> 14/02/24 17:36:06 DEBUG spark-akka.actor.default-dispatcher-18
> TaskSetManager: resourceOffer: 0 node-004 8 NODE_LOCAL
> 14/02/24 17:36:06 DEBUG spark-akka.actor.default-dispatcher-18
> TaskSetManager: findTask: 0 node-004 NODE_LOCAL
> 14/02/24 17:36:06 DEBUG spark-akka.actor.default-dispatcher-18
> TaskSetManager: TaskLocality NODE_LOCAL: true
> 14/02/24 17:36:06 DEBUG spark-akka.actor.default-dispatcher-18
> TaskSetManager: TaskLocality RACK_LOCAL: false
> 14/02/24 17:36:06 DEBUG spark-akka.actor.default-dispatcher-18
> TaskSetManager: TaskLocality ANY: false
> 14/02/24 17:36:06 DEBUG spark-akka.actor.default-dispatcher-18
> TaskSetManager: getPendingTasksForHost: node-005 Map(Some(node-004) ->
> ArrayBuffer(0)) List(Some(node-004))
> 14/02/24 17:36:06 DEBUG spark-akka.actor.default-dispatcher-18
> TaskSetManager: contains? false
> 14/02/24 17:36:06 DEBUG spark-akka.actor.default-dispatcher-18
> TaskSetManager: forhost: ArrayBuffer()
> 14/02/24 17:36:06 DEBUG spark-akka.actor.default-dispatcher-18
> TaskSetManager: getPendingTasksForHost: node-005 Map(Some(node-004) ->
> ArrayBuffer(0)) List(Some(node-004))
> 14/02/24 17:36:06 DEBUG spark-akka.actor.default-dispatcher-18
> TaskSetManager: contains? false
> 14/02/24 17:36:06 DEBUG spark-akka.actor.default-dispatcher-18
> TaskSetManager: noprefs: ArrayBuffer()
> 14/02/24 17:36:06 DEBUG spark-akka.actor.default-dispatcher-18
> TaskSetManager: speculative: None

Combined with the order of the locality checks and the
'randomness/predictability' of the order in which the executors are checked,
it is always the first worker checked that gets assigned the
NetworkInputDStream job, which is most likely not the intended target node.
From my observation, some days it is always node-002, some days it is always
node-007.

Can anyone help explain why the '.contains' calls do not work as I
expected them to? What is going on with the 'Some("node-005")'? The methods
involved are all statically typed to String (I also checked the types at
runtime). Why would the key lookup fail?

Please let me know if I didn't explain my testing clearly enough.
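
One self-contained illustration of how a lookup like this can fail even though
"the host is in the HashMap": if the key was ever built from the Option's
string form (the Map(Some(node-004) -> ...) dumps above look like that), then
contains with the bare hostname returns false. The values below are made up to
show the mismatch; I am not claiming this is exactly what the scheduler code
does.

import scala.collection.mutable

object HashMapKeySketch {
  def main(args: Array[String]): Unit = {
    val pendingTasksForHost = new mutable.HashMap[String, mutable.ArrayBuffer[Int]]

    // Suppose the preferred location arrived as an Option[String] and its
    // toString was used (directly or via interpolation) to build the key.
    val locationFromTaskLocation: Option[String] = Some("node-004")
    val storedKey = locationFromTaskLocation.toString   // "Some(node-004)"
    pendingTasksForHost.getOrElseUpdate(storedKey, new mutable.ArrayBuffer[Int]) += 0

    // The scheduler later looks the host up by its bare name.
    val offeredHost = "node-004"
    println(pendingTasksForHost)                        // e.g. Map(Some(node-004) -> ArrayBuffer(0))
    println(pendingTasksForHost.contains(offeredHost))  // false: the keys do not match
    println(pendingTasksForHost.contains(storedKey))    // true
  }
}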



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p1971.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use FlumeInputDStream in spark cluster?

Posted by Tathagata Das <ta...@gmail.com>.
Can you give me the whole log with debug-level logging enabled? Also, please
tell me what configurations (system properties, etc.) are being used.
A screenshot of the "Environment" tab in the Spark application web UI
would also be fine.

TD


On Thu, Feb 20, 2014 at 10:26 PM, anoldbrain <an...@gmail.com> wrote:

> Modifications highlighted in *bold*. I also added 2 log lines in
> TaskSetManager:addPendingTask.
>
> > ...
> > 14/02/21 14:08:42 INFO main NetworkInputTracker: NetworkInputTracker
> > started
> *
> > 14/02/21 14:08:42 INFO Thread-33 NetworkInputTracker: tempRDD
> > hasLocationPreferences? true
> > 14/02/21 14:08:42 INFO Thread-33 NetworkInputTracker: tempRDD partition 0
> > preferredLocations: List(Some(node-005))
> *
> > 14/02/21 14:08:42 INFO Thread-33 SparkContext: Starting job: collect at
> > NetworkInputTracker.scala:189
> > ...
> > 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> > DAGScheduler: Final stage: Stage 22 (runJob at
> > NetworkInputTracker.scala:193)
> > 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> > DAGScheduler: Parents of final stage: List()
> > 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> > DAGScheduler: Missing parents: List()
> > 14/02/21 14:08:53 DEBUG spark-akka.actor.default-dispatcher-20
> > DAGScheduler: submitStage(Stage 22)
> > 14/02/21 14:08:53 DEBUG spark-akka.actor.default-dispatcher-20
> > DAGScheduler: missing: List()
> > 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> > DAGScheduler: Submitting Stage 22 (ParallelCollectionRDD[0] at makeRDD at
> > NetworkInputTracker.scala:169), which has no missing parents
> > 14/02/21 14:08:53 DEBUG spark-akka.actor.default-dispatcher-20
> > DAGScheduler: submitMissingTasks(Stage 22)
> > 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> > DAGScheduler: Submitting 1 missing tasks from Stage 22
> > (ParallelCollectionRDD[0] at makeRDD at NetworkInputTracker.scala:169)
> > 14/02/21 14:08:53 DEBUG spark-akka.actor.default-dispatcher-20
> > DAGScheduler: New pending tasks: Set(ResultTask(22, 0))
> > 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> > TaskSchedulerImpl: Adding task set 22.0 with 1 tasks
> > 14/02/21 14:08:53 DEBUG spark-akka.actor.default-dispatcher-20
> > TaskSetManager: Epoch for TaskSet 22.0: 6
> *
> > 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> > TaskSetManager: addPendingTask 0 ResultTask(22, 0)
> > 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> > TaskSetManager: addPendingTask 0 TaskLocation(Some(node-005), None)
> *
> > 14/02/21 14:08:53 DEBUG spark-akka.actor.default-dispatcher-20
> > TaskSetManager: Valid locality levels for TaskSet 22.0: ANY
> > 14/02/21 14:08:53 DEBUG spark-akka.actor.default-dispatcher-13
> > TaskSchedulerImpl: parentName: , name: TaskSet_22, runningTasks: 0
> > 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-13
> > TaskSetManager: Starting task 22.0:0 as TID 85 on executor 2: node-002
> > (PROCESS_LOCAL)
> > 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-13
> > TaskSetManager: Serialized task 22.0:0 as 2023 bytes in 0 ms
> > ...
>
> I have not tracked down to why there's no executorId in TaskLocation yet.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p1873.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: How to use FlumeInputDStream in spark cluster?

Posted by anoldbrain <an...@gmail.com>.
Modifications highlighted in *bold*. I also added 2 log lines in
TaskSetManager:addPendingTask.

> ...
> 14/02/21 14:08:42 INFO main NetworkInputTracker: NetworkInputTracker
> started
*
> 14/02/21 14:08:42 INFO Thread-33 NetworkInputTracker: tempRDD
> hasLocationPreferences? true
> 14/02/21 14:08:42 INFO Thread-33 NetworkInputTracker: tempRDD partition 0
> preferredLocations: List(Some(node-005))
*
> 14/02/21 14:08:42 INFO Thread-33 SparkContext: Starting job: collect at
> NetworkInputTracker.scala:189
> ...
> 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> DAGScheduler: Final stage: Stage 22 (runJob at
> NetworkInputTracker.scala:193)
> 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> DAGScheduler: Parents of final stage: List()
> 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> DAGScheduler: Missing parents: List()
> 14/02/21 14:08:53 DEBUG spark-akka.actor.default-dispatcher-20
> DAGScheduler: submitStage(Stage 22)
> 14/02/21 14:08:53 DEBUG spark-akka.actor.default-dispatcher-20
> DAGScheduler: missing: List()
> 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> DAGScheduler: Submitting Stage 22 (ParallelCollectionRDD[0] at makeRDD at
> NetworkInputTracker.scala:169), which has no missing parents
> 14/02/21 14:08:53 DEBUG spark-akka.actor.default-dispatcher-20
> DAGScheduler: submitMissingTasks(Stage 22)
> 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> DAGScheduler: Submitting 1 missing tasks from Stage 22
> (ParallelCollectionRDD[0] at makeRDD at NetworkInputTracker.scala:169)
> 14/02/21 14:08:53 DEBUG spark-akka.actor.default-dispatcher-20
> DAGScheduler: New pending tasks: Set(ResultTask(22, 0))
> 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> TaskSchedulerImpl: Adding task set 22.0 with 1 tasks
> 14/02/21 14:08:53 DEBUG spark-akka.actor.default-dispatcher-20
> TaskSetManager: Epoch for TaskSet 22.0: 6
*
> 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> TaskSetManager: addPendingTask 0 ResultTask(22, 0)
> 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-20
> TaskSetManager: addPendingTask 0 TaskLocation(Some(node-005), None)
*
> 14/02/21 14:08:53 DEBUG spark-akka.actor.default-dispatcher-20
> TaskSetManager: Valid locality levels for TaskSet 22.0: ANY
> 14/02/21 14:08:53 DEBUG spark-akka.actor.default-dispatcher-13
> TaskSchedulerImpl: parentName: , name: TaskSet_22, runningTasks: 0
> 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-13
> TaskSetManager: Starting task 22.0:0 as TID 85 on executor 2: node-002
> (PROCESS_LOCAL)
> 14/02/21 14:08:53 INFO spark-akka.actor.default-dispatcher-13
> TaskSetManager: Serialized task 22.0:0 as 2023 bytes in 0 ms
> ...

I have not tracked down why there's no executorId in the TaskLocation yet.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p1873.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use FlumeInputDStream in spark cluster?

Posted by Tathagata Das <ta...@gmail.com>.
Okay, let's try to debug this. If you look at the NetworkInputTracker
class<https://github.com/apache/incubator-spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala?source=c>,
the RDD generated on line 165 is the RDD in question. The runJob on line 190
actually starts a job that sends the receivers to the workers and starts
them.

Can you modify the code to see whether hasLocationPreferences is true and what
the contents of tempRDD's preferredLocations are? That will narrow down the bug.
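
In case it helps, here is a minimal standalone sketch (placeholder element and
hostname; this is not the NetworkInputTracker code itself) of how location
preferences attached through makeRDD's (element, locations) overload can be
read back, which is essentially what inspecting tempRDD's preferred locations
amounts to:

import org.apache.spark.{SparkConf, SparkContext}

object PreferredLocationCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("pref-loc-check").setMaster("local"))

    // Hypothetical stand-in for the receiver: any serializable element works.
    val element = "flume-receiver-placeholder"

    // makeRDD's Seq[(T, Seq[String])] overload attaches location preferences
    // ("node-005" is an assumed hostname) to the single partition.
    val tempRDD = sc.makeRDD[String](Seq((element, Seq("node-005"))))

    // Inspect what the scheduler will see as the preferred locations.
    tempRDD.partitions.foreach { p =>
      println(s"partition ${p.index} preferredLocations: ${tempRDD.preferredLocations(p)}")
    }

    sc.stop()
  }
}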

TD


On Thu, Feb 20, 2014 at 4:17 PM, anoldbrain <an...@gmail.com> wrote:

> It was not running in node-005, it was on node-001. Nothing occupies 4141
> on
> all nodes.
>
> Entries on Application web-UI generated by NetworkInputTracker
>
>
> > ....
> > 22    runJob at NetworkInputTracker.scala:190 2014/02/21 08:10:13
>  183 ms
> > ....
> > 0     collect at NetworkInputTracker.scala:186        2014/02/21
> 08:10:11      1.5 s
> > ...
> > 1     reduceByKey at NetworkInputTracker.scala:186    2014/02/21
> 08:10:02      9.6 s
> > 50/50         7.6 KB
>
> All links to node-001:4040
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p1853.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: How to use FlumeInputDStream in spark cluster?

Posted by anoldbrain <an...@gmail.com>.
It was not running on node-005, it was on node-001. Nothing occupies port 4141
on any of the nodes.

Entries on Application web-UI generated by NetworkInputTracker


> ....
> 22	runJob at NetworkInputTracker.scala:190	2014/02/21 08:10:13	 183 ms	
> ....
> 0	collect at NetworkInputTracker.scala:186	2014/02/21 08:10:11	 1.5 s		
> ...
> 1	reduceByKey at NetworkInputTracker.scala:186	2014/02/21 08:10:02	 9.6 s	
> 50/50		7.6 KB

All links to node-001:4040



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p1853.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use FlumeInputDStream in spark cluster?

Posted by Tathagata Das <ta...@gmail.com>.
This is a little confusing. Let's try to confirm the following first. In the
Spark application's web UI, can you find the stage (one of the first
few) that has only 1 task and has the name XYZ at NetworkInputTracker? In
that stage, can you see where the single task is running? Is it on node-005, or
on some other node? That is the task that is supposed to start the receiver,
and based on the location preference it should run on node-005. If it is
not running on node-005 then the bind failure makes sense, and we need to
see why it is not using the location preference. If it is running on
node-005 then I am not sure why binding is failing. Is anything else
bound to port 4141?



On Tue, Feb 18, 2014 at 11:15 PM, anoldbrain <an...@gmail.com> wrote:

> Both standalone mode and mesos were tested, with the same outcome. After
> your
> suggestion, I tried again in standalone mode and specified the <host> with
> what was written in the log of a worker node. The problem remains.
>
> A bit more detail, the bind failed error is reported on the driver node.
>
> Say, driver node is node-001, and worker nodes are node-002 ~ node-008. Run
> the command on node-001
>
> > $ bin/run-example org.apache.spark.streaming.examples.FlumeEventCount
> > spark://node-001:7077 node-005 4141
>
> and on node-001, I see
>
> > ...
> > 14/02/19 15:01:04 INFO main HttpBroadcast: Broadcast server started at
> > http://192.168.101.11:60094
> > 14/02/19 15:01:04 DEBUG main MetadataCleaner: Starting metadata cleaner
> > for HTTP_BROADCAST with delay of 3600 seconds and period of 360 secs
> > 14/02/19 15:01:04 DEBUG main MetadataCleaner: Starting metadata cleaner
> > for MAP_OUTPUT_TRACKER with delay of 3600 seconds and period of 360 secs
> > ...
> > 14/02/19 15:01:06 INFO spark-akka.actor.default-dispatcher-13
> > SparkDeploySchedulerBackend: Registered executor:
> > Actor[akka.tcp://sparkExecutor@node-005:54959/user/Executor#-820307304]
> > with ID 0
> > 14/02/19 15:01:06 DEBUG spark-akka.actor.default-dispatcher-5
> > DAGScheduler: submitStage(Stage 0)
> > 14/02/19 15:01:06 DEBUG spark-akka.actor.default-dispatcher-13
> > TaskSchedulerImpl: parentName: , name: TaskSet_1, runningTasks: 24
> > 14/02/19 15:01:06 DEBUG spark-akka.actor.default-dispatcher-5
> > DAGScheduler: missing: List(Stage 1)
> > 14/02/19 15:01:06 DEBUG spark-akka.actor.default-dispatcher-5
> > DAGScheduler: submitStage(Stage 1)
> > 14/02/19 15:01:06 INFO spark-akka.actor.default-dispatcher-13
> > TaskSetManager: Starting task 1.0:24 as TID 24 on executor 0: node-005
> > (PROCESS_LOCAL)
> > ...
> > 14/02/19 15:01:16 DEBUG spark-akka.actor.default-dispatcher-16
> > TaskSchedulerImpl: parentName: , name: TaskSet_22, runningTasks: 1
> > 14/02/19 15:01:16 DEBUG spark-akka.actor.default-dispatcher-15
> > TaskSchedulerImpl: parentName: , name: TaskSet_22, runningTasks: 0
> > 14/02/19 15:01:16 INFO spark-akka.actor.default-dispatcher-16
> > DAGScheduler: Completed ResultTask(22, 0)
> > 14/02/19 15:01:16 ERROR spark-akka.actor.default-dispatcher-22
> > NetworkInputTracker: De-registered receiver for network stream 0 with
> > message org.jboss.netty.channel.ChannelException: Failed to bind to:
> > node-005/192.168.101.15:4141
> > ...
>
> And there are no 'ERROR' level logs on node-005 stdout/stderr. Any specific
> type of log entries I should look at?
>
> Note: When I added a logInfo line in the getLocationPreference() of
> FlumeReceiver and run the example with the new jar. This line is displayed
> on driver node (node-001).
>
> Thank you.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p1742.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: How to use FlumeInputDStream in spark cluster?

Posted by anoldbrain <an...@gmail.com>.
Both standalone mode and Mesos were tested, with the same outcome. After your
suggestion, I tried again in standalone mode and specified <host> using exactly
what was written in the log of a worker node. The problem remains.

A bit more detail: the bind-failed error is reported on the driver node.

Say the driver node is node-001 and the worker nodes are node-002 ~ node-008.
I run this command on node-001:

> $ bin/run-example org.apache.spark.streaming.examples.FlumeEventCount
> spark://node-001:7077 node-005 4141

and on node-001, I see

> ...
> 14/02/19 15:01:04 INFO main HttpBroadcast: Broadcast server started at
> http://192.168.101.11:60094
> 14/02/19 15:01:04 DEBUG main MetadataCleaner: Starting metadata cleaner
> for HTTP_BROADCAST with delay of 3600 seconds and period of 360 secs
> 14/02/19 15:01:04 DEBUG main MetadataCleaner: Starting metadata cleaner
> for MAP_OUTPUT_TRACKER with delay of 3600 seconds and period of 360 secs
> ...
> 14/02/19 15:01:06 INFO spark-akka.actor.default-dispatcher-13
> SparkDeploySchedulerBackend: Registered executor:
> Actor[akka.tcp://sparkExecutor@node-005:54959/user/Executor#-820307304]
> with ID 0
> 14/02/19 15:01:06 DEBUG spark-akka.actor.default-dispatcher-5
> DAGScheduler: submitStage(Stage 0)
> 14/02/19 15:01:06 DEBUG spark-akka.actor.default-dispatcher-13
> TaskSchedulerImpl: parentName: , name: TaskSet_1, runningTasks: 24
> 14/02/19 15:01:06 DEBUG spark-akka.actor.default-dispatcher-5
> DAGScheduler: missing: List(Stage 1)
> 14/02/19 15:01:06 DEBUG spark-akka.actor.default-dispatcher-5
> DAGScheduler: submitStage(Stage 1)
> 14/02/19 15:01:06 INFO spark-akka.actor.default-dispatcher-13
> TaskSetManager: Starting task 1.0:24 as TID 24 on executor 0: node-005
> (PROCESS_LOCAL)
> ...
> 14/02/19 15:01:16 DEBUG spark-akka.actor.default-dispatcher-16
> TaskSchedulerImpl: parentName: , name: TaskSet_22, runningTasks: 1
> 14/02/19 15:01:16 DEBUG spark-akka.actor.default-dispatcher-15
> TaskSchedulerImpl: parentName: , name: TaskSet_22, runningTasks: 0
> 14/02/19 15:01:16 INFO spark-akka.actor.default-dispatcher-16
> DAGScheduler: Completed ResultTask(22, 0)
> 14/02/19 15:01:16 ERROR spark-akka.actor.default-dispatcher-22
> NetworkInputTracker: De-registered receiver for network stream 0 with
> message org.jboss.netty.channel.ChannelException: Failed to bind to:
> node-005/192.168.101.15:4141
> ...

And there are no ERROR-level logs in node-005's stdout/stderr. Are there any
specific types of log entries I should look at?

Note: when I added a logInfo line in the getLocationPreference() of
FlumeReceiver and ran the example with the new jar, this line was displayed
on the driver node (node-001).

Thank you.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p1742.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use FlumeInputDStream in spark cluster?

Posted by Tathagata Das <ta...@gmail.com>.
It could be that the hostname that Spark uses to identify the node is
different from the one you are providing. Are you using Spark
standalone mode? In that case, you can check the hostnames that Spark
is seeing and use that name.

Let me know if that works out.

TD


On Mon, Feb 17, 2014 at 12:36 PM, anoldbrain <an...@gmail.com> wrote:

> FlumeInputDStream extends NetworkInputDStream, which runs on worker nodes.
> This brings up another question. 'getLocationPreference' doesn't seem to
> work like I expected it to be. It got called on driver noder, and yet the
> returned 'host' value doesn't get honored.
>
> The behavior I observed is:
>
> 1. FlumeEventCount /master/ /workerA-ip/ 4141: bind failed, because the
> actual worker node assigned the listen for FlumeInputDStream is not
> workerA.
> However, the designated worker node, say workerB, is always the same run
> after run.
> 2. FlumeEventCount /master/ localhost 4141: at each round, different worker
> node listens on 4141
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p1643.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: How to use FlumeInputDStream in spark cluster?

Posted by anoldbrain <an...@gmail.com>.
FlumeInputDStream extends NetworkInputDStream, which runs on the worker nodes.
This brings up another question: 'getLocationPreference' doesn't seem to
work the way I expected it to. It got called on the driver node, and yet the
returned 'host' value doesn't get honored.

The behavior I observed is:

1. FlumeEventCount /master/ /workerA-ip/ 4141: bind failed, because the
actual worker node assigned to listen for the FlumeInputDStream is not workerA.
However, the designated worker node, say workerB, is always the same run
after run.
2. FlumeEventCount /master/ localhost 4141: on each run, a different worker
node listens on 4141.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p1643.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.