Posted to issues@flume.apache.org by "chenhongcai (Jira)" <ji...@apache.org> on 2019/12/28 09:55:00 UTC
[jira] [Updated] (FLUME-3349) TailDir source Data loss
[ https://issues.apache.org/jira/browse/FLUME-3349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
chenhongcai updated FLUME-3349:
-------------------------------
Description:
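If skipToEnd is true and a new file is created while the current batch of scanned files is being processed,
the logic highlighted in red causes that new file to be read from its end when the next scan picks it up, so the data already written to it is lost.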
{code:java}
public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
  updateTime = System.currentTimeMillis();
  List<Long> updatedInodes = Lists.newArrayList();
  for (TaildirMatcher taildir : taildirCache) {
    Map<String, String> headers = headerTable.row(taildir.getFileGroup());
    for (File f : taildir.getMatchingFiles()) {
      long inode;
      try {
        inode = getInode(f);
      } catch (NoSuchFileException e) {
        logger.info("File has been deleted in the meantime: " + e.getMessage());
        continue;
      }
      TailFile tf = tailFiles.get(inode);
      if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
        {color:red} long startPos = skipToEnd ? f.length() : 0;{color}
        tf = openFile(f, headers, inode, startPos);
      } else {
        boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length();
        if (updated) {
          if (tf.getRaf() == null) {
            tf = openFile(f, headers, inode, tf.getPos());
          }
          if (f.length() < tf.getPos()) {
            logger.info("Pos " + tf.getPos() + " is larger than file size! "
                + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
            tf.updatePos(tf.getPath(), inode, 0);
          }
        }
        tf.setNeedTail(updated);
      }
      tailFiles.put(inode, tf);
      updatedInodes.add(inode);
    }
  }
  return updatedInodes;
}
{code}
was:
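If skipToEnd is true and a new file is created while the current batch of scanned files is being processed,
the logic highlighted in red causes that new file to be read from its end when the next scan picks it up, so the data already written to it is lost.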
public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
  updateTime = System.currentTimeMillis();
  List<Long> updatedInodes = Lists.newArrayList();
  for (TaildirMatcher taildir : taildirCache) {
    Map<String, String> headers = headerTable.row(taildir.getFileGroup());
    for (File f : taildir.getMatchingFiles()) {
      long inode;
      try {
        inode = getInode(f);
      } catch (NoSuchFileException e) {
        logger.info("File has been deleted in the meantime: " + e.getMessage());
        continue;
      }
      TailFile tf = tailFiles.get(inode);
      if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
        {color:red} long startPos = skipToEnd ? f.length() : 0;{color}
        tf = openFile(f, headers, inode, startPos);
      } else {
        boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length();
        if (updated) {
          if (tf.getRaf() == null) {
            tf = openFile(f, headers, inode, tf.getPos());
          }
          if (f.length() < tf.getPos()) {
            logger.info("Pos " + tf.getPos() + " is larger than file size! "
                + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
            tf.updatePos(tf.getPath(), inode, 0);
          }
        }
        tf.setNeedTail(updated);
      }
      tailFiles.put(inode, tf);
      updatedInodes.add(inode);
    }
  }
  return updatedInodes;
}
> TailDir source Data loss
> ------------------------
>
> Key: FLUME-3349
> URL: https://issues.apache.org/jira/browse/FLUME-3349
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: 1.8.0, 1.9.0
> Environment: apache-flume-bin-1.8.0
> java-1.8.92
> Reporter: chenhongcai
> Priority: Critical
>
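> If skipToEnd is true and a new file is created while the current batch of scanned files is being processed,
> the logic highlighted in red causes that new file to be read from its end when the next scan picks it up, so the data already written to it is lost.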
> {code:java}
> public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
>   updateTime = System.currentTimeMillis();
>   List<Long> updatedInodes = Lists.newArrayList();
>   for (TaildirMatcher taildir : taildirCache) {
>     Map<String, String> headers = headerTable.row(taildir.getFileGroup());
>     for (File f : taildir.getMatchingFiles()) {
>       long inode;
>       try {
>         inode = getInode(f);
>       } catch (NoSuchFileException e) {
>         logger.info("File has been deleted in the meantime: " + e.getMessage());
>         continue;
>       }
>       TailFile tf = tailFiles.get(inode);
>       if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
>         {color:red} long startPos = skipToEnd ? f.length() : 0;{color}
>         tf = openFile(f, headers, inode, startPos);
>       } else {
>         boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length();
>         if (updated) {
>           if (tf.getRaf() == null) {
>             tf = openFile(f, headers, inode, tf.getPos());
>           }
>           if (f.length() < tf.getPos()) {
>             logger.info("Pos " + tf.getPos() + " is larger than file size! "
>                 + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
>             tf.updatePos(tf.getPath(), inode, 0);
>           }
>         }
>         tf.setNeedTail(updated);
>       }
>       tailFiles.put(inode, tf);
>       updatedInodes.add(inode);
>     }
>   }
>   return updatedInodes;
> }
> {code}
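To illustrate the behavior the report asks for, here is a minimal, self-contained sketch of one possible start-position rule: skipToEnd is honored only for files that already exist at the first scan, and any file discovered afterwards is read from offset 0. This is not Flume code and not a proposed patch. The class StartPositionSketch, its positions map (a stand-in for tailFiles), the firstScanDone flag, and the finishScan() method are all hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch, not Flume code: chooses the offset at which a file
 * is first read so that skipToEnd only affects files seen on the first scan.
 */
class StartPositionSketch {
  // inode -> last known read position (illustrative stand-in for tailFiles)
  private final Map<Long, Long> positions = new HashMap<>();
  // set to true once the first full scan has completed
  private boolean firstScanDone = false;

  /** Returns the offset at which the file with this inode should be read. */
  long startPos(long inode, long fileLength, boolean skipToEnd) {
    Long known = positions.get(inode);
    if (known != null) {
      return known; // already tracked: resume from the stored position
    }
    // Skip existing content only for files found during the first scan;
    // files discovered later start at offset 0 so no data is skipped.
    long pos = (skipToEnd && !firstScanDone) ? fileLength : 0L;
    positions.put(inode, pos);
    return pos;
  }

  /** Marks the end of a scan; files discovered afterwards start at 0. */
  void finishScan() {
    firstScanDone = true;
  }

  public static void main(String[] args) {
    StartPositionSketch s = new StartPositionSketch();
    System.out.println(s.startPos(1L, 100L, true)); // prints 100 (file present at startup)
    s.finishScan();
    System.out.println(s.startPos(2L, 40L, true));  // prints 0 (file created after startup)
  }
}
```

In this sketch the second file already holds 40 bytes when it is first seen, and all of them would still be read, which is the outcome the description expects when skipToEnd is true.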
--
This message was sent by Atlassian Jira
(v8.3.4#803005)