Posted to dev@flume.apache.org by "Jun Seok Hong (JIRA)" <ji...@apache.org> on 2015/10/14 08:27:05 UTC
[jira] [Created] (FLUME-2811) Taildir source doesn't call stop() on graceful shutdown
Jun Seok Hong created FLUME-2811:
------------------------------------
Summary: Taildir source doesn't call stop() on graceful shutdown
Key: FLUME-2811
URL: https://issues.apache.org/jira/browse/FLUME-2811
Project: Flume
Issue Type: Bug
Components: Sinks+Sources
Affects Versions: v1.7.0
Reporter: Jun Seok Hong
Priority: Critical
Taildir source doesn't call stop() on graceful shutdown.
Test configuration:
source - taildir
channel - PseudoTxnMemoryChannel / flume-kafka-channel
sink - none
I found that Flume sometimes does not terminate when the Taildir source is in use.
I had to kill the process to terminate it.
The tailFileProcess() function in TaildirSource.java contains an infinite loop.
When the process is interrupted, a ChannelException is thrown, but the catch block only sleeps and continues, so the exception cannot break the infinite loop.
I think that is why stop() is never called on the Taildir source.
{code:title=TaildirSource.java|borderStyle=solid}
private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
    throws IOException, InterruptedException {
  while (true) {
    reader.setCurrentFile(tf);
    List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
    if (events.isEmpty()) {
      break;
    }
    sourceCounter.addToEventReceivedCount(events.size());
    sourceCounter.incrementAppendBatchReceivedCount();
    try {
      getChannelProcessor().processEventBatch(events);
      reader.commit();
    } catch (ChannelException ex) {
      logger.warn("The channel is full or unexpected failure. " +
          "The source will try again after " + retryInterval + " ms");
      TimeUnit.MILLISECONDS.sleep(retryInterval);
      retryInterval = retryInterval << 1;
      retryInterval = Math.min(retryInterval, maxRetryInterval);
      continue;
    }
    retryInterval = 1000;
    sourceCounter.addToEventAcceptedCount(events.size());
    sourceCounter.incrementAppendBatchAcceptedCount();
    if (events.size() < batchSize) {
      break;
    }
  }
}
{code}
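One possible way to let such a retry loop exit on shutdown is sketched below. It does not use Flume and is not the actual fix for this issue. The class StoppableRetryLoop, the Channel interface, ChannelFullException and the stopRequested flag are all hypothetical stand-ins, introduced only to illustrate the idea: check a stop flag on every iteration, and treat an interrupt during the backoff sleep as a signal to leave the loop.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not Flume code: a retry loop that can be stopped.
class StoppableRetryLoop {
    /** Hypothetical stand-in for Flume's ChannelException. */
    static class ChannelFullException extends RuntimeException {
        ChannelFullException(String msg) { super(msg); }
    }

    /** Hypothetical stand-in for the channel the source writes to. */
    interface Channel { void put(String batch); }

    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final long initialRetryInterval = 10;  // ms, kept small for the demo
    private final long maxRetryInterval = 80;      // ms
    private long retryInterval = initialRetryInterval;
    private int attempts = 0;

    /** Would be called from the component's stop() path. */
    void requestStop() { stopRequested.set(true); }

    int attempts() { return attempts; }

    /** Returns true if the batch was delivered, false if the loop was stopped. */
    boolean deliver(Channel channel, String batch) {
        // Unlike while (true), this loop re-checks the stop flag before each retry.
        while (!stopRequested.get()) {
            attempts++;
            try {
                channel.put(batch);
                retryInterval = initialRetryInterval;
                return true;
            } catch (ChannelFullException ex) {
                try {
                    TimeUnit.MILLISECONDS.sleep(retryInterval);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and give up instead of retrying.
                    Thread.currentThread().interrupt();
                    return false;
                }
                // Exponential backoff, capped at maxRetryInterval.
                retryInterval = Math.min(retryInterval << 1, maxRetryInterval);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        StoppableRetryLoop loop = new StoppableRetryLoop();
        // A channel that always rejects the batch and asks the loop to stop.
        Channel alwaysFull = batch -> {
            loop.requestStop();
            throw new ChannelFullException("channel is full");
        };
        boolean delivered = loop.deliver(alwaysFull, "batch-1");
        System.out.println("delivered=" + delivered + " attempts=" + loop.attempts());
    }
}
```

In this sketch the loop ends either when the stop flag is set or when the thread is interrupted while backing off. Whether tailFileProcess() should use a flag, rely on interruption, or both is a design decision for the actual patch.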