Posted to dev@flume.apache.org by Derek Deeter <dd...@gmail.com> on 2011/09/21 00:02:54 UTC

Flume collector hangs on write/read timeout

We have set up a configuration that uses the collector sink with our own
sink (called GlobalLogSink).  Configuration is:

collector (15000) { GlobalLogSink(...) }

The GlobalLogSink connects to Cassandra, which we have not fully tuned yet.
We are running into an exception in the RollSink code that causes
DirectDriver to close down. It occurs when Cassandra does a garbage
collection or some other time-intensive action that makes a response take
longer than expected.  I have pasted two tracebacks at the end of this
email, one from the collector log and the other from jstack.

The code at RollSink:296 waits up to 1000 millis for the write lock
(WriteLock.tryLock in the first trace below); if the in-flight write takes
longer than that, an exception occurs that shuts the sink down.

When testing in our development environment, we found that this problem went
away, but there we were using the first flume-0.9.4 release, which still has
the 'synchronize' functionality from before the ReentrantReadWriteLock was
implemented. So we backed down a release or two from flume-0.9.4+25.6,
but then we ran into Ack problems.

A couple of fixes are needed to correct this issue:
1) We would like to add a fix to RollSink.java to make the 1000ms delay
configurable.
2) When the timeout does occur, the sink should fail gracefully and not
just hang/close down.

I would like to open a JIRA on this.
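
The two proposed fixes could look roughly like the following. This is a minimal sketch, not the actual RollSink code: the class TimedCloseSketch, its methods, and the closeTimeoutMs parameter are all hypothetical, and it assumes (as the traces suggest but do not confirm) that an in-flight append holds a read lock on the same ReentrantReadWriteLock that close() needs for writing.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for a sink that guards close() with a timed write lock. */
class TimedCloseSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Hypothetical configurable value replacing a fixed 1000 ms wait.
    private final long closeTimeoutMs;

    TimedCloseSketch(long closeTimeoutMs) {
        this.closeTimeoutMs = closeTimeoutMs;
    }

    /** Marks the start of an append: takes the read lock, as a slow write would. */
    void beginAppend() {
        lock.readLock().lock();
    }

    /** Marks the end of an append: releases the read lock. */
    void endAppend() {
        lock.readLock().unlock();
    }

    /**
     * Tries to close. Returns true if the write lock was obtained and the
     * close ran; returns false if the wait timed out or was interrupted,
     * so the caller can retry or report instead of shutting down.
     */
    boolean tryClose() {
        try {
            if (!lock.writeLock().tryLock(closeTimeoutMs, TimeUnit.MILLISECONDS)) {
                return false; // timed out while an append was still in flight
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
        try {
            // A real sink would close its downstream resources here.
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        TimedCloseSketch sink = new TimedCloseSketch(200);
        // In a real sink another thread would hold the read lock. Holding it
        // from this thread also makes the timed tryLock fail, because a read
        // lock cannot be upgraded to a write lock.
        sink.beginAppend();
        System.out.println("close while append in flight: " + sink.tryClose());
        sink.endAppend();
        System.out.println("close after append finished: " + sink.tryClose());
    }
}
```

Returning a boolean rather than propagating the exception is one way to let the caller decide whether to retry; whether that fits the Flume driver's error handling would need to be checked against the real source.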

To fix the problem I need to know which source/branch to use. This error
occurs in the source associated with the flume-0.9.4+25 RPM file; the
matching source we found is 0.9.4-cdh3u1.  Is there a branch
label or tag that we should use to fix this?

A couple of related questions we'd appreciate answers to:
1) Is there a way to set up a collector sink that doesn't use RollSink?
2) Is there a way to get acknowledgments without using RollSink?

Ideally, we would like to have something like an 'AckSink' that lets us
utilize E2E without having the RollSink functionality.

Thanks in advance,
Derek Deeter




2011-09-15 11:17:24,425 INFO com.cloudera.flume.handlers.rolling.RollSink:
closing RollSink 'globalLogSink( "Global_Logging", "Audit_Log", "
192.168.80.138:9160", "192.168.80.140:9160", "192.168.80.141:9160", "
192.168.80.148:9160" )'
2011-09-15 11:17:24,429 ERROR com.cloudera.flume.core.connector.DirectDriver: Closing down due to exception during append calls
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireNanos(AbstractQueuedSynchronizer.java:1159)
        at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.tryLock(ReentrantReadWriteLock.java:976)
        at com.cloudera.flume.handlers.rolling.RollSink.close(RollSink.java:296)
        at com.cloudera.flume.core.EventSinkDecorator.close(EventSinkDecorator.java:67)
        at com.cloudera.flume.core.EventSinkDecorator.close(EventSinkDecorator.java:67)
        at com.cloudera.flume.handlers.debug.InsistentOpenDecorator.close(InsistentOpenDecorator.java:175)
        at com.cloudera.flume.core.EventSinkDecorator.close(EventSinkDecorator.java:67)
        at com.cloudera.flume.handlers.debug.StubbornAppendSink.append(StubbornAppendSink.java:78)
        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
        at com.cloudera.flume.handlers.debug.InsistentAppendDecorator.append(InsistentAppendDecorator.java:110)
        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
        at com.cloudera.flume.handlers.endtoend.AckChecksumChecker.append(AckChecksumChecker.java:172)
        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
        at com.cloudera.flume.handlers.batch.UnbatchingDecorator.append(UnbatchingDecorator.java:62)
        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
        at com.cloudera.flume.handlers.batch.GunzipDecorator.append(GunzipDecorator.java:81)
        at com.cloudera.flume.collector.CollectorSink.append(CollectorSink.java:222)
        at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:110)
2011-09-15 11:17:24,430 INFO com.cloudera.flume.core.connector.DirectDriver: Connector logicalNode strafe1a.web.prod.diginsite.com-19 exited with error: null
java.lang.InterruptedException

*This is what a jstack trace showed just when the Collector hung:*

2011-09-15 15:42:10
Full thread dump Java HotSpot(TM) 64-Bit Server VM (11.0-b16 mixed mode):

"pool-2-thread-1" prio=10 tid=0x00002aab3c064400 nid=0x16a9 runnable [0x0000000041523000..0x0000000041523d10]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.read(SocketInputStream.java:129)
        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
        at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
        at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
        at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
        at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
        at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
        at org.apache.cassandra.thrift.Cassandra$Client.recv_batch_mutate(Cassandra.java:1025)
        at org.apache.cassandra.thrift.Cassandra$Client.batch_mutate(Cassandra.java:1009)
        at com.intuit.ifs.globallogging.cassandra.plugin.CassandraClient.batchMutate(CassandraClient.java:155)
        at com.intuit.ifs.globallogging.cassandra.plugin.CassandraClient.insert(CassandraClient.java:139)
        at com.intuit.ifs.globallogging.cassandra.plugin.GlobalLogSink.append(GlobalLogSink.java:101)
        at com.cloudera.flume.core.CompositeSink.append(CompositeSink.java:61)
        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
        at com.cloudera.flume.handlers.rolling.RollSink.synchronousAppend(RollSink.java:234)
        at com.cloudera.flume.handlers.rolling.RollSink$1.call(RollSink.java:183)
        at com.cloudera.flume.handlers.rolling.RollSink$1.call(RollSink.java:181)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)

Re: Flume collector hangs on write/read timeout

Posted by Eric Sammer <es...@cloudera.com>.
No one's on vacation, that's for sure. :)

You're correct that there's no reason for collector(N) { ... } to imply
roll behavior unless you consider the historical context. The
collector(N) { ... } syntax was a shortcut for wrapping a user-specified
sink in "most of the
normally-desirable-collector-type-functionality in a jiffy." Historically,
collectorSink() - the "parent" of collector(N) { ... } - has been used to
write to file systems where roll, output bucketing, compression, and other
such things make sense.

The real solution is to compose a sink of the correct decos and sinks to get
just the right amount of functionality you need. The issue is that any
change to the perfect order could introduce subtle bugs (e.g. should
stubborn append come before or after insistent append?). As I've never
tested all possible permutations, I'm reluctant to send you off with a magic
incantation, as such. Obviously the ideal situation would be to simplify
things so this kind of configuration is obvious, testable, and simple to
configure; that's what flume-728 is about.

For now, you can look at the source to CollectorSink and see the pipeline of
things we put together and then take out what you don't want. I agree it's
weird to roll for things like HBase or Cassandra.
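
The ordering concern above can be illustrated with a small, generic decorator sketch. None of this is Flume code: the Sink interface and the retryOnce, counting, and flakyOnce helpers are hypothetical stand-ins, meant only to show that the same two decorators behave differently depending on which wraps which.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical decorator pipeline; not the Flume EventSink API. */
class DecoratorOrderSketch {
    interface Sink {
        void append(String event);
    }

    /** Terminal sink that fails on its first call only, like a briefly stalled backend. */
    static Sink flakyOnce() {
        AtomicInteger calls = new AtomicInteger();
        return event -> {
            if (calls.incrementAndGet() == 1) {
                throw new IllegalStateException("transient failure");
            }
        };
    }

    /** Decorator: retries a failed append exactly once. */
    static Sink retryOnce(Sink inner) {
        return event -> {
            try {
                inner.append(event);
            } catch (IllegalStateException first) {
                inner.append(event); // second and final attempt
            }
        };
    }

    /** Decorator: counts every append attempt that reaches it. */
    static Sink counting(Sink inner, AtomicInteger counter) {
        return event -> {
            counter.incrementAndGet();
            inner.append(event);
        };
    }

    /** Sends one event and reports how many attempts the counter observed. */
    static int attemptsSeen(boolean counterInsideRetry) {
        AtomicInteger counter = new AtomicInteger();
        Sink pipeline = counterInsideRetry
                ? retryOnce(counting(flakyOnce(), counter))
                : counting(retryOnce(flakyOnce()), counter);
        pipeline.append("event");
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("counter inside retry:  " + attemptsSeen(true));
        System.out.println("counter outside retry: " + attemptsSeen(false));
    }
}
```

With the counter inside the retry it sees both attempts; outside, it sees one. A checksum or ack decorator placed on the wrong side of a retrying decorator could be affected in a similar way, which is the kind of subtle bug a reordered pipeline risks.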

On Fri, Sep 23, 2011 at 7:28 AM, Ralph Goers <ra...@dslextreme.com>wrote:

> Did everyone go on vacation?
>
> Ralph
>
> On Sep 21, 2011, at 8:49 PM, Ralph Goers wrote:
>
> >
> >
> > I'd really like some information as to why the CollectorSink is hard
> > coded to have a RollSink. Although the use described above wants
> > acknowledgements it isn't clear why closing and reopening is necessary. I
> > would think that in this use case the locking wouldn't be necessary just to
> > send the acknowledgements.
> >
> > Can someone explain why the code works this way?
> >
> > Ralph
>
>


-- 
Eric Sammer
twitter: esammer
data: www.cloudera.com

Re: Flume collector hangs on write/read timeout

Posted by Ralph Goers <ra...@dslextreme.com>.
Did everyone go on vacation?

Ralph

On Sep 21, 2011, at 8:49 PM, Ralph Goers wrote:

> 
> 
> I'd really like some information as to why the CollectorSink is hard coded to have a RollSink. Although the use described above wants acknowledgements it isn't clear why closing and reopening is necessary. I would think that in this use case the locking wouldn't be necessary just to send the acknowledgements. 
> 
> Can someone explain why the code works this way?
> 
> Ralph


Re: Flume collector hangs on write/read timeout

Posted by Ralph Goers <ra...@dslextreme.com>.
I'd really like some information as to why the CollectorSink is hard-coded to have a RollSink. Although the use case described above wants acknowledgements, it isn't clear why closing and reopening is necessary. I would think that in this use case the locking wouldn't be necessary just to send the acknowledgements.

Can someone explain why the code works this way?

Ralph