You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by Anat Rozenzon <an...@viber.com> on 2013/07/23 16:59:15 UTC
Problem with Avro sink reconnecting to Avro source
Hi,
Sorry if this is a newbie question...
I'm trying to send events from an agent to a collector server through a
pair of Avro sink/source.
Everything is working properly.
But, it seems that whenever the collector server is down or not accessible
for some reason, the agent doesn't recover from it and must be restarted.
Anything I'm doing wrong?
This is the agent config:
agent.sources = apache
agent.sources.apache.type = exec
agent.sources.apache.command = tail -F /myfolder/mylog.log
agent.sources.apache.batchSize = 1
agent.sources.apache.channels = memoryChannel
agent.channels = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.sinks = AvroSink
agent.sinks.AvroSink.type = avro
agent.sinks.AvroSink.channel = memoryChannel
agent.sinks.AvroSink.hostname = 1.2.3.4 # this is the collector IP
agent.sinks.AvroSink.port = 45454
This is the collector config:
collector.sources = AvroIn
collector.sources.AvroIn.type = avro
collector.sources.AvroIn.bind = 0.0.0.0
collector.sources.AvroIn.port = 45454
collector.sources.AvroIn.channels = mc1 mc2
collector.channels = mc1 mc2
collector.channels.mc1.type = memory
collector.channels.mc1.capacity = 100
collector.channels.mc2.type = memory
collector.channels.mc2.capacity = 100
collector.sinks = LocalOut HadoopOut
collector.sinks.LocalOut.type = file_roll
collector.sinks.LocalOut.sink.directory =/disk/flume-debug-sink
collector.sinks.LocalOut.sink.rollInterval = 0
collector.sinks.LocalOut.channel = mc1
collector.sinks.HadoopOut.type = logger
collector.sinks.HadoopOut.channel = mc2
I get these errors in agent machine:
23 Jul 2013 10:53:19,714 ERROR
[SinkRunner-PollingRunner-DefaultSinkProcessor]
(org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver
event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
at
org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:382)
at
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient {
host: 194.90.232.77, port: 45454 }: Failed to send batch
at
org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:294)
at
org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:366)
... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient {
host: 194.90.232.77, port: 45454 }: RPC request exception
at
org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:344)
at
org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:282)
... 4 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
Error connecting to /194.90.232.77:45454
at
java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:262)
at java.util.concurrent.FutureTask.get(FutureTask.java:119)
at
org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:336)
... 5 more
Caused by: java.io.IOException: Error connecting to /194.90.232.77:45454
at
org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
at
org.apache.avro.ipc.NettyTransceiver.getRemoteName(NettyTransceiver.java:386)
at org.apache.avro.ipc.Requestor.writeHandshake(Requestor.java:202)
at org.apache.avro.ipc.Requestor.access$300(Requestor.java:52)
at
org.apache.avro.ipc.Requestor$Request.getBytes(Requestor.java:478)
at org.apache.avro.ipc.Requestor.request(Requestor.java:147)
at org.apache.avro.ipc.Requestor.request(Requestor.java:129)
at
org.apache.avro.ipc.specific.SpecificRequestor.invoke(SpecificRequestor.java:68)
at $Proxy7.appendBatch(Unknown Source)
at
org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:327)
at
org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:323)
at
java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
... 1 more
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:701)
at
org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.connect(NioClientSocketPipelineSink.java:396)
at
org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.processSelectedKeys(NioClientSocketPipelineSink.java:358)
at
org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.run(NioClientSocketPipelineSink.java:274)
... 3 more
23 Jul 2013 10:53:49,771 INFO
[SinkRunner-PollingRunner-DefaultSinkProcessor]
(org.apache.flume.sink.AbstractRpcSink.createConnection:205) - Rpc sink
AvroSink: Building RpcClient with hostname: 194.90.232.77, port: 45454
23 Jul 2013 10:53:49,771 INFO
[SinkRunner-PollingRunner-DefaultSinkProcessor]
(org.apache.flume.sink.AvroSink.initializeRpcClient:126) - Attempting to
create Avro Rpc client.
In the collector's sink.directory =/disk/flume-debug-sink an empty file is
created, but nothing is written there.
Anything I'm doing wrong?
Thanks
Anat