Posted to user@spark.apache.org by Ravi Hemnani <ra...@gmail.com> on 2014/03/21 13:31:14 UTC

Re: How to use FlumeInputDStream in spark cluster?

Hey,


I am getting the same error too.

I am running:

sudo ./run-example org.apache.spark.streaming.examples.FlumeEventCount
spark://<spark_master_hostname>:7077 <spark_master_hostname> 7781

and getting no events in Spark Streaming.

-------------------------------------------
Time: 1395395676000 ms
-------------------------------------------
Received 0 flume events.

14/03/21 09:54:36 INFO JobScheduler: Finished job streaming job
1395395676000 ms.0 from job set of time 1395395676000 ms
14/03/21 09:54:36 INFO JobScheduler: Total delay: 0.196 s for time
1395395676000 ms (execution: 0.111 s)
14/03/21 09:54:38 INFO NetworkInputTracker: Stream 0 received 0 blocks
14/03/21 09:54:38 INFO SparkContext: Starting job: take at DStream.scala:586
14/03/21 09:54:38 INFO JobScheduler: Starting job streaming job
1395395678000 ms.0 from job set of time 1395395678000 ms
14/03/21 09:54:38 INFO DAGScheduler: Registering RDD 73 (combineByKey at
ShuffledDStream.scala:42)
14/03/21 09:54:38 INFO DAGScheduler: Got job 16 (take at DStream.scala:586)
with 1 output partitions (allowLocal=true)
14/03/21 09:54:38 INFO DAGScheduler: Final stage: Stage 31 (take at
DStream.scala:586)
14/03/21 09:54:38 INFO DAGScheduler: Parents of final stage: List(Stage 32)
14/03/21 09:54:38 INFO JobScheduler: Added jobs for time 1395395678000 ms
14/03/21 09:54:38 INFO DAGScheduler: Missing parents: List(Stage 32)
14/03/21 09:54:38 INFO DAGScheduler: Submitting Stage 32
(MapPartitionsRDD[73] at combineByKey at ShuffledDStream.scala:42), which
has no missing parents
14/03/21 09:54:38 INFO DAGScheduler: Submitting 1 missing tasks from Stage
32 (MapPartitionsRDD[73] at combineByKey at ShuffledDStream.scala:42)
14/03/21 09:54:38 INFO TaskSchedulerImpl: Adding task set 32.0 with 1 tasks
14/03/21 09:54:38 INFO TaskSetManager: Starting task 32.0:0 as TID 92 on
executor 2: c8-data-store-4.srv.media.net (PROCESS_LOCAL)
14/03/21 09:54:38 INFO TaskSetManager: Serialized task 32.0:0 as 2971 bytes
in 1 ms
14/03/21 09:54:38 INFO TaskSetManager: Finished TID 92 in 41 ms on
c8-data-store-4.srv.media.net (progress: 0/1)
14/03/21 09:54:38 INFO TaskSchedulerImpl: Remove TaskSet 32.0 from pool 



Also, on a closer look, I found:

INFO SparkContext: Job finished: runJob at NetworkInputTracker.scala:182,
took 0.523621327 s
14/03/21 09:54:35 ERROR NetworkInputTracker: De-registered receiver for
network stream 0 with message org.jboss.netty.channel.ChannelException:
Failed to bind to: c8-data-store-1.srv.media.net/172.16.200.124:7781


I couldn't understand what you said about the NetworkInputTracker. Can you
elaborate on that?

I only understood that the master picks one of the worker nodes for the
connection and stays on it for as long as the program runs. Why is it not
using the <host> and <port> I am providing? Also, do <host> and <port>
necessarily have to belong to a worker node?




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-FlumeInputDStream-in-spark-cluster-tp1604p2987.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use FlumeInputDStream in spark cluster?

Posted by Ravi Hemnani <ra...@gmail.com>.
On 03/21/2014 06:17 PM, anoldbrain [via Apache Spark User List] wrote:
> ... the actual <address>, which in turn causes the 'Fail to bind to ...'
> error. This comes naturally because the slave that is running the code
> to bind to <address>:<port> has a different ip.
So if we run the code on the slave to which the Flume agent is sending
data, it should work. Let me give this a shot and see what happens.

Thank you for the immediate reply. I'll keep you posted.





Re: How to use FlumeInputDStream in spark cluster?

Posted by Ravi Hemnani <ra...@gmail.com>.
I'll start with the Kafka implementation.

Thanks for all the help.
On Mar 21, 2014 7:00 PM, "anoldbrain [via Apache Spark User List]" <
ml-node+s1001560n2994h47@n3.nabble.com> wrote:

> It is my understanding that there is no way to make FlumeInputDStream work
> in a cluster environment with the current release. My suggestion would be to
> switch to Kafka if you can, although I have not used KafkaInputDStream
> myself. There is a big difference between the Kafka and Flume input
> DStreams: a KafkaInputDStream is a consumer (client), whereas a
> FlumeInputDStream needs to listen on a specific address:port so that other
> Flume agents can send messages to it. This may also give Kafka a
> performance advantage.





Re: How to use FlumeInputDStream in spark cluster?

Posted by anoldbrain <an...@gmail.com>.
It is my understanding that there is no way to make FlumeInputDStream work in
a cluster environment with the current release. My suggestion would be to
switch to Kafka if you can, although I have not used KafkaInputDStream
myself. There is a big difference between the Kafka and Flume input DStreams:
a KafkaInputDStream is a consumer (client), whereas a FlumeInputDStream needs
to listen on a specific address:port so that other Flume agents can send
messages to it. This may also give Kafka a performance advantage.
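
The listener-versus-client difference described above can be sketched with
plain TCP sockets in Python. This is only an illustration under stated
assumptions: it does not speak Flume's Avro protocol or Kafka's wire protocol,
everything runs on the loopback interface, and the names start_server,
push_model and pull_model are hypothetical.

```python
import socket
import threading


def start_server(on_connect):
    """Listen on an ephemeral loopback port; handle one connection in a thread."""
    srv = socket.create_server(("127.0.0.1", 0))  # already bound and listening
    port = srv.getsockname()[1]

    def serve():
        with srv:
            conn, _ = srv.accept()
            with conn:
                on_connect(conn)

    thread = threading.Thread(target=serve, daemon=True)
    thread.start()
    return port, thread


def push_model(event):
    """Flume-style: the receiver listens, and a sender connects to push data."""
    received = []
    port, thread = start_server(lambda conn: received.append(conn.recv(1024)))
    with socket.create_connection(("127.0.0.1", port)) as sender:
        sender.sendall(event)
    thread.join(timeout=5)
    return received[0]


def pull_model(event):
    """Kafka-style: a broker listens, and the receiver connects out to fetch."""
    port, thread = start_server(lambda conn: conn.sendall(event))
    with socket.create_connection(("127.0.0.1", port)) as consumer:
        fetched = consumer.recv(1024)
    thread.join(timeout=5)
    return fetched


if __name__ == "__main__":
    print(push_model(b"event pushed to a listening receiver"))
    print(pull_model(b"message pulled by a connecting consumer"))
```

In the push model the receiving side owns the listening socket, so it has to
run on the host that actually has the configured address. In the pull model
the receiving side only makes an outbound connection, so it does not matter
which worker it is placed on.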




Re: How to use FlumeInputDStream in spark cluster?

Posted by Ping Tang <pt...@aerohive.com>.
Thank you very much for your reply.

I have a cluster of 8 nodes: m1, m2, m3, ..., m8. m1 is configured as the Spark master node; the rest of the nodes are all worker nodes. I also configured m3 as the History Server, but the history server fails to start. I ran FlumeEventCount on m1 using the right hostname and a port that is not used by any application. Here is the script I used to run FlumeEventCount:


#!/bin/bash


spark-submit --verbose --class org.apache.spark.examples.streaming.FlumeEventCount --deploy-mode client --master yarn-client --jars lib/spark-streaming-flume_2.10-1.1.0-cdh5.2.2-20141112.193826-1.jar /opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/examples/lib/spark-examples-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar m1.ptang.aerohive.com 1234


The same issue is observed after adding "spark.ui.port=4321" to /etc/spark/conf/spark-defaults.conf. The following are the exceptions from the job run:


14/12/01 11:51:45 INFO JobScheduler: Finished job streaming job 1417463504000 ms.0 from job set of time 1417463504000 ms
14/12/01 11:51:45 INFO JobScheduler: Total delay: 1.465 s for time 1417463504000 ms (execution: 1.415 s)
14/12/01 11:51:45 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.jboss.netty.channel.ChannelException: Failed to bind to: m1.ptang.aerohive.com/192.168.10.22:1234
        at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
        at org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)
        at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.BindException: Cannot assign requested address
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:444)
        at sun.nio.ch.Net.bind(Net.java:436)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
        at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
        ... 3 more

14/12/01 11:51:45 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 72, m8.ptang.aerohive.com): org.jboss.netty.channel.ChannelException: Failed to bind to: m1.ptang.aerohive.com/192.168.10.22:1234
        org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
        org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
        org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
        org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
        org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
        org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)
        org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)
        org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
        org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
        org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
        org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

Thanks

Ping

From: Prannoy <pr...@sigmoidanalytics.com>
Date: Friday, November 28, 2014 at 12:56 AM
To: user@spark.incubator.apache.org
Subject: Re: How to use FlumeInputDStream in spark cluster?

Hi,

A BindException occurs when two processes are using the same port. In your Spark configuration, just set ("spark.ui.port", "xxxxx") to some other port; xxxxx can be any number, say 12345. The BindException will not break your job in either case. To fix it, just change the port number.

Thanks.

On Fri, Nov 28, 2014 at 1:30 PM, pamtang [via Apache Spark User List] wrote:
I'm seeing the same issue on CDH 5.2 with Spark 1.1. FlumeEventCount works fine on a Standalone cluster but throws a BindException in YARN mode. Is there a solution to this problem, or will FlumeInputDStream not work in a cluster environment?


Re: How to use FlumeInputDStream in spark cluster?

Posted by Prannoy <pr...@sigmoidanalytics.com>.
Hi,

A BindException occurs when two processes are using the same port. In your
Spark configuration, just set ("spark.ui.port", "xxxxx") to some other port;
xxxxx can be any number, say 12345. The BindException will not break your job
in either case. To fix it, just change the port number.
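
For reference, a bind can fail for more than one reason, and changing the
port only helps with one of them. The following is a minimal Python sketch,
assuming a Linux-like host; the helper names bind_error, advice and
port_in_use_demo are hypothetical. On Linux, Java reports EADDRINUSE as
"Address already in use" and EADDRNOTAVAIL as "Cannot assign requested
address"; the latter is the message in the stack trace posted elsewhere in
this thread.

```python
import errno
import socket


def bind_error(host, port):
    """Return the errno name raised by bind((host, port)), or None on success."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError as exc:
            return errno.errorcode.get(exc.errno, str(exc.errno))
    return None


def advice(error_name):
    """Map a bind failure to the kind of fix that can help (illustrative only)."""
    if error_name is None:
        return "bind succeeded"
    if error_name.endswith("EADDRINUSE"):
        return "port is taken on this host: pick another port"
    if error_name.endswith("EADDRNOTAVAIL"):
        return "address is not on this host: run the listener where it is"
    return "other failure: " + error_name


def port_in_use_demo():
    """Hold a loopback port open, then try to bind the same port again."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as holder:
        holder.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
        holder.listen(1)
        taken_port = holder.getsockname()[1]
        return bind_error("127.0.0.1", taken_port)


if __name__ == "__main__":
    name = port_in_use_demo()
    print(name, "->", advice(name))
```

Calling bind_error with an address that is not configured on the local
machine (for example a documentation address such as 192.0.2.1, which is an
assumption here) would typically yield the EADDRNOTAVAIL case instead, and no
choice of port changes that.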

Thanks.

On Fri, Nov 28, 2014 at 1:30 PM, pamtang [via Apache Spark User List] <
ml-node+s1001560n19997h9@n3.nabble.com> wrote:

> I'm seeing the same issue on CDH 5.2 with Spark 1.1. FlumeEventCount works
> fine on a Standalone cluster but throws a BindException in YARN mode. Is
> there a solution to this problem, or will FlumeInputDStream not work in a
> cluster environment?





Re: How to use FlumeInputDStream in spark cluster?

Posted by BigDataUser <su...@yahoo.com>.
I am running the FlumeEventCount program on CDH 5.0.1, which ships Spark
0.9.0. The program runs fine in local mode as well as in standalone cluster
mode. However, the program fails in YARN mode. I see the following error:
INFO scheduler.DAGScheduler: Stage 2 (runJob at
NetworkInputTracker.scala:182) finished in 0.215 s
INFO spark.SparkContext: Job finished: runJob at
NetworkInputTracker.scala:182, took 0.224696381 s
ERROR scheduler.NetworkInputTracker: De-registered receiver for network
stream 0 with message org.jboss.netty.channel.ChannelException: Failed to
bind to: xxxxx/xx.xxx.x.xx:41415
Is there a workaround for this?




---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: How to use FlumeInputDStream in spark cluster?

Posted by julyfire <he...@gmail.com>.
I have tested the example code FlumeEventCount on a standalone cluster, and
this is still a problem in Spark 1.1.0, the latest version as of now. Have
you solved this issue in some way?





Re: How to use FlumeInputDStream in spark cluster?

Posted by Ravi Hemnani <ra...@gmail.com>.
On 03/21/2014 06:17 PM, anoldbrain [via Apache Spark User List] wrote:
> ... the actual <address>, which in turn causes the 'Fail to bind to ...'
> error. This comes naturally because the slave that is running the code
> to bind to <address>:<port> has a different ip.
I ran

sudo ./run-example org.apache.spark.streaming.examples.FlumeEventCount
spark://<spark_master_hostname>:7077 <worker_hostname> 7781

on <worker_hostname>, and it still shows:

14/03/21 13:12:12 ERROR scheduler.NetworkInputTracker: De-registered 
receiver for network stream 0 with message 
org.jboss.netty.channel.ChannelException: Failed to bind 
to:<worker_hostname> /<worker_ipaddress>:7781
14/03/21 13:12:12 INFO spark.SparkContext: Job finished: runJob at 
NetworkInputTracker.scala:182, took 0.530447982 s
14/03/21 13:12:14 INFO scheduler.NetworkInputTracker: Stream 0 received 
0 blocks

Weird issue. I need to set up Spark Streaming and get it running. I am
thinking of switching to Kafka. I haven't checked it yet, but I don't see a
workaround for this. Any help would be appreciated. I am making changes in
flume.conf and trying different settings.

Thank you.






Re: How to use FlumeInputDStream in spark cluster?

Posted by anoldbrain <an...@gmail.com>.
Hi,

This is my summary of the gap between expected behavior and actual behavior.

FlumeEventCount spark://<spark_master_hostname>:7077 <address> <port>

Expected: an 'agent' listening on (bound to) <address>:<port>. In the context
of Spark, this agent should be running on one of the slaves, namely the slave
whose IP/hostname is <address>.

Observed: A random slave is chosen from the pool of available slaves.
Therefore, in a cluster environment, it is likely not the slave that has the
actual <address>, which in turn causes the 'Fail to bind to ...' error. This
follows naturally, because the slave that runs the code to bind to
<address>:<port> has a different IP.
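
The gap between expected and observed behavior can be put into numbers with a
small Python model. This is a sketch under assumptions, not Spark's scheduler:
it assumes the receiver is placed on a uniformly random worker and that a
listener can only bind to an address owned by the host it runs on. The
function names and the 10.0.0.x addresses are made up for illustration.

```python
from fractions import Fraction


def receiver_bind_outcomes(worker_ips, requested_ip):
    """For each worker that might be picked, say whether bind() could succeed.

    Assumption: a listening socket can only bind to an address owned by the
    host it runs on (wildcard addresses are ignored in this model).
    """
    return {ip: ip == requested_ip for ip in worker_ips}


def bind_success_probability(worker_ips, requested_ip):
    """Chance that a uniformly random placement lands on the right worker."""
    outcomes = receiver_bind_outcomes(worker_ips, requested_ip)
    return Fraction(sum(outcomes.values()), len(outcomes))


# Hypothetical cluster with 7 workers: 10.0.0.2 through 10.0.0.8.
workers = [f"10.0.0.{n}" for n in range(2, 9)]

if __name__ == "__main__":
    # <address> belongs to one of the workers.
    print(bind_success_probability(workers, "10.0.0.4"))
    # <address> belongs to a host that runs no worker (e.g. the master).
    print(bind_success_probability(workers, "10.0.0.1"))
```

Under these assumptions, giving the address of one worker succeeds only when
the receiver happens to land on that worker, and giving the address of a host
that runs no worker can never succeed.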


