You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/07/19 08:13:12 UTC

[incubator-iotdb] 01/01: [ISSUE-1503] Fix BufferUnderflowException when querying TsFile stored in HDFS

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch issue_1503
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 04d08f3e829b6fdaa8970a0de5608d95dbdb21d5
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sun Jul 19 16:12:31 2020 +0800

    [ISSUE-1503] Fix BufferUnderflowException when querying TsFile stored in HDFS
---
 .../org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java |  3 +--
 .../db/engine/storagegroup/StorageGroupProcessor.java  | 17 ++++++++---------
 .../fileOutputFactory/HDFSOutputFactory.java           |  3 +--
 .../iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java | 12 ++++++++----
 .../apache/iotdb/tsfile/read/TsFileSequenceReader.java |  8 +++++---
 .../tsfile/write/writer/ForceAppendTsFileWriter.java   |  3 ++-
 .../tsfile/write/writer/RestorableTsFileIOWriter.java  |  1 +
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java      | 18 +++++++++---------
 8 files changed, 35 insertions(+), 30 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
index 2e86fda..8693eb2 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
@@ -72,7 +72,6 @@ public class HDFSOutput implements TsFileOutput {
 
   @Override
   public void close() throws IOException {
-    flush();
     fsDataOutputStream.close();
   }
 
@@ -83,7 +82,7 @@ public class HDFSOutput implements TsFileOutput {
 
   @Override
   public void flush() throws IOException {
-    this.fsDataOutputStream.flush();
+    this.fsDataOutputStream.hflush();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index d0940f9..77b81cc 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -539,13 +539,12 @@ public class StorageGroupProcessor {
         // not the last file or cannot write, just close it
         tsFileResource.setClosed(true);
       } else if (writer.canWrite()) {
-        // the last file is not closed, continue writing to in
+        // the last file is not closed, continue writing to it
         TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
             getVersionControllerByTimePartitionId(timePartitionId),
             this::closeUnsealedTsFileProcessorCallBack,
             this::updateLatestFlushTimeCallback, true, writer);
-        workSequenceTsFileProcessors
-            .put(timePartitionId, tsFileProcessor);
+        workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
         tsFileResource.setProcessor(tsFileProcessor);
         tsFileResource.removeResourceFile();
         tsFileProcessor.setTimeRangeId(timePartitionId);
@@ -574,7 +573,7 @@ public class StorageGroupProcessor {
         // not the last file or cannot write, just close it
         tsFileResource.setClosed(true);
       } else if (writer.canWrite()) {
-        // the last file is not closed, continue writing to in
+        // the last file is not closed, continue writing to it
         TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
             getVersionControllerByTimePartitionId(timePartitionId),
             this::closeUnsealedTsFileProcessorCallBack,
@@ -1884,7 +1883,7 @@ public class StorageGroupProcessor {
           if (!newFileName.equals(tsfileToBeInserted.getName())) {
             logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
                 tsfileToBeInserted.getName(), newFileName);
-            newTsFileResource.setFile(new File(tsfileToBeInserted.getParentFile(), newFileName));
+            newTsFileResource.setFile(fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
           }
         }
         loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
@@ -2183,7 +2182,7 @@ public class StorageGroupProcessor {
     File targetFile;
     switch (type) {
       case LOAD_UNSEQUENCE:
-        targetFile = new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+        targetFile = fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
             storageGroupName + File.separatorChar + filePartitionId + File.separator + tsFileResource
                 .getFile().getName());
         tsFileResource.setFile(targetFile);
@@ -2197,7 +2196,7 @@ public class StorageGroupProcessor {
         break;
       case LOAD_SEQUENCE:
         targetFile =
-            new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
+            fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
                 storageGroupName + File.separatorChar + filePartitionId + File.separator
                     + tsFileResource.getFile().getName());
         tsFileResource.setFile(targetFile);
@@ -2228,9 +2227,9 @@ public class StorageGroupProcessor {
           syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
     }
 
-    File syncedResourceFile = new File(
+    File syncedResourceFile = fsFactory.getFile(
         syncedTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
-    File targetResourceFile = new File(
+    File targetResourceFile = fsFactory.getFile(
         targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
     try {
       FileUtils.moveFile(syncedResourceFile, targetResourceFile);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
index 7699c08..8ebf18f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
@@ -50,8 +50,7 @@ public class HDFSOutputFactory implements FileOutputFactory {
     } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
       logger.error(
           "Failed to get TsFile output of file: {}. Please check your dependency of Hadoop module.",
-          filePath,
-          e);
+          filePath, e);
       return null;
     }
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
index 0e80f88..03a71a9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
@@ -28,7 +28,6 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URI;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +44,7 @@ public class HDFSFactory implements FSFactory {
   private static Method getBufferedOutputStream;
   private static Method listFilesBySuffix;
   private static Method listFilesByPrefix;
+  private static Method renameTo;
 
   static {
     try {
@@ -59,6 +59,7 @@ public class HDFSFactory implements FSFactory {
       getBufferedOutputStream = clazz.getMethod("getBufferedOutputStream", String.class);
       listFilesBySuffix = clazz.getMethod("listFilesBySuffix", String.class, String.class);
       listFilesByPrefix = clazz.getMethod("listFilesByPrefix", String.class, String.class);
+      renameTo = clazz.getMethod("renameTo", File.class);
     } catch (ClassNotFoundException | NoSuchMethodException e) {
       logger
           .error("Failed to get Hadoop file system. Please check your dependency of Hadoop module.",
@@ -173,9 +174,12 @@ public class HDFSFactory implements FSFactory {
   }
 
   public void moveFile(File srcFile, File destFile) {
-    boolean rename = srcFile.renameTo(destFile);
-    if (!rename) {
-      logger.error("Failed to rename file from {} to {}. ", srcFile.getName(), destFile.getName());
+    try {
+      renameTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile);
+    } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+      logger.error(
+          "Failed to rename file from {} to {}. Please check your dependency of Hadoop module.",
+          srcFile.getName(), destFile.getName());
     }
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 774cf8a..0cf8972 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -33,7 +33,6 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.compress.IUnCompressor;
@@ -768,8 +767,11 @@ public class TsFileSequenceReader implements AutoCloseable {
         throw new IOException("reach the end of the data");
       }
     } else {
-      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size) != size) {
-        throw new IOException("reach the end of the data");
+      long actualReadSize = ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size);
+      if (actualReadSize != size) {
+        throw new IOException(
+            String.format("reach the end of the data. Size of data that want to read: %s,"
+                + " actual read size: %s, position: %s", size, actualReadSize, position));
       }
     }
     buffer.flip();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
index d677e38..9dfccf8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +45,7 @@ public class ForceAppendTsFileWriter extends TsFileIOWriter {
     if (logger.isDebugEnabled()) {
       logger.debug("{} writer is opened.", file.getName());
     }
-    this.out = new LocalTsFileOutput(file, true);
+    this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
     this.file = file;
 
     // file doesn't exist
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 8a12b05..52c1ba5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -100,6 +100,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
           out.truncate(
               (long) TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
                   .getBytes().length);
+          logger.error("after truncate, file.length: " + file.length());
         } else {
           crashed = true;
           // remove broken data
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index edbf006..2d4b853 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -105,6 +105,7 @@ public class TsFileIOWriter {
    * @throws IOException if I/O error occurs
    */
   public TsFileIOWriter(File file) throws IOException {
+    logger.error("EEEEERRRRROOOOORRRRRR, append: true");
     this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), false);
     this.file = file;
     if (resourceLogger.isDebugEnabled()) {
@@ -142,9 +143,7 @@ public class TsFileIOWriter {
   public void startChunkGroup(String deviceId) throws IOException {
     this.currentChunkGroupDeviceId = deviceId;
     currentChunkGroupStartOffset = out.getPosition();
-    if (logger.isDebugEnabled()) {
-      logger.debug("start chunk group:{}, file position {}", deviceId, out.getPosition());
-    }
+    logger.error("start chunk group:{}, file position {}", deviceId, out.getPosition());
     chunkMetadataList = new ArrayList<>();
   }
 
@@ -156,6 +155,9 @@ public class TsFileIOWriter {
       return;
     }
     long dataSize = out.getPosition() - currentChunkGroupStartOffset;
+    logger.error("position:{}, currentChunkGroupStartOffset: {}, dataSize: {}",
+        out.getPosition(), currentChunkGroupStartOffset, dataSize);
+
     ChunkGroupFooter chunkGroupFooter = new ChunkGroupFooter(currentChunkGroupDeviceId, dataSize,
         chunkMetadataList.size());
     chunkGroupFooter.serializeTo(out.wrapAsStream());
@@ -163,6 +165,7 @@ public class TsFileIOWriter {
         .add(new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList));
     currentChunkGroupDeviceId = null;
     chunkMetadataList = null;
+    out.flush();
   }
 
   /**
@@ -176,14 +179,11 @@ public class TsFileIOWriter {
    * @throws IOException if I/O error occurs
    */
   public void startFlushChunk(MeasurementSchema measurementSchema,
-      CompressionType compressionCodecName,
-      TSDataType tsDataType, TSEncoding encodingType, Statistics<?> statistics, int dataSize,
-      int numOfPages)
-      throws IOException {
+      CompressionType compressionCodecName, TSDataType tsDataType, TSEncoding encodingType,
+      Statistics<?> statistics, int dataSize, int numOfPages) throws IOException {
 
     currentChunkMetadata = new ChunkMetadata(measurementSchema.getMeasurementId(), tsDataType,
-        out.getPosition(),
-        statistics);
+        out.getPosition(), statistics);
 
     ChunkHeader header = new ChunkHeader(measurementSchema.getMeasurementId(), dataSize, tsDataType,
         compressionCodecName, encodingType, numOfPages);