You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flume.apache.org by "chenhongcai (Jira)" <ji...@apache.org> on 2019/12/28 09:55:00 UTC

[jira] [Updated] (FLUME-3349) TailDir source Data loss

     [ https://issues.apache.org/jira/browse/FLUME-3349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

chenhongcai updated FLUME-3349:
-------------------------------
    Description: 
当 skipToEnd 为 true 时,如果在处理当前扫描到的这批文件期间生成了新文件,那么下次扫描到该新文件时,标红的逻辑会让它从文件末尾开始读取,导致此前写入的数据丢失。


{code:java}
/**
 * Walks every matcher in {@code taildirCache}, looks up each matching file by inode in
 * {@code tailFiles}, and either opens a new {@code TailFile} for it or refreshes the
 * existing one.
 *
 * @param skipToEnd when true, a file with no usable entry in {@code tailFiles} is opened at
 *                  its current length instead of at offset 0
 * @return the inodes of every matching file handled in this pass (files whose inode lookup
 *         failed with {@code NoSuchFileException} are left out)
 * @throws IOException declared on the signature; which of the called helpers raise it is not
 *                     visible in this excerpt
 */
public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
    // Timestamp of this pass, in wall-clock milliseconds.
    updateTime = System.currentTimeMillis();
    List<Long> updatedInodes = Lists.newArrayList();

    for (TaildirMatcher taildir : taildirCache) {
      // Headers are looked up once per matcher, keyed by its file group.
      Map<String, String> headers = headerTable.row(taildir.getFileGroup());

      for (File f : taildir.getMatchingFiles()) {
        long inode;
        try {
          inode = getInode(f);
        } catch (NoSuchFileException e) {
          // The file vanished before its inode could be read: log it and skip this file.
          logger.info("File has been deleted in the meantime: " + e.getMessage());
          continue;
        }
        TailFile tf = tailFiles.get(inode);
        // Nothing is tracked for this inode yet, or the tracked entry has a different path:
        // open the file anew.
        if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
          // Line highlighted by the report: with skipToEnd set, the start position is the
          // file's current length, so whatever the file already contains is not read.
{color:red}          long startPos = skipToEnd ? f.length() : 0;{color}
          tf = openFile(f, headers, inode, startPos);
        } else {
          // Changed if the file's mtime is newer than the recorded update time, or the
          // saved position differs from the current length.
          boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length();
          if (updated) {
            // No open handle on the tracked entry: reopen at the saved position.
            if (tf.getRaf() == null) {
              tf = openFile(f, headers, inode, tf.getPos());
            }
            // The file is now shorter than the saved position, so reset the position to 0.
            if (f.length() < tf.getPos()) {
              logger.info("Pos " + tf.getPos() + " is larger than file size! "
                  + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
              tf.updatePos(tf.getPath(), inode, 0);
            }
          }
          // needTail mirrors the change check above (presumably read by the tailing loop —
          // TODO confirm against callers).
          tf.setNeedTail(updated);
        }
        // Store the (possibly new) entry and report the inode as handled in this pass.
        tailFiles.put(inode, tf);
        updatedInodes.add(inode);
      }
    }
    return updatedInodes;
  }
{code}


  was:
当 skipToEnd 为 true 时,如果在处理当前扫描到的这批文件期间生成了新文件,那么下次扫描到该新文件时,标红的逻辑会让它从文件末尾开始读取,导致此前写入的数据丢失。

/**
 * Walks every matcher in {@code taildirCache}, looks up each matching file by inode in
 * {@code tailFiles}, and either opens a new {@code TailFile} for it or refreshes the
 * existing one.
 *
 * @param skipToEnd when true, a file with no usable entry in {@code tailFiles} is opened at
 *                  its current length instead of at offset 0
 * @return the inodes of every matching file handled in this pass (files whose inode lookup
 *         failed with {@code NoSuchFileException} are left out)
 * @throws IOException declared on the signature; which of the called helpers raise it is not
 *                     visible in this excerpt
 */
public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
    // Timestamp of this pass, in wall-clock milliseconds.
    updateTime = System.currentTimeMillis();
    List<Long> updatedInodes = Lists.newArrayList();

    for (TaildirMatcher taildir : taildirCache) {
      // Headers are looked up once per matcher, keyed by its file group.
      Map<String, String> headers = headerTable.row(taildir.getFileGroup());

      for (File f : taildir.getMatchingFiles()) {
        long inode;
        try {
          inode = getInode(f);
        } catch (NoSuchFileException e) {
          // The file vanished before its inode could be read: log it and skip this file.
          logger.info("File has been deleted in the meantime: " + e.getMessage());
          continue;
        }
        TailFile tf = tailFiles.get(inode);
        // Nothing is tracked for this inode yet, or the tracked entry has a different path:
        // open the file anew.
        if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
          // Line highlighted by the report: with skipToEnd set, the start position is the
          // file's current length, so whatever the file already contains is not read.
{color:red}          long startPos = skipToEnd ? f.length() : 0;{color}
          tf = openFile(f, headers, inode, startPos);
        } else {
          // Changed if the file's mtime is newer than the recorded update time, or the
          // saved position differs from the current length.
          boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length();
          if (updated) {
            // No open handle on the tracked entry: reopen at the saved position.
            if (tf.getRaf() == null) {
              tf = openFile(f, headers, inode, tf.getPos());
            }
            // The file is now shorter than the saved position, so reset the position to 0.
            if (f.length() < tf.getPos()) {
              logger.info("Pos " + tf.getPos() + " is larger than file size! "
                  + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
              tf.updatePos(tf.getPath(), inode, 0);
            }
          }
          // needTail mirrors the change check above (presumably read by the tailing loop —
          // TODO confirm against callers).
          tf.setNeedTail(updated);
        }
        // Store the (possibly new) entry and report the inode as handled in this pass.
        tailFiles.put(inode, tf);
        updatedInodes.add(inode);
      }
    }
    return updatedInodes;
  }


> TailDir source Data loss
> ------------------------
>
>                 Key: FLUME-3349
>                 URL: https://issues.apache.org/jira/browse/FLUME-3349
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: 1.8.0, 1.9.0
>         Environment: apache-flume-bin-1.8.0
> java-1.8.92
>            Reporter: chenhongcai
>            Priority: Critical
>
> 当 skipToEnd 为 true 时,如果在处理当前扫描到的这批文件期间生成了新文件,那么下次扫描到该新文件时,标红的逻辑会让它从文件末尾开始读取,导致此前写入的数据丢失。
> {code:java}
> /**
>  * Walks every matcher in {@code taildirCache}, looks up each matching file by inode in
>  * {@code tailFiles}, and either opens a new {@code TailFile} for it or refreshes the
>  * existing one.
>  *
>  * @param skipToEnd when true, a file with no usable entry in {@code tailFiles} is opened at
>  *                  its current length instead of at offset 0
>  * @return the inodes of every matching file handled in this pass (files whose inode lookup
>  *         failed with {@code NoSuchFileException} are left out)
>  * @throws IOException declared on the signature; which of the called helpers raise it is not
>  *                     visible in this excerpt
>  */
> public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
>     // Timestamp of this pass, in wall-clock milliseconds.
>     updateTime = System.currentTimeMillis();
>     List<Long> updatedInodes = Lists.newArrayList();
>     for (TaildirMatcher taildir : taildirCache) {
>       // Headers are looked up once per matcher, keyed by its file group.
>       Map<String, String> headers = headerTable.row(taildir.getFileGroup());
>       for (File f : taildir.getMatchingFiles()) {
>         long inode;
>         try {
>           inode = getInode(f);
>         } catch (NoSuchFileException e) {
>           // The file vanished before its inode could be read: log it and skip this file.
>           logger.info("File has been deleted in the meantime: " + e.getMessage());
>           continue;
>         }
>         TailFile tf = tailFiles.get(inode);
>         // Nothing is tracked for this inode yet, or the tracked entry has a different path:
>         // open the file anew.
>         if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
>           // Line highlighted by the report: with skipToEnd set, the start position is the
>           // file's current length, so whatever the file already contains is not read.
> {color:red}          long startPos = skipToEnd ? f.length() : 0;{color}
>           tf = openFile(f, headers, inode, startPos);
>         } else {
>           // Changed if the file's mtime is newer than the recorded update time, or the
>           // saved position differs from the current length.
>           boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length();
>           if (updated) {
>             // No open handle on the tracked entry: reopen at the saved position.
>             if (tf.getRaf() == null) {
>               tf = openFile(f, headers, inode, tf.getPos());
>             }
>             // The file is now shorter than the saved position, so reset the position to 0.
>             if (f.length() < tf.getPos()) {
>               logger.info("Pos " + tf.getPos() + " is larger than file size! "
>                   + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
>               tf.updatePos(tf.getPath(), inode, 0);
>             }
>           }
>           // needTail mirrors the change check above (presumably read by the tailing loop —
>           // TODO confirm against callers).
>           tf.setNeedTail(updated);
>         }
>         // Store the (possibly new) entry and report the inode as handled in this pass.
>         tailFiles.put(inode, tf);
>         updatedInodes.add(inode);
>       }
>     }
>     return updatedInodes;
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@flume.apache.org
For additional commands, e-mail: issues-help@flume.apache.org