Posted to dev@flume.apache.org by "刘天昊 (JIRA)" <ji...@apache.org> on 2016/11/25 02:48:58 UTC

[jira] [Created] (FLUME-3032) taildir source sleeps frequently.

刘天昊 created FLUME-3032:
--------------------------

             Summary: taildir source sleeps frequently.
                 Key: FLUME-3032
                 URL: https://issues.apache.org/jira/browse/FLUME-3032
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: v1.7.0
         Environment: CentOS Linux release 7.2.1511 (Core) 
java version "1.7.0_80"
            Reporter: 刘天昊


Test configuration:
source - taildir
interceptor - a custom interceptor that drops some events
channel - any
sink - none

I found that the taildir source sleeps frequently.

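The tailFileProcess() function in TaildirSource.java breaks out of its loop when events.size() < batchSize, but an interceptor (which runs inside processEventBatch()) can change events.size(), for example by removing the dropped events from the list in place. When the loop breaks early, the source sleeps before its next pass over the files, even though unread events may remain.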
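
A minimal, self-contained sketch of why the check can misfire, assuming plain strings in place of Flume Event objects. The two interceptors below (dropInPlace() and dropIntoCopy(), both hypothetical, not Flume's Interceptor API) drop the same events, but only the one that edits the caller's list in place makes events.size() fall below batchSize after a full read. The code is written to compile on Java 7, matching the reported environment.

{code:title=InterceptorSizeDemo.java|borderStyle=solid}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class InterceptorSizeDemo {

    // Hypothetical interceptor: drops events by removing them from the list
    // it was given, so the caller's list shrinks too.
    static List<String> dropInPlace(List<String> events) {
        for (Iterator<String> it = events.iterator(); it.hasNext();) {
            if (it.next().startsWith("drop")) {
                it.remove();
            }
        }
        return events;
    }

    // Hypothetical interceptor: copies the kept events into a new list and
    // leaves the caller's list untouched.
    static List<String> dropIntoCopy(List<String> events) {
        List<String> kept = new ArrayList<String>();
        for (String e : events) {
            if (!e.startsWith("drop")) {
                kept.add(e);
            }
        }
        return kept;
    }

    // A full batch of batchSize events, every second one marked for dropping.
    static List<String> readFullBatch(int batchSize) {
        List<String> batch = new ArrayList<String>();
        for (int i = 0; i < batchSize; i++) {
            batch.add((i % 2 == 0 ? "keep-" : "drop-") + i);
        }
        return batch;
    }

    public static void main(String[] args) {
        int batchSize = 4;

        // The caller ignores the returned list, as TaildirSource does with
        // the list that processEventBatch() passes through the interceptors.
        List<String> a = readFullBatch(batchSize);
        dropInPlace(a);
        System.out.println("in-place:  size=" + a.size() + ", breaks=" + (a.size() < batchSize));

        List<String> b = readFullBatch(batchSize);
        dropIntoCopy(b);
        System.out.println("copy:      size=" + b.size() + ", breaks=" + (b.size() < batchSize));
    }
}
{code}

Running main prints size=2, breaks=true for the in-place interceptor and size=4, breaks=false for the copying one, so the early break only appears with interceptors that modify the list they receive.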

I think the loop should compare batchSize with events.size() as measured before interceptor processing (receivedSize in the code below). This avoids unnecessary sleeps.

{code:title=TaildirSource.java|borderStyle=solid}
    private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
            throws IOException, InterruptedException {
        long receivedSize = 0;
        while (true) {
            reader.setCurrentFile(tf);
            List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
            if (events.isEmpty()) {
                break;
            }
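            // Record the batch size as read, before interceptors (run inside
            // processEventBatch()) can remove events from the list.
            receivedSize = events.size();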
            sourceCounter.addToEventReceivedCount(receivedSize);
            sourceCounter.incrementAppendBatchReceivedCount();
            try {
                getChannelProcessor().processEventBatch(events);
                reader.commit();
            } catch (ChannelException ex) {
                logger.warn("The channel is full or unexpected failure. "
                        + "The source will try again after " + retryInterval
                        + " ms");
                TimeUnit.MILLISECONDS.sleep(retryInterval);
                retryInterval = retryInterval << 1;
                retryInterval = Math.min(retryInterval, maxRetryInterval);
                continue;
            }
            retryInterval = 1000;
            sourceCounter.addToEventAcceptedCount(events.size());
            sourceCounter.incrementAppendBatchAcceptedCount();
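            // A short read means the reader has reached the end of the file for now.
            // Use the pre-interceptor size so dropped events don't end the loop early.
            if (receivedSize < batchSize) {
                break;
            }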
        }
    }
{code}
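
The cost of the early break can be estimated with a small simulation. It is a sketch under assumptions: FakeReader, dropEveryOtherInPlace(), tailOnce() and passesToDrain() are hypothetical stand-ins (not Flume classes), the interceptor removes every other event in place, and, as the report describes, the source sleeps once after each pass of the loop. passesToDrain() counts the passes, and therefore the sleeps, needed to read a whole file under each break condition.

{code:title=TailLoopSimulation.java|borderStyle=solid}
import java.util.ArrayList;
import java.util.List;

public class TailLoopSimulation {

    // Hypothetical in-memory reader: hands out up to batchSize of the
    // remaining lines per call (not Flume's ReliableTaildirEventReader).
    static final class FakeReader {
        private int remaining;

        FakeReader(int lines) {
            this.remaining = lines;
        }

        List<String> readEvents(int batchSize) {
            int n = Math.min(batchSize, remaining);
            List<String> events = new ArrayList<String>();
            for (int i = 0; i < n; i++) {
                events.add("line-" + (remaining - i));
            }
            remaining -= n;
            return events;
        }

        boolean drained() {
            return remaining == 0;
        }
    }

    // Hypothetical interceptor: removes every odd-indexed event in place,
    // walking backwards so the remaining indices stay valid.
    static void dropEveryOtherInPlace(List<String> events) {
        for (int i = events.size() - 1; i >= 0; i--) {
            if (i % 2 == 1) {
                events.remove(i);
            }
        }
    }

    // One pass of the tailFileProcess() loop; the source sleeps after each pass.
    static void tailOnce(FakeReader reader, int batchSize, boolean checkBeforeIntercept) {
        while (true) {
            List<String> events = reader.readEvents(batchSize);
            if (events.isEmpty()) {
                break;
            }
            int receivedSize = events.size();   // size as read, before interception
            dropEveryOtherInPlace(events);      // stands in for processEventBatch()
            int checked = checkBeforeIntercept ? receivedSize : events.size();
            if (checked < batchSize) {
                break;
            }
        }
    }

    // Number of passes (and therefore sleeps) needed to drain the whole file.
    static int passesToDrain(int lines, int batchSize, boolean checkBeforeIntercept) {
        FakeReader reader = new FakeReader(lines);
        int passes = 0;
        while (!reader.drained()) {
            tailOnce(reader, batchSize, checkBeforeIntercept);
            passes++;
        }
        return passes;
    }

    public static void main(String[] args) {
        System.out.println("post-interceptor check: " + passesToDrain(1000, 100, false) + " passes");
        System.out.println("pre-interceptor check:  " + passesToDrain(1000, 100, true) + " passes");
    }
}
{code}

With 1,000 lines and a batch size of 100, main prints 10 passes for the post-interceptor check and 1 pass for the pre-interceptor check: with the original condition every batch ends the pass after half its events are dropped, so this file costs nine extra sleeps.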




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)