Posted to user@flume.apache.org by Majid Alfifi <ma...@gmail.com> on 2014/12/02 07:30:35 UTC

Flume Sink Group doesn't seem to back off

---------- Forwarded message ----------
From: *Majid Alfifi* <ma...@gmail.com>
Date: Monday, December 1, 2014
Subject: Flume Sink Group doesn't seem to back off
To: cdh-user@cloudera.org


Hello,

I was trying out a Flume setup but couldn't get a group of AvroSinks to
back off when I shut down one of the receiving agents. Connection errors
keep showing up in the logs about every 30 seconds; I don't see any
exponential backoff.
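For reference, "exponential backoff" here means that the wait before a failed sink is retried doubles after each consecutive failure until it reaches a ceiling. The Java sketch below is a hypothetical illustration, not Flume's code. The 1-second base and the 30-second cap are assumptions; the Flume User Guide does list a `processor.selector.maxTimeOut` setting (default 30000 ms) that limits the backoff of the load-balancing selectors. Once the doubling reaches the cap, the interval stops growing and stays at the cap.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of capped exponential backoff; not Flume's implementation.
 * BASE_MS and MAX_TIMEOUT_MS are illustrative assumptions.
 */
final class CappedBackoff {
    static final long BASE_MS = 1000L;          // assumed backoff after the first failure
    static final long MAX_TIMEOUT_MS = 30000L;  // assumed cap on the backoff

    /** Backoff after the given number of consecutive failures (0 means no backoff). */
    static long backoffMillis(int consecutiveFailures) {
        if (consecutiveFailures <= 0) {
            return 0L;
        }
        // Double for each extra failure; clamp the shift so the long cannot overflow.
        int shift = Math.min(consecutiveFailures - 1, 30);
        return Math.min(MAX_TIMEOUT_MS, BASE_MS << shift);
    }

    /** Backoff values for 1..n consecutive failures. */
    static List<Long> schedule(int n) {
        List<Long> out = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            out.add(backoffMillis(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints [1000, 2000, 4000, 8000, 16000, 30000, 30000, 30000]
        System.out.println(schedule(8));
    }
}
```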

agent.conf looks like the following.

> agent.sources = MySource
> agent.sinkgroups = LoadBalancerSinkGroup
> agent.sinks = MongodbSink AvroSink1 AvroSink2 AvroSink3 AvroSink4
>
> agent.channels = MemChannel
> agent.sources.MySource.type = com.example.CustomSource
> agent.sources.MySource.channels = MemChannel
> # Use a channel which buffers events in memory
> agent.channels.MemChannel.type = memory
> agent.channels.MemChannel.capacity = 1000000
> agent.channels.MemChannel.transactionCapacity = 10000
> #agent.channels.FileChannel.type = file
> #agent.channels.FileChannel.capacity = 10000000
> #agent.channels.FileChannel.transactionCapacity = 10000
> #agent.channels.FileChannel.checkpointDir = /var/lib/flume-data/checkpoint
> #agent.channels.FileChannel.dataDirs = /var/lib/flume-data/data
> #agent.channels.FileChannel.checkpointInterval = 30
>
> agent.sinkgroups.LoadBalancerSinkGroup.sinks = AvroSink1 AvroSink2 AvroSink3 AvroSink4
> agent.sinkgroups.LoadBalancerSinkGroup.processor.type = load_balance
> agent.sinkgroups.LoadBalancerSinkGroup.processor.selector = random
> agent.sinkgroups.LoadBalancerSinkGroup.processor.backoff = true
> # Bind the source and sink to the channel
> agent.sources.MySource.channels = MemChannel
> # avro sink 1
> agent.sinks.AvroSink1.type = avro
> agent.sinks.AvroSink1.channel = MemChannel
> agent.sinks.AvroSink1.hostname = gather01
> agent.sinks.AvroSink1.port = 4353
> agent.sinks.AvroSink1.compression-type = deflate
> agent.sinks.AvroSink1.batch-size=100
> # avro sink 2
> agent.sinks.AvroSink2.type = avro
> agent.sinks.AvroSink2.channel = MemChannel
> agent.sinks.AvroSink2.hostname = gather02
> agent.sinks.AvroSink2.port = 4353
> agent.sinks.AvroSink2.compression-type = deflate
> agent.sinks.AvroSink2.batch-size=100
> # avro sink 3
> agent.sinks.AvroSink3.type = avro
> agent.sinks.AvroSink3.channel = MemChannel
> agent.sinks.AvroSink3.hostname = gather03
> agent.sinks.AvroSink3.port = 4353
> agent.sinks.AvroSink3.compression-type = deflate
> agent.sinks.AvroSink3.batch-size=100
> # avro sink 4
> agent.sinks.AvroSink4.type = avro
> agent.sinks.AvroSink4.channel = MemChannel
> agent.sinks.AvroSink4.hostname = gather04
> agent.sinks.AvroSink4.port = 4353
> agent.sinks.AvroSink4.compression-type = deflate
> agent.sinks.AvroSink4.batch-size=100
> # Mongodb Sink
> agent.sinks.MongodbSink.type = com.example.MongodbSink
> agent.sinks.MongodbSink.channel = MemChannel
> agent.sinks.MongodbSink.hostname =
> agent.sinks.MongodbSink.port =
> agent.sinks.MongodbSink.dbname =
> agent.sinks.MongodbSink.collection-prefix =
> agent.sinks.MongodbSink.batch = 100
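The sink group in the config above uses the `load_balance` processor with `selector = random` and `backoff = true`. As a rough mental model of what that combination is meant to do, here is a hypothetical Java sketch (the class and method names are invented for illustration and are not Flume's API). Each failure pushes a sink's "restore time" further out, doubling from an assumed 1-second base up to a cap, and the random choice only considers sinks whose restore time has passed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Hypothetical sketch of a random selector with backoff; names and the
 * 1-second base are illustrative assumptions, not Flume's classes.
 */
final class RandomBackoffSelector {
    private final List<String> sinks;
    private final long maxTimeoutMs;
    private final Random random;
    // Per-sink consecutive failure count and the time the sink becomes eligible again.
    private final Map<String, Integer> failures = new HashMap<>();
    private final Map<String, Long> restoreAt = new HashMap<>();

    RandomBackoffSelector(List<String> sinks, long maxTimeoutMs, Random random) {
        this.sinks = new ArrayList<>(sinks);
        this.maxTimeoutMs = maxTimeoutMs;
        this.random = random;
    }

    /** Sinks that are not backed off at time nowMs. */
    List<String> available(long nowMs) {
        List<String> out = new ArrayList<>();
        for (String s : sinks) {
            if (restoreAt.getOrDefault(s, 0L) <= nowMs) {
                out.add(s);
            }
        }
        return out;
    }

    /** Pick a random available sink, or null if every sink is backed off. */
    String select(long nowMs) {
        List<String> candidates = available(nowMs);
        return candidates.isEmpty() ? null : candidates.get(random.nextInt(candidates.size()));
    }

    /** Record a failure: back off 1 s, 2 s, 4 s, ... but never longer than maxTimeoutMs. */
    void informFailure(String sink, long nowMs) {
        int n = failures.merge(sink, 1, Integer::sum);
        long backoff = Math.min(maxTimeoutMs, 1000L << Math.min(n - 1, 30));
        restoreAt.put(sink, nowMs + backoff);
    }

    /** Record a success: clear the sink's failure state. */
    void informSuccess(String sink) {
        failures.remove(sink);
        restoreAt.remove(sink);
    }

    public static void main(String[] args) {
        RandomBackoffSelector sel = new RandomBackoffSelector(
                List.of("AvroSink1", "AvroSink2", "AvroSink3", "AvroSink4"), 30000L, new Random(42));
        sel.informFailure("AvroSink1", 0L);        // AvroSink1 backed off until t = 1000 ms
        System.out.println(sel.available(500L));   // [AvroSink2, AvroSink3, AvroSink4]
        System.out.println(sel.available(1000L));  // all four sinks again
    }
}
```

Passing the clock in as `nowMs` instead of calling `System.currentTimeMillis()` keeps the sketch deterministic and easy to test.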



Errors from the log:

> 30 Nov 2014 23:49:46,429 INFO [SinkRunner-PollingRunner-LoadBalancingSinkProcessor] (org.apache.flume.sink.AbstractRpcSink.createConnection:206)  - Rpc sink AvroSink1: Building RpcClient with hostname: gather01, port: 4353
> 30 Nov 2014 23:49:46,430 INFO [SinkRunner-PollingRunner-LoadBalancingSinkProcessor] (org.apache.flume.sink.AvroSink.initializeRpcClient:126)  - Attempting to create Avro Rpc client.
> 30 Nov 2014 23:49:46,431 WARN [SinkRunner-PollingRunner-LoadBalancingSinkProcessor] (org.apache.flume.api.NettyAvroRpcClient.configure:620)  - Using default maxIOWorkers
> 30 Nov 2014 23:49:46,442 WARN [SinkRunner-PollingRunner-LoadBalancingSinkProcessor] (org.apache.flume.sink.LoadBalancingSinkProcessor.process:158)  - Sink failed to consume event. Attempting next sink if available.
> org.apache.flume.EventDeliveryException: Failed to send events
> at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:392)
> at org.apache.flume.sink.LoadBalancingSinkProcessor.process(LoadBalancingSinkProcessor.java:154)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> at java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient { host: gather01, port: 4353 }: RPC connection error
> at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:178)
> at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:118)
> at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:624)
> at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
> at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
> at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:211)
> at org.apache.flume.sink.AbstractRpcSink.verifyConnection(AbstractRpcSink.java:272)
> at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:349)
> ... 3 more
> Caused by: java.io.IOException: Error connecting to gather01/10.125.abc.xyz:4353
> at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:280)
> at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:206)
> at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:155)
> at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:164)
> ... 10 more
> Caused by: java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
> at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:148)
> at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:104)
> at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:78)
> at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
> at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:41)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> ... 1 more
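One way to check whether the interval between reconnect attempts actually grows is to pull the timestamps of the `Building RpcClient` lines out of the agent's log and print the gaps between them. This is a minimal Java sketch. It assumes each log entry starts on its own line with a timestamp in the format shown above (for example `30 Nov 2014 23:49:46,429`). `RetryGaps` is a hypothetical helper, and the log file path is whatever the agent actually writes to.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: gaps in milliseconds between log lines containing a marker. */
final class RetryGaps {
    // Leading timestamp such as "30 Nov 2014 23:49:46,429".
    private static final Pattern TS =
            Pattern.compile("^(\\d{1,2} \\w{3} \\d{4} \\d{2}:\\d{2}:\\d{2},\\d{3})");
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("d MMM yyyy HH:mm:ss,SSS", Locale.ENGLISH);

    static List<Long> gapsMillis(List<String> lines, String marker) {
        List<Long> gaps = new ArrayList<>();
        LocalDateTime prev = null;
        for (String line : lines) {
            if (!line.contains(marker)) {
                continue;  // skip unrelated lines and stack-trace continuation lines
            }
            Matcher m = TS.matcher(line);
            if (!m.find()) {
                continue;  // marker present but no leading timestamp
            }
            LocalDateTime t = LocalDateTime.parse(m.group(1), FMT);
            if (prev != null) {
                gaps.add(Duration.between(prev, t).toMillis());
            }
            prev = t;
        }
        return gaps;
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.err.println("usage: java RetryGaps <flume log file>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]), StandardCharsets.UTF_8);
        System.out.println(gapsMillis(lines, "AvroSink1: Building RpcClient"));
    }
}
```

If backoff were working as expected, the printed gaps should roughly double from one attempt to the next until they reach the cap. A constant gap from the start would match the behaviour described above.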