You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by "Attila Simon (JIRA)" <ji...@apache.org> on 2016/11/27 11:39:58 UTC

[jira] [Commented] (FLUME-3032) taildir source sleeps frequently.

    [ https://issues.apache.org/jira/browse/FLUME-3032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15699547#comment-15699547 ] 

Attila Simon commented on FLUME-3032:
-------------------------------------

Nice finding and the change also looks good to me. Would you consider adding a junit test to cover this change? 

> taildir source sleeps frequently.
> ---------------------------------
>
>                 Key: FLUME-3032
>                 URL: https://issues.apache.org/jira/browse/FLUME-3032
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.7.0
>         Environment: CentOS Linux release 7.2.1511 (Core) 
> java version "1.7.0_80"
>            Reporter: Liu Tianhao
>              Labels: newbie
>         Attachments: FLUME-3032.patch
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Test configuration.
> source - taildir
> interceptor -  The custom interceptor drops some events
> channel - anyone
> sink - none
> I found that taildir source sleeps frequently.
> The tailFileProcess() function in TaildirSource.java break the loop by (events.size() < batchSize), but interceptor may change events.size().
> I think the events.size() should be used before interceptor processing. 
> Avoid unnecessary sleep.
> {code:title=TaildirSource.java|borderStyle=solid}
>     private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
>             throws IOException, InterruptedException {
>         long receivedSize = 0;
>         while (true) {
>             reader.setCurrentFile(tf);
>             List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
>             if (events.isEmpty()) {
>                 break;
>             }
>             receivedSize = events.size();
>             sourceCounter.addToEventReceivedCount(receivedSize);
>             sourceCounter.incrementAppendBatchReceivedCount();
>             try {
>                 getChannelProcessor().processEventBatch(events);
>                 reader.commit();
>             } catch (ChannelException ex) {
>                 logger.warn("The channel is full or unexpected failure. "
>                         + "The source will try again after " + retryInterval
>                         + " ms");
>                 TimeUnit.MILLISECONDS.sleep(retryInterval);
>                 retryInterval = retryInterval << 1;
>                 retryInterval = Math.min(retryInterval, maxRetryInterval);
>                 continue;
>             }
>             retryInterval = 1000;
>             sourceCounter.addToEventAcceptedCount(events.size());
>             sourceCounter.incrementAppendBatchAcceptedCount();
>             if (receivedSize < batchSize) {
>                 break;
>             }
>         }
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)