You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flume.apache.org by "wang qun (Jira)" <ji...@apache.org> on 2020/05/07 05:00:00 UTC

[jira] [Updated] (FLUME-3364) TailDir BackOff Method always blocks the thread

     [ https://issues.apache.org/jira/browse/FLUME-3364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wang qun updated FLUME-3364:
----------------------------
    Priority: Critical  (was: Major)

> TailDir BackOff Method always blocks the thread
> -----------------------------------------------
>
>                 Key: FLUME-3364
>                 URL: https://issues.apache.org/jira/browse/FLUME-3364
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: 1.9.0
>         Environment: in mac system.  The backoff method will block the thread with the data has been read. 
>            Reporter: wang qun
>            Priority: Critical
>         Attachments: image-2020-04-26-21-43-52-084.png, image-2020-04-26-21-44-01-785.png
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The backoff method in taildir source is invalid because of a code bug.
> This part should not set default status as false
> {code:java}
> //代码占位符
> public Status process() {
>   Status status = Status.BACKOFF;
>   try {
>     existingInodes.clear();
>     existingInodes.addAll(reader.updateTailFiles());
>     for (long inode : existingInodes) {
>       TailFile tf = reader.getTailFiles().get(inode);
>       if (tf.needTail()) {
>         boolean hasMoreLines = tailFileProcess(tf, true);
>         if (hasMoreLines) {
>           status = Status.READY;
>         }
>       }
>     }
>     closeTailFiles();
>   } catch (Throwable t) {
>     logger.error("Unable to tail files", t);
>     sourceCounter.incrementEventReadFail();
>     status = Status.BACKOFF;
>   }
>   return status;
> }
> {code}
>  
> {code:java}
> //代码占位符
> private boolean tailFileProcess(TailFile tf, boolean backoffWithoutNL)
>     throws IOException, InterruptedException {
>   long batchCount = 0;
>   while (true) {
>     reader.setCurrentFile(tf);
>     List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
>     if (events.isEmpty()) {
>       return false;
>     }
>     sourceCounter.addToEventReceivedCount(events.size());
>     sourceCounter.incrementAppendBatchReceivedCount();
>     try {
>       getChannelProcessor().processEventBatch(events);
>       reader.commit();
>     } catch (ChannelException ex) {
>       logger.warn("The channel is full or unexpected failure. " +
>           "The source will try again after " + retryInterval + " ms");
>       sourceCounter.incrementChannelWriteFail();
>       TimeUnit.MILLISECONDS.sleep(retryInterval);
>       retryInterval = retryInterval << 1;
>       retryInterval = Math.min(retryInterval, maxRetryInterval);
>       continue;
>     }
>     retryInterval = 1000;
>     sourceCounter.addToEventAcceptedCount(events.size());
>     sourceCounter.incrementAppendBatchAcceptedCount();
>     if (events.size() < batchSize) {
>       logger.debug("The events taken from " + tf.getPath() + " is less than " + batchSize);
>       return false;
>     }
>     if (++batchCount >= maxBatchCount) {
>       logger.debug("The batches read from the same file is larger than " + maxBatchCount );
>       return true;
>     }
>   }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@flume.apache.org
For additional commands, e-mail: issues-help@flume.apache.org