Posted to dev@flume.apache.org by "Attila Simon (JIRA)" <ji...@apache.org> on 2016/11/27 11:39:58 UTC
[jira] [Commented] (FLUME-3032) taildir source sleeps frequently.
[ https://issues.apache.org/jira/browse/FLUME-3032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15699547#comment-15699547 ]
Attila Simon commented on FLUME-3032:
-------------------------------------
Nice find, and the change looks good to me. Would you consider adding a JUnit test to cover it?
> taildir source sleeps frequently.
> ---------------------------------
>
> Key: FLUME-3032
> URL: https://issues.apache.org/jira/browse/FLUME-3032
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v1.7.0
> Environment: CentOS Linux release 7.2.1511 (Core)
> java version "1.7.0_80"
> Reporter: Liu Tianhao
> Labels: newbie
> Attachments: FLUME-3032.patch
>
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> Test configuration:
> source - taildir
> interceptor - a custom interceptor that drops some events
> channel - any
> sink - none
> I found that the taildir source sleeps frequently.
> The tailFileProcess() function in TaildirSource.java breaks out of its loop when (events.size() < batchSize), but an interceptor may change events.size().
> I think events.size() should be read before interceptor processing, to avoid unnecessary sleeps.
> {code:title=TaildirSource.java|borderStyle=solid}
>   private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
>       throws IOException, InterruptedException {
>     long receivedSize = 0;
>     while (true) {
>       reader.setCurrentFile(tf);
>       List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
>       if (events.isEmpty()) {
>         break;
>       }
>       receivedSize = events.size();
>       sourceCounter.addToEventReceivedCount(receivedSize);
>       sourceCounter.incrementAppendBatchReceivedCount();
>       try {
>         getChannelProcessor().processEventBatch(events);
>         reader.commit();
>       } catch (ChannelException ex) {
>         logger.warn("The channel is full or unexpected failure. "
>             + "The source will try again after " + retryInterval
>             + " ms");
>         TimeUnit.MILLISECONDS.sleep(retryInterval);
>         retryInterval = retryInterval << 1;
>         retryInterval = Math.min(retryInterval, maxRetryInterval);
>         continue;
>       }
>       retryInterval = 1000;
>       sourceCounter.addToEventAcceptedCount(events.size());
>       sourceCounter.incrementAppendBatchAcceptedCount();
>       if (receivedSize < batchSize) {
>         break;
>       }
>     }
>   }
> {code}
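The loop-exit condition described in the issue can be illustrated with a small standalone simulation, which might also serve as a starting point for the requested test. This is only a sketch: BatchLoopSketch, FakeReader, dropEveryOther and drain are hypothetical names, not Flume classes or methods, and it assumes the interceptor removes events from the batch list in place, as the issue implies.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the tail loop; none of these names are Flume APIs. */
public class BatchLoopSketch {

    /** Simulates a file that has a fixed number of unread lines. */
    static final class FakeReader {
        private int remaining;

        FakeReader(int lines) {
            this.remaining = lines;
        }

        /** Returns up to batchSize lines and marks them as read. */
        List<String> readEvents(int batchSize) {
            int n = Math.min(batchSize, remaining);
            List<String> batch = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                batch.add("line");
            }
            remaining -= n;
            return batch;
        }

        int remaining() {
            return remaining;
        }
    }

    /** Assumed interceptor behaviour: drops every second event, in place. */
    static void dropEveryOther(List<String> events) {
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < events.size(); i += 2) {
            kept.add(events.get(i));
        }
        events.clear();
        events.addAll(kept);
    }

    /**
     * Runs the read loop once and returns how many lines are still unread
     * when the loop exits (i.e. when the source would go back to sleep).
     */
    static int drain(int totalLines, int batchSize, boolean checkSizeBeforeIntercept) {
        FakeReader reader = new FakeReader(totalLines);
        while (true) {
            List<String> events = reader.readEvents(batchSize);
            if (events.isEmpty()) {
                break;
            }
            int receivedSize = events.size(); // size before interception
            dropEveryOther(events);           // batch may shrink here
            int sizeForCheck = checkSizeBeforeIntercept ? receivedSize : events.size();
            if (sizeForCheck < batchSize) {
                break; // reader is assumed to have caught up with the file
            }
        }
        return reader.remaining();
    }

    public static void main(String[] args) {
        // Size checked after interception: loop exits after the first batch.
        System.out.println(drain(250, 100, false)); // prints 150
        // Size checked before interception: loop reads the file to the end.
        System.out.println(drain(250, 100, true));  // prints 0
    }
}
```

With 250 pending lines and a batch size of 100, checking the size after interception leaves 150 lines unread until the next poll, while checking the size captured before interception drains everything in one pass.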
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)