Posted to commits@iotdb.apache.org by su...@apache.org on 2020/07/19 08:13:11 UTC

[incubator-iotdb] branch issue_1503 created (now 04d08f3)

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a change to branch issue_1503
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 04d08f3  [ISSUE-1503] Fix BufferUnderflowException when querying TsFile stored in HDFS

This branch includes the following new commits:

     new 04d08f3  [ISSUE-1503] Fix BufferUnderflowException when querying TsFile stored in HDFS

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: [ISSUE-1503] Fix BufferUnderflowException when querying TsFile stored in HDFS

Posted by su...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch issue_1503
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 04d08f3e829b6fdaa8970a0de5608d95dbdb21d5
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sun Jul 19 16:12:31 2020 +0800

    [ISSUE-1503] Fix BufferUnderflowException when querying TsFile stored in HDFS
---
 .../org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java |  3 +--
 .../db/engine/storagegroup/StorageGroupProcessor.java  | 17 ++++++++---------
 .../fileOutputFactory/HDFSOutputFactory.java           |  3 +--
 .../iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java | 12 ++++++++----
 .../apache/iotdb/tsfile/read/TsFileSequenceReader.java |  8 +++++---
 .../tsfile/write/writer/ForceAppendTsFileWriter.java   |  3 ++-
 .../tsfile/write/writer/RestorableTsFileIOWriter.java  |  1 +
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java      | 18 +++++++++---------
 8 files changed, 35 insertions(+), 30 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
index 2e86fda..8693eb2 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
@@ -72,7 +72,6 @@ public class HDFSOutput implements TsFileOutput {
 
   @Override
   public void close() throws IOException {
-    flush();
     fsDataOutputStream.close();
   }
 
@@ -83,7 +82,7 @@ public class HDFSOutput implements TsFileOutput {
 
   @Override
   public void flush() throws IOException {
-    this.fsDataOutputStream.flush();
+    this.fsDataOutputStream.hflush();
   }
 
   @Override
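
[Annotation] The two hunks above are the core of the fix. On HDFS,
FSDataOutputStream.flush() only empties client-side buffers, so a concurrent
reader can still observe a shorter file than was written and underflow while
deserializing; hflush() pushes the written bytes to the datanodes and makes
them visible to new readers. The flush() in close() is dropped because closing
the stream already completes the file. A minimal sketch of the distinction,
using the standard Hadoop client API (the path and configuration here are
hypothetical, not taken from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HflushDemo {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path path = new Path("/tmp/demo.tsfile"); // hypothetical path
        try (FSDataOutputStream out = fs.create(path, true)) {
          out.writeBytes("chunk group data");
          // flush() alone gives no visibility guarantee on HDFS;
          // hflush() makes these bytes readable by other clients.
          out.hflush();
        }
      }
    }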
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index d0940f9..77b81cc 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -539,13 +539,12 @@ public class StorageGroupProcessor {
         // not the last file or cannot write, just close it
         tsFileResource.setClosed(true);
       } else if (writer.canWrite()) {
-        // the last file is not closed, continue writing to in
+        // the last file is not closed, continue writing to it
         TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
             getVersionControllerByTimePartitionId(timePartitionId),
             this::closeUnsealedTsFileProcessorCallBack,
             this::updateLatestFlushTimeCallback, true, writer);
-        workSequenceTsFileProcessors
-            .put(timePartitionId, tsFileProcessor);
+        workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
         tsFileResource.setProcessor(tsFileProcessor);
         tsFileResource.removeResourceFile();
         tsFileProcessor.setTimeRangeId(timePartitionId);
@@ -574,7 +573,7 @@ public class StorageGroupProcessor {
         // not the last file or cannot write, just close it
         tsFileResource.setClosed(true);
       } else if (writer.canWrite()) {
-        // the last file is not closed, continue writing to in
+        // the last file is not closed, continue writing to it
         TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
             getVersionControllerByTimePartitionId(timePartitionId),
             this::closeUnsealedTsFileProcessorCallBack,
@@ -1884,7 +1883,7 @@ public class StorageGroupProcessor {
           if (!newFileName.equals(tsfileToBeInserted.getName())) {
             logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
                 tsfileToBeInserted.getName(), newFileName);
-            newTsFileResource.setFile(new File(tsfileToBeInserted.getParentFile(), newFileName));
+            newTsFileResource.setFile(fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
           }
         }
         loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
@@ -2183,7 +2182,7 @@ public class StorageGroupProcessor {
     File targetFile;
     switch (type) {
       case LOAD_UNSEQUENCE:
-        targetFile = new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+        targetFile = fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
             storageGroupName + File.separatorChar + filePartitionId + File.separator + tsFileResource
                 .getFile().getName());
         tsFileResource.setFile(targetFile);
@@ -2197,7 +2196,7 @@ public class StorageGroupProcessor {
         break;
       case LOAD_SEQUENCE:
         targetFile =
-            new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
+            fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
                 storageGroupName + File.separatorChar + filePartitionId + File.separator
                     + tsFileResource.getFile().getName());
         tsFileResource.setFile(targetFile);
@@ -2228,9 +2227,9 @@ public class StorageGroupProcessor {
           syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
     }
 
-    File syncedResourceFile = new File(
+    File syncedResourceFile = fsFactory.getFile(
         syncedTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
-    File targetResourceFile = new File(
+    File targetResourceFile = fsFactory.getFile(
         targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
     try {
       FileUtils.moveFile(syncedResourceFile, targetResourceFile);
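
[Annotation] The repeated change in this file follows one pattern: every
direct new File(...) that can point at a TsFile is routed through
fsFactory.getFile(...), which returns an HDFS-aware File implementation when
TsFiles are configured to live on HDFS; a plain java.io.File can only describe
local paths. A minimal sketch of the pattern, assuming
FSFactoryProducer.getFSFactory() is the usual way to obtain the configured
factory (the directory and file name are made up):

    import java.io.File;
    import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
    import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;

    public class FsFactoryFileDemo {
      public static void main(String[] args) {
        FSFactory fsFactory = FSFactoryProducer.getFSFactory();
        // Resolves to a local File or an HDFS-backed File subclass,
        // depending on the configured storage file system.
        File target = fsFactory.getFile("/data/sg1/0", "1595146351000-1-0.tsfile");
        System.out.println(target.getAbsolutePath());
      }
    }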
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
index 7699c08..8ebf18f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
@@ -50,8 +50,7 @@ public class HDFSOutputFactory implements FileOutputFactory {
     } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
       logger.error(
           "Failed to get TsFile output of file: {}. Please check your dependency of Hadoop module.",
-          filePath,
-          e);
+          filePath, e);
       return null;
     }
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
index 0e80f88..03a71a9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
@@ -28,7 +28,6 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URI;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +44,7 @@ public class HDFSFactory implements FSFactory {
   private static Method getBufferedOutputStream;
   private static Method listFilesBySuffix;
   private static Method listFilesByPrefix;
+  private static Method renameTo;
 
   static {
     try {
@@ -59,6 +59,7 @@ public class HDFSFactory implements FSFactory {
       getBufferedOutputStream = clazz.getMethod("getBufferedOutputStream", String.class);
       listFilesBySuffix = clazz.getMethod("listFilesBySuffix", String.class, String.class);
       listFilesByPrefix = clazz.getMethod("listFilesByPrefix", String.class, String.class);
+      renameTo = clazz.getMethod("renameTo", File.class);
     } catch (ClassNotFoundException | NoSuchMethodException e) {
       logger
           .error("Failed to get Hadoop file system. Please check your dependency of Hadoop module.",
@@ -173,9 +174,12 @@ public class HDFSFactory implements FSFactory {
   }
 
   public void moveFile(File srcFile, File destFile) {
-    boolean rename = srcFile.renameTo(destFile);
-    if (!rename) {
-      logger.error("Failed to rename file from {} to {}. ", srcFile.getName(), destFile.getName());
+    try {
+      renameTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile);
+    } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+      logger.error(
+          "Failed to rename file from {} to {}. Please check your dependency of Hadoop module.",
+          srcFile.getName(), destFile.getName(), e);
     }
   }
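
[Annotation] moveFile previously called java.io.File.renameTo, which cannot
move a file that lives inside HDFS; the patch instead reflectively
instantiates the Hadoop-module file class and invokes its renameTo. A hedged
sketch of what such an HDFS-backed rename is presumed to delegate to (the real
HDFSFile implementation lives in the hadoop module and is not shown in this
diff):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class HdfsRenameSketch {
      // Returns true when HDFS accepted the rename, mirroring
      // java.io.File#renameTo's boolean contract.
      public static boolean renameTo(String src, String dest) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        return fs.rename(new Path(src), new Path(dest));
      }
    }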
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 774cf8a..0cf8972 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -33,7 +33,6 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.compress.IUnCompressor;
@@ -768,8 +767,11 @@ public class TsFileSequenceReader implements AutoCloseable {
         throw new IOException("reach the end of the data");
       }
     } else {
-      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size) != size) {
-        throw new IOException("reach the end of the data");
+      long actualReadSize = ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size);
+      if (actualReadSize != size) {
+        throw new IOException(
+            String.format("reach the end of the data. Size of data to read: %s, "
+                + "actual read size: %s, position: %s", size, actualReadSize, position));
       }
     }
     buffer.flip();
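
[Annotation] The enlarged error message is what surfaces the underlying short
read: a reader could ask for size bytes at position and receive fewer, and the
old message gave no numbers to diagnose it. A minimal sketch (assumed
behavior, simplified signature) of a readAsPossible-style helper that reports
how many bytes actually arrived:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    final class ReadAsPossibleSketch {
      // Read until the requested size is reached or EOF, returning the
      // actual byte count so the caller can detect a short read.
      static long readAsPossible(FileChannel channel, ByteBuffer buffer,
          long position, int size) throws IOException {
        long read = 0;
        while (read < size && buffer.hasRemaining()) {
          int n = channel.read(buffer, position + read);
          if (n < 0) {
            break; // hit EOF before the requested size arrived
          }
          read += n;
        }
        return read;
      }
    }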
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
index d677e38..9dfccf8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +45,7 @@ public class ForceAppendTsFileWriter extends TsFileIOWriter {
     if (logger.isDebugEnabled()) {
       logger.debug("{} writer is opened.", file.getName());
     }
-    this.out = new LocalTsFileOutput(file, true);
+    this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
     this.file = file;
 
     // file doesn't exist
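
[Annotation] Constructing LocalTsFileOutput directly pinned this writer to the
local file system; going through FSFactoryProducer.getFileOutputFactory() lets
the same code open an HDFS output when that is the configured storage. A usage
sketch built from the call in the hunk above (the path is hypothetical, and
TsFileOutput is assumed to live in org.apache.iotdb.tsfile.write.writer):

    import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
    import org.apache.iotdb.tsfile.write.writer.TsFileOutput;

    public class TsFileOutputDemo {
      public static void main(String[] args) throws Exception {
        // true -> append mode, matching ForceAppendTsFileWriter above
        TsFileOutput out = FSFactoryProducer.getFileOutputFactory()
            .getTsFileOutput("/data/sg1/0/demo.tsfile", true);
        out.close();
      }
    }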
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 8a12b05..52c1ba5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -100,6 +100,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
           out.truncate(
               (long) TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
                   .getBytes().length);
+          logger.debug("after truncate, file.length: {}", file.length());
         } else {
           crashed = true;
           // remove broken data
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index edbf006..2d4b853 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -105,6 +105,7 @@ public class TsFileIOWriter {
    * @throws IOException if I/O error occurs
    */
   public TsFileIOWriter(File file) throws IOException {
+    logger.debug("creating TsFileIOWriter for {}, append: false", file.getPath());
     this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), false);
     this.file = file;
     if (resourceLogger.isDebugEnabled()) {
@@ -142,9 +143,7 @@ public class TsFileIOWriter {
   public void startChunkGroup(String deviceId) throws IOException {
     this.currentChunkGroupDeviceId = deviceId;
     currentChunkGroupStartOffset = out.getPosition();
-    if (logger.isDebugEnabled()) {
-      logger.debug("start chunk group:{}, file position {}", deviceId, out.getPosition());
-    }
+    logger.debug("start chunk group:{}, file position {}", deviceId, out.getPosition());
     chunkMetadataList = new ArrayList<>();
   }
 
@@ -156,6 +155,9 @@ public class TsFileIOWriter {
       return;
     }
     long dataSize = out.getPosition() - currentChunkGroupStartOffset;
+    logger.debug("position:{}, currentChunkGroupStartOffset: {}, dataSize: {}",
+        out.getPosition(), currentChunkGroupStartOffset, dataSize);
+
     ChunkGroupFooter chunkGroupFooter = new ChunkGroupFooter(currentChunkGroupDeviceId, dataSize,
         chunkMetadataList.size());
     chunkGroupFooter.serializeTo(out.wrapAsStream());
@@ -163,6 +165,7 @@ public class TsFileIOWriter {
         .add(new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList));
     currentChunkGroupDeviceId = null;
     chunkMetadataList = null;
+    out.flush();
   }
 
   /**
@@ -176,14 +179,11 @@ public class TsFileIOWriter {
    * @throws IOException if I/O error occurs
    */
   public void startFlushChunk(MeasurementSchema measurementSchema,
-      CompressionType compressionCodecName,
-      TSDataType tsDataType, TSEncoding encodingType, Statistics<?> statistics, int dataSize,
-      int numOfPages)
-      throws IOException {
+      CompressionType compressionCodecName, TSDataType tsDataType, TSEncoding encodingType,
+      Statistics<?> statistics, int dataSize, int numOfPages) throws IOException {
 
     currentChunkMetadata = new ChunkMetadata(measurementSchema.getMeasurementId(), tsDataType,
-        out.getPosition(),
-        statistics);
+        out.getPosition(), statistics);
 
     ChunkHeader header = new ChunkHeader(measurementSchema.getMeasurementId(), dataSize, tsDataType,
         compressionCodecName, encodingType, numOfPages);
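
[Annotation] Taken together: the writer now hflush()es each completed chunk
group, short reads report exact sizes, and path handling goes through the FS
factory; this is what makes a TsFile stored on HDFS readable without a
BufferUnderflowException. An end-to-end sketch of the scenario in the subject
line (the HDFS URI is hypothetical, and TSFileDescriptor must already be
configured for HDFS storage):

    import org.apache.iotdb.tsfile.read.TsFileSequenceReader;

    public class HdfsQueryDemo {
      public static void main(String[] args) throws Exception {
        try (TsFileSequenceReader reader =
            new TsFileSequenceReader("hdfs://namenode:9000/data/demo.tsfile")) {
          // Reading the tail magic exercises positioned reads near the
          // end of the file, where the short reads used to surface.
          System.out.println(reader.readTailMagic());
        }
      }
    }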