Posted to commits@iotdb.apache.org by su...@apache.org on 2020/07/19 08:25:05 UTC

[incubator-iotdb] branch jira_805 created (now 700c651)

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a change to branch jira_805
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 700c651  [IOTDB-805] Fix BufferUnderflowException when querying TsFile stored in HDFS

This branch includes the following new commits:

     new 700c651  [IOTDB-805] Fix BufferUnderflowException when querying TsFile stored in HDFS

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: [IOTDB-805] Fix BufferUnderflowException when querying TsFile stored in HDFS

Posted by su...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch jira_805
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 700c65160d6ec147b35a2daf0dc2931dd69f138b
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sun Jul 19 16:24:38 2020 +0800

    [IOTDB-805] Fix BufferUnderflowException when querying TsFile stored in HDFS
---
 .../org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java  |  3 +--
 .../db/engine/storagegroup/StorageGroupProcessor.java   | 17 ++++++++---------
 .../fileSystem/fileOutputFactory/HDFSOutputFactory.java |  3 +--
 .../iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java  | 12 ++++++++----
 .../apache/iotdb/tsfile/read/TsFileSequenceReader.java  |  8 +++++---
 .../tsfile/write/writer/ForceAppendTsFileWriter.java    |  3 ++-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java       | 10 ++++------
 7 files changed, 29 insertions(+), 27 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
index 2e86fda..8693eb2 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
@@ -72,7 +72,6 @@ public class HDFSOutput implements TsFileOutput {
 
   @Override
   public void close() throws IOException {
-    flush();
     fsDataOutputStream.close();
   }
 
@@ -83,7 +82,7 @@ public class HDFSOutput implements TsFileOutput {
 
   @Override
   public void flush() throws IOException {
-    this.fsDataOutputStream.flush();
+    this.fsDataOutputStream.hflush();
   }
 
   @Override
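
Why this hunk fixes the bug: on HDFS, flush() only drains the client-side buffer, while hflush() (defined on org.apache.hadoop.fs.FSDataOutputStream via the Syncable interface) pushes the buffered bytes to the datanode write pipeline so that concurrent readers can see them. Dropping the extra flush() from close() also avoids a redundant round trip, since closing the stream already completes the file. A minimal sketch of the call pattern, assuming a reachable namenode (the URI and path below are illustrative only):

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HflushExample {
      public static void main(String[] args) throws IOException {
        // Illustrative URI; point this at a real namenode.
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), new Configuration());
        try (FSDataOutputStream out = fs.create(new Path("/tmp/example.tsfile"))) {
          out.writeInt(42);
          // flush() alone does not guarantee other clients can read these bytes;
          // hflush() pushes them to the datanode pipeline first.
          out.hflush();
        }
      }
    }
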
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index eef78cf..f1818ec 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -531,13 +531,12 @@ public class StorageGroupProcessor {
         // not the last file or cannot write, just close it
         tsFileResource.setClosed(true);
       } else if (writer.canWrite()) {
-        // the last file is not closed, continue writing to in
+        // the last file is not closed, continue writing to it
         TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
             getVersionControllerByTimePartitionId(timePartitionId),
             this::closeUnsealedTsFileProcessorCallBack,
             this::updateLatestFlushTimeCallback, true, writer);
-        workSequenceTsFileProcessors
-            .put(timePartitionId, tsFileProcessor);
+        workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
         tsFileResource.setProcessor(tsFileProcessor);
         tsFileResource.removeResourceFile();
         tsFileProcessor.setTimeRangeId(timePartitionId);
@@ -566,7 +565,7 @@ public class StorageGroupProcessor {
         // not the last file or cannot write, just close it
         tsFileResource.setClosed(true);
       } else if (writer.canWrite()) {
-        // the last file is not closed, continue writing to in
+        // the last file is not closed, continue writing to it
         TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
             getVersionControllerByTimePartitionId(timePartitionId),
             this::closeUnsealedTsFileProcessorCallBack,
@@ -1773,7 +1772,7 @@ public class StorageGroupProcessor {
           if (!newFileName.equals(tsfileToBeInserted.getName())) {
             logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
                 tsfileToBeInserted.getName(), newFileName);
-            newTsFileResource.setFile(new File(tsfileToBeInserted.getParentFile(), newFileName));
+            newTsFileResource.setFile(fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
           }
         }
         loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
@@ -2028,7 +2027,7 @@ public class StorageGroupProcessor {
     File targetFile;
     switch (type) {
       case LOAD_UNSEQUENCE:
-        targetFile = new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+        targetFile = fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
             storageGroupName + File.separatorChar + filePartitionId + File.separator + tsFileResource
                 .getFile().getName());
         tsFileResource.setFile(targetFile);
@@ -2042,7 +2041,7 @@ public class StorageGroupProcessor {
         break;
       case LOAD_SEQUENCE:
         targetFile =
-            new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
+            fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
                 storageGroupName + File.separatorChar + filePartitionId + File.separator
                     + tsFileResource.getFile().getName());
         tsFileResource.setFile(targetFile);
@@ -2073,9 +2072,9 @@ public class StorageGroupProcessor {
           syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
     }
 
-    File syncedResourceFile = new File(
+    File syncedResourceFile = fsFactory.getFile(
         syncedTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
-    File targetResourceFile = new File(
+    File targetResourceFile = fsFactory.getFile(
         targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
     try {
       FileUtils.moveFile(syncedResourceFile, targetResourceFile);
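
The call sites above previously built java.io.File objects directly, which always resolve against the local file system; when TsFiles live in HDFS, the loaded file and its .resource companion would be looked up (and moved) on the wrong storage. Funneling construction through fsFactory keeps that decision in one place. A stripped-down sketch of the dispatch idea, with a hypothetical getFile signature modeled on the calls in this diff (the real FSFactory interface in IoTDB carries more methods):

    import java.io.File;

    // Hypothetical minimal factory mirroring the getFile(...) calls in this diff.
    interface FSFactory {
      File getFile(File parentFile, String child);
    }

    class LocalFSFactory implements FSFactory {
      @Override
      public File getFile(File parentFile, String child) {
        // Local storage: a plain java.io.File is all that is needed.
        return new File(parentFile, child);
      }
    }

    // An HDFS-backed factory would instead return a File subclass whose exists(),
    // length(), renameTo(), etc. talk to HDFS rather than the local disk, which
    // is why every bare `new File(...)` above becomes fsFactory.getFile(...).
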
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
index 7699c08..8ebf18f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
@@ -50,8 +50,7 @@ public class HDFSOutputFactory implements FileOutputFactory {
     } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
       logger.error(
           "Failed to get TsFile output of file: {}. Please check your dependency of Hadoop module.",
-          filePath,
-          e);
+          filePath, e);
       return null;
     }
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
index 0e80f88..03a71a9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
@@ -28,7 +28,6 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URI;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +44,7 @@ public class HDFSFactory implements FSFactory {
   private static Method getBufferedOutputStream;
   private static Method listFilesBySuffix;
   private static Method listFilesByPrefix;
+  private static Method renameTo;
 
   static {
     try {
@@ -59,6 +59,7 @@ public class HDFSFactory implements FSFactory {
       getBufferedOutputStream = clazz.getMethod("getBufferedOutputStream", String.class);
       listFilesBySuffix = clazz.getMethod("listFilesBySuffix", String.class, String.class);
       listFilesByPrefix = clazz.getMethod("listFilesByPrefix", String.class, String.class);
+      renameTo = clazz.getMethod("renameTo", File.class);
     } catch (ClassNotFoundException | NoSuchMethodException e) {
       logger
           .error("Failed to get Hadoop file system. Please check your dependency of Hadoop module.",
@@ -173,9 +174,12 @@ public class HDFSFactory implements FSFactory {
   }
 
   public void moveFile(File srcFile, File destFile) {
-    boolean rename = srcFile.renameTo(destFile);
-    if (!rename) {
-      logger.error("Failed to rename file from {} to {}. ", srcFile.getName(), destFile.getName());
+    try {
+      renameTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile);
+    } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+      logger.error(
+          "Failed to rename file from {} to {}. Please check your dependency of Hadoop module.",
+          srcFile.getName(), destFile.getName(), e);
     }
   }
 
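
moveFile formerly relied on java.io.File.renameTo, which only works for local paths and merely returns false on HDFS, so the error branch fired without the file ever moving. The replacement reflectively builds the Hadoop-backed file object and invokes its renameTo, keeping the tsfile module free of a compile-time Hadoop dependency. A condensed sketch of the same reflection pattern; the HDFSFile class name follows the static initializer in HDFSFactory, and error handling is trimmed for brevity:

    import java.io.File;
    import java.lang.reflect.Constructor;
    import java.lang.reflect.Method;

    class ReflectiveRename {
      static void moveFile(File srcFile, File destFile) throws Exception {
        // Load the HDFS-aware File subclass only if the Hadoop module is on the classpath.
        Class<?> clazz = Class.forName("org.apache.iotdb.hadoop.fileSystem.HDFSFile");
        Constructor<?> ctor = clazz.getConstructor(String.class);
        Method renameTo = clazz.getMethod("renameTo", File.class);
        // Equivalent to: new HDFSFile(src).renameTo(dest), resolved at runtime.
        renameTo.invoke(ctor.newInstance(srcFile.getAbsolutePath()), destFile);
      }
    }
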
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 8312d3e..48452ac 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -33,7 +33,6 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.compress.IUnCompressor;
@@ -768,8 +767,11 @@ public class TsFileSequenceReader implements AutoCloseable {
         throw new IOException("reach the end of the data");
       }
     } else {
-      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size) != size) {
-        throw new IOException("reach the end of the data");
+      long actualReadSize = ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size);
+      if (actualReadSize != size) {
+        throw new IOException(
+            String.format("reach the end of the data. Size of data that want to read: %s,"
+                + "actual read size: %s, posiotion: %s", size, actualReadSize, position));
       }
     }
     buffer.flip();
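
The enriched message reports the requested size, the bytes actually read, and the file position, which is usually enough to tell a truncated or still-being-written file apart from a reader bug. A hedged approximation of the same read-exactly-or-explain pattern over a plain FileChannel (this sketches readAsPossible's contract, not its actual implementation):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    final class DiagnosticRead {
      /** Read exactly buffer.remaining() bytes at position, or fail with context. */
      static void readFully(FileChannel channel, ByteBuffer buffer, long position)
          throws IOException {
        int wanted = buffer.remaining();
        long read = 0;
        while (buffer.hasRemaining()) {
          int n = channel.read(buffer, position + read);
          if (n < 0) {
            break; // end of file
          }
          read += n;
        }
        if (read != wanted) {
          // Report what was asked for, what arrived, and where: the same three
          // facts the patched TsFileSequenceReader message carries.
          throw new IOException(String.format(
              "reach the end of the data. Size of data to read: %d, actual read size: %d, position: %d",
              wanted, read, position));
        }
        buffer.flip();
      }
    }
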
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
index d677e38..9dfccf8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +45,7 @@ public class ForceAppendTsFileWriter extends TsFileIOWriter {
     if (logger.isDebugEnabled()) {
       logger.debug("{} writer is opened.", file.getName());
     }
-    this.out = new LocalTsFileOutput(file, true);
+    this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
     this.file = file;
 
     // file doesn't exist
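
Hard-coding LocalTsFileOutput forced this writer onto the local disk even when the storage layer was configured for HDFS; going through FSFactoryProducer picks the right TsFileOutput implementation at runtime. Illustrative usage of the factory call from this hunk, wrapped in a small helper; the TsFileOutput import path is assumed from the surrounding codebase:

    import java.io.File;
    import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
    import org.apache.iotdb.tsfile.write.writer.TsFileOutput;

    class OutputFactoryUsage {
      static TsFileOutput openForAppend(File file) {
        // Resolves to a local or HDFS-backed TsFileOutput depending on the
        // configured file system; `true` opens the file in append mode.
        return FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
      }
    }
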
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 31e1146..c6b1726 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -163,6 +163,7 @@ public class TsFileIOWriter {
         .add(new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList));
     currentChunkGroupDeviceId = null;
     chunkMetadataList = null;
+    out.flush();
   }
 
   /**
@@ -176,14 +177,11 @@ public class TsFileIOWriter {
    * @throws IOException if I/O error occurs
    */
   public void startFlushChunk(MeasurementSchema measurementSchema,
-      CompressionType compressionCodecName,
-      TSDataType tsDataType, TSEncoding encodingType, Statistics<?> statistics, int dataSize,
-      int numOfPages)
-      throws IOException {
+      CompressionType compressionCodecName, TSDataType tsDataType, TSEncoding encodingType,
+      Statistics<?> statistics, int dataSize, int numOfPages) throws IOException {
 
     currentChunkMetadata = new ChunkMetadata(measurementSchema.getMeasurementId(), tsDataType,
-        out.getPosition(),
-        statistics);
+        out.getPosition(), statistics);
 
     ChunkHeader header = new ChunkHeader(measurementSchema.getMeasurementId(), dataSize, tsDataType,
         compressionCodecName, encodingType, numOfPages);
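
The one-line out.flush() added at the end of endChunkGroup (first hunk above) works together with the hflush() change in HDFSOutput: flushing only at chunk-group boundaries keeps the visible portion of the file aligned with complete, length-consistent structures in the common case, narrowing the window in which a concurrent TsFileSequenceReader can read a header whose payload has not yet arrived, which is the BufferUnderflowException scenario behind IOTDB-805. A generic sketch of the flush-at-record-boundary idea with plain java.io streams (not IoTDB code):

    import java.io.BufferedOutputStream;
    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    class BoundaryFlush {
      public static void main(String[] args) throws IOException {
        try (DataOutputStream out = new DataOutputStream(
            new BufferedOutputStream(new FileOutputStream("example.bin")))) {
          byte[] chunkGroup = new byte[] {1, 2, 3, 4};
          out.writeInt(chunkGroup.length); // length prefix
          out.write(chunkGroup);           // payload
          // Flush only once the length-prefixed record is complete, so a
          // concurrent reader is far less likely to see a prefix without its payload.
          out.flush();
        }
      }
    }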