You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by "Mike Percy (JIRA)" <ji...@apache.org> on 2016/10/06 11:08:20 UTC
[jira] [Commented] (FLUME-2811) Taildir source doesn't call stop()
on graceful shutdown
[ https://issues.apache.org/jira/browse/FLUME-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551640#comment-15551640 ]
Mike Percy commented on FLUME-2811:
-----------------------------------
Hi [~siefried12] it would be very helpful if you could provide the full Flume configuration file as well as the output of several successive runs of {{jstack}} on the Flume process while it is hung. I tried to reproduce this and so far have not been able to do it.
> Taildir source doesn't call stop() on graceful shutdown
> -------------------------------------------------------
>
> Key: FLUME-2811
> URL: https://issues.apache.org/jira/browse/FLUME-2811
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v1.7.0
> Reporter: Jun Seok Hong
> Assignee: Umesh Chaudhary
> Priority: Critical
> Labels: newbie
> Fix For: v1.7.0
>
>
> Taildir source doesn't call stop() on graceful shutdown.
> Test configuration.
> source - taildir
> channel - PseudoTxnMemoryChannel / flume-kafka-channel
> sink - none
> I found that flume sometimes doesn't terminate with Taildir source.
> I had to kill the process to terminate it.
> tailFileProcess() function in TaildirSource.java has a infinite loop.
> When the process interrupted, ChannelException will happen, but it can't breaks the infinite loop.
> I think that's the reason why Taildir can't call stop() function.
> {code:title=TaildirSource.java|borderStyle=solid}
> private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
> throws IOException, InterruptedException {
> while (true) {
> reader.setCurrentFile(tf);
> List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
> if (events.isEmpty()) {
> break;
> }
> sourceCounter.addToEventReceivedCount(events.size());
> sourceCounter.incrementAppendBatchReceivedCount();
> try {
> getChannelProcessor().processEventBatch(events);
> reader.commit();
> } catch (ChannelException ex) {
> logger.warn("The channel is full or unexpected failure. " +
> "The source will try again after " + retryInterval + " ms");
> TimeUnit.MILLISECONDS.sleep(retryInterval);
> retryInterval = retryInterval << 1;
> retryInterval = Math.min(retryInterval, maxRetryInterval);
> continue;
> }
> retryInterval = 1000;
> sourceCounter.addToEventAcceptedCount(events.size());
> sourceCounter.incrementAppendBatchAcceptedCount();
> if (events.size() < batchSize) {
> break;
> }
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)