You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by 鹰 <98...@qq.com> on 2015/07/16 10:32:56 UTC
spark-streaming whit flume run error under yarn model
hi all,
I'm use spark-streaming with spark ,I configure flume like this:
a1.channels = c1
a1.sinks = k1
a1.sources = r1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 33333
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
my spark code is like this :
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println(
"Usage: FlumeEventCount <host> <port>")
System.exit(1)
}
// val Array(host, new Integer(port)) = args
val host=args(0)
val port=args(1).toInt
val batchInterval = Milliseconds(2000)
// Create the context and set the batch size
val sparkConf = new SparkConf().setAppName("FlumeEventCount")
val ssc = new StreamingContext(sparkConf, batchInterval)
// Create a flume stream
val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
// Print out the count of events received from this server in each batch
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
ssc.start()
ssc.awaitTermination()
}
when I run this with local model it works okay
the steps is like this
first run spart-streaming jobs by spark-submit --master local[8] --class com.nd.test.FlumeEventCount simple-projectnew-1.2.1-jar-with-dependencies.jar localhost 3333
then start flume by flume-ng agent -c ../conf -f ./sparkflum.conf -n a1 -Dflume.root.logger=INFO,console
at last send data by flume-ng avro-client --conf ./conf -H localhost -p 44444 -F ./example.conf -Dflume.root.logger=DEBUG,console
it works okay
but when I run spark-streaming job with yarn-client or yarn cluster ERRORS happen
the error message is like this :
15/07/16 14:36:36 ERROR receiver.ReceiverSupervisorImpl: Stopped executor with error: org.jboss.netty.channel.ChannelException: Failed to bind to: sparktest/192.168.1.17:33333
15/07/16 14:36:36 ERROR executor.Executor: Exception in task 0.0 in stage 8.0 (TID 74)
org.jboss.netty.channel.ChannelException: Failed to bind to: sparktest/192.168.1.17:33333
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
at org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)
at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:288)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:280)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.net.BindException: Cannot assign requested address
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:344)
at sun.nio.ch.Net.bind(Net.java:336)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:199)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296)
at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
and flume also have errors error message like this :
[ERROR - org.apache.flume.SinkRunner$PollingRun
ner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
at org.apache.flume.sink.AvroSink.process(AvroSink.java:325)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient { host: sparktest.com, port: 33333 }: RPC conne
ction error
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:117)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:93)
at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:507)
at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
at org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:182)
at org.apache.flume.sink.AvroSink.verifyConnection(AvroSink.java:222)
at org.apache.flume.sink.AvroSink.process(AvroSink.java:282)
... 3 more
Caused by: java.io.IOException: Error connecting to /192.168.1.17:33333
at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:106)
... 9 more
Dose any body know what happens ,what can I do ,thanks!