Posted to user@flume.apache.org by Ali Asad Lotia <al...@gmail.com> on 2014/02/24 17:29:26 UTC

Failover sink processors

Greetings all,
I currently have flume deployed in a configuration where multiple
servers run local flume agents that send logs to avro sources on
central flume servers. The central servers send the data to
elasticsearch using the elasticsearch sink. It generally works very
well, but when the elasticsearch servers can't keep up, I would rather
lose logs by having them sent to the null sink than have any
services logging to this flume agent run into issues because their
messages can't be delivered to the elasticsearch sink.

When the elasticsearch sinks are lagging, I can see logged
exceptions warning me that the sink likely can't keep up. The tricky
bit is that the connection to the elasticsearch servers doesn't go
down; the channel on the central flume server simply remains full and
attempts to drain it keep failing.

I have confirmed failover to the null sink if the elasticsearch
servers are down, but not when they can't keep up.

The current config of the central flume servers is provided below.
Advice/insight would be much appreciated.

Thank you,
Ali Asad Lotia

agent1.sources = avroSource httpSource
agent1.channels = memoryChannel
agent1.sinks = nullSink elasticSink


# For each one of the sources, the type is defined

# Avro source definition
agent1.sources.avroSource.type = avro
agent1.sources.avroSource.bind = 0.0.0.0
agent1.sources.avroSource.port = 16310
agent1.sources.avroSource.threads = 8
agent1.sources.avroSource.channels = memoryChannel

# HTTP source definition defaults to the JSONHandler
agent1.sources.httpSource.type = http
agent1.sources.httpSource.bind = 0.0.0.0
agent1.sources.httpSource.port = 16311
agent1.sources.httpSource.channels = memoryChannel

agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = nullSink
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.nullSink = 1
agent1.sinkgroups.g1.processor.priority.elasticSink = 10

agent1.sinks.nullSink.type = null
agent1.sinks.nullSink.channel = memoryChannel

# Each sink's type must be defined
agent1.sinks.elasticSink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent1.sinks.elasticSink.hostNames = <my-elasticsearch-host>:16200
agent1.sinks.elasticSink.indexName = flume
agent1.sinks.elasticSink.indexType = logs
agent1.sinks.elasticSink.clusterName = <myorg>-us-east-1
agent1.sinks.elasticSink.batchSize = 100
agent1.sinks.elasticSink.ttl = 30
agent1.sinks.elasticSink.channel = memoryChannel
agent1.sinks.elasticSink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer


# Each channel's type is defined.
agent1.channels.memoryChannel.type = memory


# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent1.channels.memoryChannel.capacity = 100000
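
For reference, the sink group in the config above lists only nullSink
in its sinks property while assigning priorities to both sinks. A
minimal sketch of the group with both sinks listed, assuming that is
what is intended, might look like the following. The maxpenalty value
is an illustrative assumption, not a tested setting. As far as I
understand, the failover processor only switches sinks when the active
sink's process call throws an exception, so this alone may not cover
the lagging case described above.

```properties
# Sketch only: both sinks are members of the failover group.
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = elasticSink nullSink
agent1.sinkgroups.g1.processor.type = failover
# The higher priority value is tried first, so elasticSink is preferred.
agent1.sinkgroups.g1.processor.priority.elasticSink = 10
agent1.sinkgroups.g1.processor.priority.nullSink = 1
# Assumed value: upper bound in milliseconds on the backoff for a failed sink.
agent1.sinkgroups.g1.processor.maxpenalty = 10000
```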