Posted to user@flume.apache.org by Anat Rozenzon <an...@viber.com> on 2013/07/23 16:59:15 UTC

Problem with Avro sink reconnecting to Avro source

Hi,

Sorry if this is a newbie question...
I'm trying to send events from an agent to a collector server through a
pair of Avro sink/source.
Everything works properly while the collector is up.
However, whenever the collector server is down or unreachable for some
reason, the agent doesn't recover and must be restarted.

Anything I'm doing wrong?

This is the agent config:
agent.sources = apache
agent.sources.apache.type = exec
agent.sources.apache.command = tail -F /myfolder/mylog.log
agent.sources.apache.batchSize = 1
agent.sources.apache.channels = memoryChannel

agent.channels = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100

agent.sinks = AvroSink
agent.sinks.AvroSink.type = avro
agent.sinks.AvroSink.channel = memoryChannel
# this is the collector IP
agent.sinks.AvroSink.hostname = 1.2.3.4
agent.sinks.AvroSink.port = 45454


This is the collector config:

collector.sources = AvroIn
collector.sources.AvroIn.type = avro
collector.sources.AvroIn.bind = 0.0.0.0
collector.sources.AvroIn.port = 45454
collector.sources.AvroIn.channels = mc1 mc2

collector.channels = mc1 mc2

collector.channels.mc1.type = memory
collector.channels.mc1.capacity = 100

collector.channels.mc2.type = memory
collector.channels.mc2.capacity = 100

collector.sinks = LocalOut HadoopOut

collector.sinks.LocalOut.type = file_roll
collector.sinks.LocalOut.sink.directory = /disk/flume-debug-sink
collector.sinks.LocalOut.sink.rollInterval = 0
collector.sinks.LocalOut.channel = mc1

collector.sinks.HadoopOut.type = logger
collector.sinks.HadoopOut.channel = mc2


I get these errors on the agent machine:

23 Jul 2013 10:53:19,714 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
        at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:382)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 194.90.232.77, port: 45454 }: Failed to send batch
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:294)
        at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:366)
        ... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 194.90.232.77, port: 45454 }: RPC request exception
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:344)
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:282)
        ... 4 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Error connecting to /194.90.232.77:45454
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:262)
        at java.util.concurrent.FutureTask.get(FutureTask.java:119)
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:336)
        ... 5 more
Caused by: java.io.IOException: Error connecting to /194.90.232.77:45454
        at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
        at org.apache.avro.ipc.NettyTransceiver.getRemoteName(NettyTransceiver.java:386)
        at org.apache.avro.ipc.Requestor.writeHandshake(Requestor.java:202)
        at org.apache.avro.ipc.Requestor.access$300(Requestor.java:52)
        at org.apache.avro.ipc.Requestor$Request.getBytes(Requestor.java:478)
        at org.apache.avro.ipc.Requestor.request(Requestor.java:147)
        at org.apache.avro.ipc.Requestor.request(Requestor.java:129)
        at org.apache.avro.ipc.specific.SpecificRequestor.invoke(SpecificRequestor.java:68)
        at $Proxy7.appendBatch(Unknown Source)
        at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:327)
        at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:323)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        ... 1 more
Caused by: java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:701)
        at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.connect(NioClientSocketPipelineSink.java:396)
        at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.processSelectedKeys(NioClientSocketPipelineSink.java:358)
        at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.run(NioClientSocketPipelineSink.java:274)
        ... 3 more
23 Jul 2013 10:53:49,771 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.AbstractRpcSink.createConnection:205)  - Rpc sink AvroSink: Building RpcClient with hostname: 194.90.232.77, port: 45454
23 Jul 2013 10:53:49,771 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.AvroSink.initializeRpcClient:126)  - Attempting to create Avro Rpc client.
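
The innermost cause in the trace is java.net.ConnectException: Connection
refused, which means the TCP connection to the collector's host and port was
rejected at that moment. Whether the port is reachable again can be checked
from the agent machine independently of Flume. The following is only a
minimal sketch, assuming Python 3 is available there; the function name
can_connect and the 5-second timeout are illustrative choices and are not
part of Flume.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection performs the TCP connect, which is the step
        # that fails with "Connection refused" in the trace.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```

Calling can_connect("194.90.232.77", 45454) with the host and port from the
log lines returns True once something is listening on the collector port
again. If it returns True while the agent keeps logging the error, the
problem is more likely on the agent side than in the network.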


In the collector's sink.directory (/disk/flume-debug-sink) an empty file is
created, but nothing is written to it.

Anything I'm doing wrong?

Thanks
Anat