Posted to dev@flume.apache.org by "刘天昊 (JIRA)" <ji...@apache.org> on 2016/11/25 03:03:00 UTC

[jira] [Updated] (FLUME-3032) taildir source sleeps frequently.

     [ https://issues.apache.org/jira/browse/FLUME-3032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

刘天昊 updated FLUME-3032:
-----------------------
    Attachment: FLUME-3032.patch

> taildir source sleeps frequently.
> ---------------------------------
>
>                 Key: FLUME-3032
>                 URL: https://issues.apache.org/jira/browse/FLUME-3032
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.7.0
>         Environment: CentOS Linux release 7.2.1511 (Core) 
> java version "1.7.0_80"
>            Reporter: 刘天昊
>              Labels: newbie
>         Attachments: FLUME-3032.patch
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Test configuration:
> source - taildir
> interceptor - a custom interceptor that drops some events
> channel - any
> sink - none
> With this configuration, the taildir source sleeps frequently.
> The tailFileProcess() function in TaildirSource.java breaks out of its loop when (events.size() < batchSize), but an interceptor may change events.size() by dropping events, so the loop can exit even though the file may still have unread lines.
> I think the size check should use the value of events.size() taken before interceptor processing,
> to avoid the unnecessary sleep.
> {code:title=TaildirSource.java|borderStyle=solid}
>     private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
>             throws IOException, InterruptedException {
>         long receivedSize = 0;
>         while (true) {
>             reader.setCurrentFile(tf);
>             List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
>             if (events.isEmpty()) {
>                 break;
>             }
>             // Record the batch size before interceptors can drop events.
>             receivedSize = events.size();
>             sourceCounter.addToEventReceivedCount(receivedSize);
>             sourceCounter.incrementAppendBatchReceivedCount();
>             try {
>                 getChannelProcessor().processEventBatch(events);
>                 reader.commit();
>             } catch (ChannelException ex) {
>                 logger.warn("The channel is full or unexpected failure. "
>                         + "The source will try again after " + retryInterval
>                         + " ms");
>                 TimeUnit.MILLISECONDS.sleep(retryInterval);
>                 retryInterval = retryInterval << 1;
>                 retryInterval = Math.min(retryInterval, maxRetryInterval);
>                 continue;
>             }
>             retryInterval = 1000;
>             sourceCounter.addToEventAcceptedCount(events.size());
>             sourceCounter.incrementAppendBatchAcceptedCount();
>             // Compare the pre-interceptor size so that a filtered batch does not end the loop early.
>             if (receivedSize < batchSize) {
>                 break;
>             }
>         }
>     }
> {code}
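
The effect described in the report can be reproduced without Flume. The following is a minimal sketch, not Flume code: the class BatchLoopSketch, the method names, and the "DEBUG" filter are all hypothetical stand-ins, and the sketch assumes the interceptor removes events from the batch list in place. It counts how many batches the loop processes before exiting, once with the size check done after filtering and once with the size captured before filtering.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchLoopSketch {

    // Hypothetical stand-in for a custom interceptor: removes, in place,
    // every event whose text contains "DEBUG".
    public static void dropDebugEvents(List<String> events) {
        events.removeIf(e -> e.contains("DEBUG"));
    }

    // Reads `lines` in batches of `batchSize` and returns how many batches
    // were processed before the loop exited. When checkBeforeInterceptor is
    // true, the exit test uses the batch size captured before filtering;
    // otherwise it uses the size left after filtering.
    public static int batchesBeforeExit(List<String> lines, int batchSize,
                                        boolean checkBeforeInterceptor) {
        int position = 0;
        int batches = 0;
        while (true) {
            int end = Math.min(position + batchSize, lines.size());
            List<String> events = new ArrayList<>(lines.subList(position, end));
            if (events.isEmpty()) {
                break;
            }
            position = end;
            int receivedSize = events.size(); // size before filtering
            dropDebugEvents(events);          // may shrink the batch
            batches++;
            int sizeForCheck = checkBeforeInterceptor ? receivedSize : events.size();
            if (sizeForCheck < batchSize) {
                break; // treated as "no more data for now"
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        // Ten lines; every even-numbered line is a DEBUG line that gets dropped.
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            lines.add(i % 2 == 0 ? "DEBUG line " + i : "INFO line " + i);
        }
        System.out.println(batchesBeforeExit(lines, 4, false)); // prints 1
        System.out.println(batchesBeforeExit(lines, 4, true));  // prints 3
    }
}
```

With the check done after filtering, the first full batch of four shrinks to two events and the loop exits with six lines still unread. With the size captured before filtering, the loop continues until a genuinely short batch is read.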



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)