Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/19 13:33:10 UTC
[incubator-iotdb] branch rel/0.10 updated: [IOTDB-805] Fix BufferUnderflowException when querying TsFile stored in HDFS
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.10
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/rel/0.10 by this push:
new 320cd0d [IOTDB-805] Fix BufferUnderflowException when querying TsFile stored in HDFS
320cd0d is described below
commit 320cd0d7e35bddfe824bb120fa819db10d95bb12
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sun Jul 19 16:24:38 2020 +0800
[IOTDB-805] Fix BufferUnderflowException when querying TsFile stored in HDFS
---
.../org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java | 3 +--
.../db/engine/storagegroup/StorageGroupProcessor.java | 17 ++++++++---------
.../fileSystem/fileOutputFactory/HDFSOutputFactory.java | 3 +--
.../iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java | 12 ++++++++----
.../apache/iotdb/tsfile/read/TsFileSequenceReader.java | 8 +++++---
.../tsfile/write/writer/ForceAppendTsFileWriter.java | 3 ++-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 10 ++++------
7 files changed, 29 insertions(+), 27 deletions(-)
diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
index 2e86fda..8693eb2 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
@@ -72,7 +72,6 @@ public class HDFSOutput implements TsFileOutput {
@Override
public void close() throws IOException {
- flush();
fsDataOutputStream.close();
}
@@ -83,7 +82,7 @@ public class HDFSOutput implements TsFileOutput {
@Override
public void flush() throws IOException {
- this.fsDataOutputStream.flush();
+ this.fsDataOutputStream.hflush();
}
@Override
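A note on the flush() -> hflush() change above: on HDFS, FSDataOutputStream.flush() only empties the client-side buffer and gives no guarantee that the bytes have reached the datanodes, so a concurrent reader (for example, a query opening the TsFile) may still see a truncated file. hflush() pushes the data to the datanodes and makes it visible to new readers. The flush() call could also be dropped from close() because FSDataOutputStream.close() already persists the stream. A minimal self-contained sketch of the difference, assuming a reachable cluster at the placeholder URI hdfs://namenode:9000:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HflushDemo {
  public static void main(String[] args) throws Exception {
    // hdfs://namenode:9000 is a placeholder; point it at a real cluster
    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:9000"), new Configuration());
    try (FSDataOutputStream out = fs.create(new Path("/tmp/demo.tsfile"))) {
      out.write(new byte[]{1, 2, 3, 4});
      out.flush();  // client-side only: a concurrent reader may still read 0 bytes
      out.hflush(); // data reaches the datanodes: new readers now see all 4 bytes
    }
  }
}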
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index eef78cf..f1818ec 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -531,13 +531,12 @@ public class StorageGroupProcessor {
// not the last file or cannot write, just close it
tsFileResource.setClosed(true);
} else if (writer.canWrite()) {
- // the last file is not closed, continue writing to in
+ // the last file is not closed, continue writing to it
TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
getVersionControllerByTimePartitionId(timePartitionId),
this::closeUnsealedTsFileProcessorCallBack,
this::updateLatestFlushTimeCallback, true, writer);
- workSequenceTsFileProcessors
- .put(timePartitionId, tsFileProcessor);
+ workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
tsFileResource.setProcessor(tsFileProcessor);
tsFileResource.removeResourceFile();
tsFileProcessor.setTimeRangeId(timePartitionId);
@@ -566,7 +565,7 @@ public class StorageGroupProcessor {
// not the last file or cannot write, just close it
tsFileResource.setClosed(true);
} else if (writer.canWrite()) {
- // the last file is not closed, continue writing to in
+ // the last file is not closed, continue writing to it
TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
getVersionControllerByTimePartitionId(timePartitionId),
this::closeUnsealedTsFileProcessorCallBack,
@@ -1773,7 +1772,7 @@ public class StorageGroupProcessor {
if (!newFileName.equals(tsfileToBeInserted.getName())) {
logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
tsfileToBeInserted.getName(), newFileName);
- newTsFileResource.setFile(new File(tsfileToBeInserted.getParentFile(), newFileName));
+ newTsFileResource.setFile(fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
}
}
loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
@@ -2028,7 +2027,7 @@ public class StorageGroupProcessor {
File targetFile;
switch (type) {
case LOAD_UNSEQUENCE:
- targetFile = new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+ targetFile = fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
storageGroupName + File.separatorChar + filePartitionId + File.separator + tsFileResource
.getFile().getName());
tsFileResource.setFile(targetFile);
@@ -2042,7 +2041,7 @@ public class StorageGroupProcessor {
break;
case LOAD_SEQUENCE:
targetFile =
- new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
+ fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
storageGroupName + File.separatorChar + filePartitionId + File.separator
+ tsFileResource.getFile().getName());
tsFileResource.setFile(targetFile);
@@ -2073,9 +2072,9 @@ public class StorageGroupProcessor {
syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
}
- File syncedResourceFile = new File(
+ File syncedResourceFile = fsFactory.getFile(
syncedTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
- File targetResourceFile = new File(
+ File targetResourceFile = fsFactory.getFile(
targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
try {
FileUtils.moveFile(syncedResourceFile, targetResourceFile);
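The StorageGroupProcessor changes above replace direct new File(...) construction with fsFactory.getFile(...), so that when TsFiles are stored on HDFS the resolved path goes through the HDFS-aware File implementation instead of a plain local java.io.File. A minimal sketch of the pattern, assuming the factory is obtained via FSFactoryProducer as elsewhere in the codebase (the paths are placeholders):

import java.io.File;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;

public class FsFactoryDemo {
  public static void main(String[] args) {
    // returns LocalFSFactory or HDFSFactory depending on the configured storage FS
    FSFactory fsFactory = FSFactoryProducer.getFSFactory();
    // behaves like java.io.File locally; resolves an HDFS path under HDFSFactory
    File resource = fsFactory.getFile("/data/sequence/root.sg1", "1-1-0.tsfile");
    System.out.println(resource.getAbsolutePath());
  }
}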
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
index 7699c08..8ebf18f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
@@ -50,8 +50,7 @@ public class HDFSOutputFactory implements FileOutputFactory {
} catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
logger.error(
"Failed to get TsFile output of file: {}. Please check your dependency of Hadoop module.",
- filePath,
- e);
+ filePath, e);
return null;
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
index 0e80f88..03a71a9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
@@ -28,7 +28,6 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +44,7 @@ public class HDFSFactory implements FSFactory {
private static Method getBufferedOutputStream;
private static Method listFilesBySuffix;
private static Method listFilesByPrefix;
+ private static Method renameTo;
static {
try {
@@ -59,6 +59,7 @@ public class HDFSFactory implements FSFactory {
getBufferedOutputStream = clazz.getMethod("getBufferedOutputStream", String.class);
listFilesBySuffix = clazz.getMethod("listFilesBySuffix", String.class, String.class);
listFilesByPrefix = clazz.getMethod("listFilesByPrefix", String.class, String.class);
+ renameTo = clazz.getMethod("renameTo", File.class);
} catch (ClassNotFoundException | NoSuchMethodException e) {
logger
.error("Failed to get Hadoop file system. Please check your dependency of Hadoop module.",
@@ -173,9 +174,12 @@ public class HDFSFactory implements FSFactory {
}
public void moveFile(File srcFile, File destFile) {
- boolean rename = srcFile.renameTo(destFile);
- if (!rename) {
- logger.error("Failed to rename file from {} to {}. ", srcFile.getName(), destFile.getName());
+ try {
+ renameTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile);
+ } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+ logger.error(
+ "Failed to rename file from {} to {}. Please check your dependency of Hadoop module.",
+ srcFile.getName(), destFile.getName());
}
}
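java.io.File.renameTo only works on the local file system and simply returns false for a path that lives on HDFS, which is why moveFile now reflects into the renameTo method registered above (reflection keeps the tsfile module free of a compile-time Hadoop dependency). As a hedged sketch, an HDFS-backed renameTo could be implemented with Hadoop's FileSystem.rename roughly as follows; the class and field names are illustrative, not the actual HDFSFile source:

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsFileSketch {
  // hypothetical field standing in for the wrapper's internal HDFS path
  private final Path path;

  public HdfsFileSketch(String pathname) {
    this.path = new Path(pathname);
  }

  // illustrative counterpart of the renameTo method resolved via reflection above
  public boolean renameTo(File dest) {
    try {
      FileSystem fs = path.getFileSystem(new Configuration());
      return fs.rename(path, new Path(dest.getAbsolutePath()));
    } catch (IOException e) {
      return false; // mirror java.io.File.renameTo's boolean contract
    }
  }
}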
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 8312d3e..48452ac 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -33,7 +33,6 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
@@ -768,8 +767,11 @@ public class TsFileSequenceReader implements AutoCloseable {
throw new IOException("reach the end of the data");
}
} else {
- if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size) != size) {
- throw new IOException("reach the end of the data");
+ long actualReadSize = ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size);
+ if (actualReadSize != size) {
+ throw new IOException(
+ String.format("reach the end of the data. Size of data that want to read: %s,"
+ + "actual read size: %s, posiotion: %s", size, actualReadSize, position));
}
}
buffer.flip();
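On HDFS a single positional read may legitimately return fewer bytes than requested, and if the caller then decodes past the buffer's limit the result is the BufferUnderflowException from the issue title; the expanded message above makes such a short read diagnosable. A minimal sketch of the read-until-full loop that a helper like ReadWriteIOUtils.readAsPossible performs, written against a plain FileChannel (names are illustrative):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class ReadFullySketch {
  // keep issuing positional reads until the buffer is full or EOF is hit;
  // any single read() may return fewer bytes than requested
  static int readAsPossible(FileChannel channel, ByteBuffer buffer, long position)
      throws IOException {
    int total = 0;
    while (buffer.hasRemaining()) {
      int read = channel.read(buffer, position + total);
      if (read < 0) {
        break; // EOF before the buffer was filled: caller sees total < size
      }
      total += read;
    }
    return total;
  }
}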
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
index d677e38..9dfccf8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +45,7 @@ public class ForceAppendTsFileWriter extends TsFileIOWriter {
if (logger.isDebugEnabled()) {
logger.debug("{} writer is opened.", file.getName());
}
- this.out = new LocalTsFileOutput(file, true);
+ this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
this.file = file;
// file doesn't exist
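Replacing the hard-coded LocalTsFileOutput with a lookup through FSFactoryProducer lets ForceAppendTsFileWriter reopen a TsFile for appending whether it lives on local disk or on HDFS. A minimal usage sketch of the factory call from the diff (the path is a placeholder):

import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.write.writer.TsFileOutput;

public class OutputFactoryDemo {
  public static void main(String[] args) {
    // returns a LocalTsFileOutput or an HDFSOutput depending on the configured
    // storage file system; 'true' opens the file in append mode
    TsFileOutput out = FSFactoryProducer.getFileOutputFactory()
        .getTsFileOutput("/data/sequence/1-1-0.tsfile", true);
  }
}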
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 31e1146..c6b1726 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -163,6 +163,7 @@ public class TsFileIOWriter {
.add(new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList));
currentChunkGroupDeviceId = null;
chunkMetadataList = null;
+ out.flush();
}
/**
@@ -176,14 +177,11 @@ public class TsFileIOWriter {
* @throws IOException if I/O error occurs
*/
public void startFlushChunk(MeasurementSchema measurementSchema,
- CompressionType compressionCodecName,
- TSDataType tsDataType, TSEncoding encodingType, Statistics<?> statistics, int dataSize,
- int numOfPages)
- throws IOException {
+ CompressionType compressionCodecName, TSDataType tsDataType, TSEncoding encodingType,
+ Statistics<?> statistics, int dataSize, int numOfPages) throws IOException {
currentChunkMetadata = new ChunkMetadata(measurementSchema.getMeasurementId(), tsDataType,
- out.getPosition(),
- statistics);
+ out.getPosition(), statistics);
ChunkHeader header = new ChunkHeader(measurementSchema.getMeasurementId(), dataSize, tsDataType,
compressionCodecName, encodingType, numOfPages);
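Taken together, the writer side now flushes each sealed chunk group (the out.flush() added to endChunkGroup, which on HDFS is the hflush() above), and the reader side reports short reads instead of silently returning a partially filled buffer. For reference, this is how a short read surfaces as the BufferUnderflowException in the issue title; the demo is pure java.nio and needs neither IoTDB nor Hadoop:

import java.nio.ByteBuffer;

public class UnderflowDemo {
  public static void main(String[] args) {
    // simulate a short read: 8 bytes expected, only 4 arrived
    ByteBuffer buffer = ByteBuffer.allocate(8);
    buffer.put(new byte[4]); // partial data, as from an un-hflushed HDFS file
    buffer.flip();           // limit is now 4, not 8
    buffer.getLong();        // throws java.nio.BufferUnderflowException
  }
}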