You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by 鹰 <98...@qq.com> on 2015/07/16 10:32:56 UTC

spark-streaming with flume: error when running in YARN mode

Hi all,
I'm using Spark Streaming with Flume. I configured Flume like this:
a1.channels = c1
 a1.sinks = k1
 a1.sources = r1
 
 a1.sinks.k1.type = avro
 a1.sinks.k1.channel = c1
 a1.sinks.k1.hostname = localhost
 a1.sinks.k1.port = 33333
 
 a1.sources.r1.type = avro
 a1.sources.r1.bind = localhost
 a1.sources.r1.port = 44444
 a1.sources.r1.channels = c1
 
 a1.channels.c1.type = memory
 a1.channels.c1.capacity = 1000
 a1.channels.c1.transactionCapacity = 100

My Spark code looks like this:
 /**
  * Entry point: counts the Flume events received in each streaming batch
  * and prints the count.
  *
  * Prints a usage message to stderr and exits with status 1 when fewer than
  * two arguments are given, or when the port argument is not an integer in
  * the range 0-65535.
  *
  * @param args command-line arguments: `<host> <port>`, passed unchanged to
  *             `FlumeUtils.createStream`
  */
 def main(args: Array[String]): Unit = {
   val usage = "Usage: FlumeEventCount <host> <port>"
   if (args.length < 2) {
     System.err.println(usage)
     System.exit(1)
   }

   // NOTE(review): the receiver created below binds a server socket on
   // host:port. On a cluster the receiver presumably runs inside an executor,
   // so `host` must be an address that exists on the node where the receiver
   // is scheduled; otherwise the bind fails with "Cannot assign requested
   // address" -- confirm for your deployment.
   val host = args(0)

   // Parse the port defensively: a non-numeric or out-of-range value would
   // otherwise surface as an uncaught exception instead of a usage hint.
   val port: Int = scala.util.Try(args(1).toInt).toOption
     .filter(p => p >= 0 && p <= 65535)
     .getOrElse {
       System.err.println(s"Invalid port '${args(1)}': expected an integer between 0 and 65535.")
       System.err.println(usage)
       sys.exit(1)
     }

   val batchInterval = Milliseconds(2000)

   // Create the streaming context with the chosen batch interval
   val sparkConf = new SparkConf().setAppName("FlumeEventCount")
   val ssc = new StreamingContext(sparkConf, batchInterval)

   // Create a Flume stream for the given host and port
   val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)

   // Print out the count of events received from this server in each batch
   stream.count().map(cnt => s"Received $cnt flume events.").print()

   ssc.start()
   ssc.awaitTermination()
 }



When I run this in local mode it works okay.
The steps are as follows.
First, run the spark-streaming job with: spark-submit --master local[8]        --class com.nd.test.FlumeEventCount   simple-projectnew-1.2.1-jar-with-dependencies.jar  localhost 3333
Then start Flume with: flume-ng agent -c ../conf -f ./sparkflum.conf -n a1    -Dflume.root.logger=INFO,console

Finally, send data with: flume-ng avro-client --conf ./conf -H localhost -p 44444  -F ./example.conf  -Dflume.root.logger=DEBUG,console
This works okay.

But when I run the spark-streaming job in yarn-client or yarn-cluster mode, errors occur.

The error message is like this:
15/07/16 14:36:36 ERROR receiver.ReceiverSupervisorImpl: Stopped executor with error: org.jboss.netty.channel.ChannelException: Failed to bind to: sparktest/192.168.1.17:33333
15/07/16 14:36:36 ERROR executor.Executor: Exception in task 0.0 in stage 8.0 (TID 74)
org.jboss.netty.channel.ChannelException: Failed to bind to: sparktest/192.168.1.17:33333
    at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
    at org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)
    at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:288)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:280)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
Caused by: java.net.BindException: Cannot assign requested address
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:344)
    at sun.nio.ch.Net.bind(Net.java:336)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:199)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296)
    at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)



Flume also reports errors; its error message is like this:


[ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
        at org.apache.flume.sink.AvroSink.process(AvroSink.java:325)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient { host: sparktest.com, port: 33333 }: RPC connection error
        at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:117)
        at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:93)
        at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:507)
        at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
        at org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:182)
        at org.apache.flume.sink.AvroSink.verifyConnection(AvroSink.java:222)
        at org.apache.flume.sink.AvroSink.process(AvroSink.java:282)
        ... 3 more
Caused by: java.io.IOException: Error connecting to /192.168.1.17:33333
        at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
        at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
        at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
        at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:106)
        ... 9 more

Does anybody know what is happening and what I can do? Thanks!