Posted to dev@flume.apache.org by "刘天昊 (JIRA)" <ji...@apache.org> on 2016/11/25 02:48:58 UTC
[jira] [Created] (FLUME-3032) taildir source sleeps frequently.
刘天昊 created FLUME-3032:
--------------------------
Summary: taildir source sleeps frequently.
Key: FLUME-3032
URL: https://issues.apache.org/jira/browse/FLUME-3032
Project: Flume
Issue Type: Bug
Components: Sinks+Sources
Affects Versions: v1.7.0
Environment: CentOS Linux release 7.2.1511 (Core)
java version "1.7.0_80"
Reporter: 刘天昊
Test configuration:
source - taildir
interceptor - a custom interceptor that drops some events
channel - any
sink - none
I found that the taildir source sleeps frequently.
The tailFileProcess() function in TaildirSource.java breaks out of the loop when (events.size() < batchSize), but an interceptor may change events.size(), so a full batch can look like a short one.
I think the value of events.size() should be captured before interceptor processing and used for that check.
This avoids unnecessary sleep.
{code:title=TaildirSource.java|borderStyle=solid}
private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
    throws IOException, InterruptedException {
  long receivedSize = 0;
  while (true) {
    reader.setCurrentFile(tf);
    List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
    if (events.isEmpty()) {
      break;
    }
    // Record the batch size before interceptors can drop events.
    receivedSize = events.size();
    sourceCounter.addToEventReceivedCount(receivedSize);
    sourceCounter.incrementAppendBatchReceivedCount();
    try {
      getChannelProcessor().processEventBatch(events);
      reader.commit();
    } catch (ChannelException ex) {
      logger.warn("The channel is full or unexpected failure. "
          + "The source will try again after " + retryInterval
          + " ms");
      TimeUnit.MILLISECONDS.sleep(retryInterval);
      retryInterval = retryInterval << 1;
      retryInterval = Math.min(retryInterval, maxRetryInterval);
      continue;
    }
    retryInterval = 1000;
    sourceCounter.addToEventAcceptedCount(events.size());
    sourceCounter.incrementAppendBatchAcceptedCount();
    // Stop only when the read itself was short, not when interceptors shrank the batch.
    if (receivedSize < batchSize) {
      break;
    }
  }
}
{code}
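
To illustrate the difference between the two checks without a Flume dependency, here is a minimal, self-contained sketch. Everything in it is hypothetical: BatchSizeDemo, dropDebugEvents, and the two stopsUsing... methods are made-up names, events are modeled as plain strings, and the interceptor is assumed to remove events from the list in place (which is what would make events.size() change after processing).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class BatchSizeDemo {

    // Hypothetical stand-in for a custom interceptor: removes every
    // event that starts with "DEBUG", mutating the list in place.
    static void dropDebugEvents(List<String> events) {
        Iterator<String> it = events.iterator();
        while (it.hasNext()) {
            if (it.next().startsWith("DEBUG")) {
                it.remove();
            }
        }
    }

    // Check done after interception: returns true when the loop would
    // break, i.e. the batch is treated as a short read.
    static boolean stopsUsingSizeAfter(List<String> batch, int batchSize) {
        List<String> events = new ArrayList<String>(batch);
        dropDebugEvents(events);
        return events.size() < batchSize;
    }

    // Check done with the size captured before interception.
    static boolean stopsUsingSizeBefore(List<String> batch, int batchSize) {
        List<String> events = new ArrayList<String>(batch);
        int receivedSize = events.size();
        dropDebugEvents(events);
        return receivedSize < batchSize;
    }

    public static void main(String[] args) {
        // A full batch of 4 events, 2 of which the interceptor drops.
        List<String> fullBatch =
            Arrays.asList("INFO a", "DEBUG b", "INFO c", "DEBUG d");
        int batchSize = 4;

        System.out.println(stopsUsingSizeAfter(fullBatch, batchSize));  // prints "true"
        System.out.println(stopsUsingSizeBefore(fullBatch, batchSize)); // prints "false"
    }
}
```

With the size taken after interception, a full read of 4 events looks like a short read of 2, so the loop stops even though the file may still have data; with the size taken before, the loop keeps reading.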