You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flume.apache.org by "wang qun (Jira)" <ji...@apache.org> on 2020/05/07 05:00:00 UTC
[jira] [Updated] (FLUME-3364) TailDir BackOff Method always blocks the thread
[ https://issues.apache.org/jira/browse/FLUME-3364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
wang qun updated FLUME-3364:
----------------------------
Priority: Critical (was: Major)
> TailDir BackOff Method always blocks the thread
> -----------------------------------------------
>
> Key: FLUME-3364
> URL: https://issues.apache.org/jira/browse/FLUME-3364
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: 1.9.0
> Environment: macOS. The backoff method blocks the thread even though data has been read.
> Reporter: wang qun
> Priority: Critical
> Attachments: image-2020-04-26-21-43-52-084.png, image-2020-04-26-21-44-01-785.png
>
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> The backoff logic in the Taildir source does not work correctly because of a code bug.
> This part should not set the default status to false:
> {code:java}
> // code placeholder
> public Status process() {
> Status status = Status.BACKOFF;
> try {
> existingInodes.clear();
> existingInodes.addAll(reader.updateTailFiles());
> for (long inode : existingInodes) {
> TailFile tf = reader.getTailFiles().get(inode);
> if (tf.needTail()) {
> boolean hasMoreLines = tailFileProcess(tf, true);
> if (hasMoreLines) {
> status = Status.READY;
> }
> }
> }
> closeTailFiles();
> } catch (Throwable t) {
> logger.error("Unable to tail files", t);
> sourceCounter.incrementEventReadFail();
> status = Status.BACKOFF;
> }
> return status;
> }
> {code}
>
> {code:java}
> // code placeholder
> private boolean tailFileProcess(TailFile tf, boolean backoffWithoutNL)
> throws IOException, InterruptedException {
> long batchCount = 0;
> while (true) {
> reader.setCurrentFile(tf);
> List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
> if (events.isEmpty()) {
> return false;
> }
> sourceCounter.addToEventReceivedCount(events.size());
> sourceCounter.incrementAppendBatchReceivedCount();
> try {
> getChannelProcessor().processEventBatch(events);
> reader.commit();
> } catch (ChannelException ex) {
> logger.warn("The channel is full or unexpected failure. " +
> "The source will try again after " + retryInterval + " ms");
> sourceCounter.incrementChannelWriteFail();
> TimeUnit.MILLISECONDS.sleep(retryInterval);
> retryInterval = retryInterval << 1;
> retryInterval = Math.min(retryInterval, maxRetryInterval);
> continue;
> }
> retryInterval = 1000;
> sourceCounter.addToEventAcceptedCount(events.size());
> sourceCounter.incrementAppendBatchAcceptedCount();
> if (events.size() < batchSize) {
> logger.debug("The events taken from " + tf.getPath() + " is less than " + batchSize);
> return false;
> }
> if (++batchCount >= maxBatchCount) {
> logger.debug("The batches read from the same file is larger than " + maxBatchCount );
> return true;
> }
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@flume.apache.org
For additional commands, e-mail: issues-help@flume.apache.org