You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by "Jun Seok Hong (JIRA)" <ji...@apache.org> on 2015/10/14 08:27:05 UTC

[jira] [Created] (FLUME-2811) Taildir source doesn't call stop() on graceful shutdown

Jun Seok Hong created FLUME-2811:
------------------------------------

             Summary: Taildir source doesn't call stop() on graceful shutdown
                 Key: FLUME-2811
                 URL: https://issues.apache.org/jira/browse/FLUME-2811
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: v1.7.0
            Reporter: Jun Seok Hong
            Priority: Critical


Taildir source doesn't call stop() on graceful shutdown.

Test configuration.
source - taildir
channel - PseudoTxnMemoryChannel / flume-kafka-channel
sink - none

I found that flume sometimes doesn't terminate with Taildir source. 
I had to kill the process to terminate it.

tailFileProcess() function in TaildirSource.java has a infinite loop.

When the process interrupted, ChannelException will happen, but it can't breaks the infinite loop.
I think that's the reason why Taildir can't call stop() function.

{code:title=TaildirSource.java|borderStyle=solid}
 private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
      throws IOException, InterruptedException {
    while (true) {
      reader.setCurrentFile(tf);
      List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
      if (events.isEmpty()) {
        break;
      }
      sourceCounter.addToEventReceivedCount(events.size());
      sourceCounter.incrementAppendBatchReceivedCount();
      try {
        getChannelProcessor().processEventBatch(events);
        reader.commit();
      } catch (ChannelException ex) {
        logger.warn("The channel is full or unexpected failure. " +
          "The source will try again after " + retryInterval + " ms");
        TimeUnit.MILLISECONDS.sleep(retryInterval);
        retryInterval = retryInterval << 1;
        retryInterval = Math.min(retryInterval, maxRetryInterval);
        continue;
      }
      retryInterval = 1000;
      sourceCounter.addToEventAcceptedCount(events.size());
      sourceCounter.incrementAppendBatchAcceptedCount();
      if (events.size() < batchSize) {
        break;
      }
    }
  }
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)