Posted to dev@flume.apache.org by "liu xiaofei (JIRA)" <ji...@apache.org> on 2015/07/08 10:59:04 UTC

[jira] [Created] (FLUME-2741) hdfs sink could not close file,should remove writer from sfWriters

liu xiaofei created FLUME-2741:
----------------------------------

             Summary: hdfs sink could not close file,should remove writer from sfWriters
                 Key: FLUME-2741
                 URL: https://issues.apache.org/jira/browse/FLUME-2741
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: v1.6.0, v1.5.2
            Reporter: liu xiaofei


While the Flume HDFS sink was writing data to HDFS, the HDFS cluster hit an unknown error and the NameNode could not provide a healthy DataNode, so the BucketWriter could not close its file.
The roll task scheduled on timedRollerPool is interrupted because close(true) fails. The code is:


    if (rollInterval > 0) {
      Callable<Void> action = new Callable<Void>() {
        public Void call() throws Exception {
          LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
              bucketPath, rollInterval);
          try {
            // Roll the file and remove reference from sfWriters map.
            close(true);
          } catch(Throwable t) {
            LOG.error("Unexpected error", t);
          }
          return null;
        }
      };
      timedRollFuture = timedRollerPool.schedule(action, rollInterval,
          TimeUnit.SECONDS);
    }

In my HDFS sink configuration, hdfs.rollSize=0, hdfs.rollCount=0 and hdfs.rollInterval=1200, and the directory pattern is like YYYY/MM/DD, so files are rolled only by rollInterval. My channel is a memory channel. If the close fails, HDFSEventSink's sfWriters does not remove the writer for the failed file, so the HDFS sink stops working and cannot consume data from the channel. If the channel capacity is small or the data volume is large, then maybe:
1. the channel will soon be full;
2. the HDFS sink keeps processing data that maps to the failed file, because the events' timestamps belong to that file...

So I think that when closing the file throws an exception, the writer should be removed from sfWriters, like this:

    if (rollInterval > 0) {
      Callable<Void> action = new Callable<Void>() {
        public Void call() throws Exception {
          LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
              bucketPath, rollInterval);
          try {
            // Roll the file and remove reference from sfWriters map.
            close(true);
          } catch(Throwable t) {
            LOG.error("Unexpected error", t);
            runCloseAction();
            closed = true;
          }
          return null;
        }
      };
      timedRollFuture = timedRollerPool.schedule(action, rollInterval,
          TimeUnit.SECONDS);
    }
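
To show the effect outside Flume, here is a minimal, self-contained sketch. It is not Flume code: StaleWriterSketch, Writer, timedRoll and entriesAfterFailedRoll are hypothetical names, a plain HashMap stands in for sfWriters, and close() is hard-coded to fail in place of the real HDFS error. It only illustrates that a failed close leaves the writer registered unless the catch block removes it.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

final class StaleWriterSketch {

    /** Hypothetical stand-in for BucketWriter; not Flume's real class. */
    static class Writer {
        private final String path;
        private final Map<String, Writer> registry; // plays the role of sfWriters
        private final boolean removeOnFailure;      // true = behaviour proposed above
        boolean closed = false;

        Writer(String path, Map<String, Writer> registry, boolean removeOnFailure) {
            this.path = path;
            this.registry = registry;
            this.removeOnFailure = removeOnFailure;
        }

        /** Assumption: the close always fails, simulating the unhealthy cluster. */
        void close() throws IOException {
            throw new IOException("simulated: no healthy datanode available");
        }

        /** Mirrors the shape of the timed roll: close, then drop the map entry. */
        void timedRoll() {
            try {
                close();
                registry.remove(path);
                closed = true;
            } catch (IOException e) {
                if (removeOnFailure) {
                    // Proposed change: still drop the writer so a fresh one can be created.
                    registry.remove(path);
                    closed = true;
                }
                // Otherwise the stale writer stays in the map.
            }
        }
    }

    /** Returns how many writers remain registered after one failed timed roll. */
    static int entriesAfterFailedRoll(boolean removeOnFailure) {
        Map<String, Writer> sfWriters = new HashMap<>();
        Writer w = new Writer("/flume/2015/07/08/events", sfWriters, removeOnFailure);
        sfWriters.put(w.path, w);
        w.timedRoll();
        return sfWriters.size();
    }

    public static void main(String[] args) {
        System.out.println("current behaviour:  " + entriesAfterFailedRoll(false));
        System.out.println("proposed behaviour: " + entriesAfterFailedRoll(true));
    }
}
```

With the stale entry left in the map, every later event for the same path would be handed to the same broken writer; removing it lets the next event open a new file. Whether the unclosed file then needs separate cleanup is not covered by this sketch.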


Is this OK?


