You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/06/12 10:24:06 UTC

[incubator-iotdb] branch jira_765 created (now f564ea3)

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a change to branch jira_765
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at f564ea3  [IOTDB-765] Failed to get upgrade.txt file when using HDFS storage

This branch includes the following new commits:

     new f564ea3  [IOTDB-765] Failed to get upgrade.txt file when using HDFS storage

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: [IOTDB-765] Failed to get upgrade.txt file when using HDFS storage

Posted by su...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch jira_765
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit f564ea32d94cdd46a6727a4ebe6835d8ca5213e2
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Fri Jun 12 18:23:41 2020 +0800

    [IOTDB-765] Failed to get upgrade.txt file when using HDFS storage
---
 .../apache/iotdb/db/engine/upgrade/UpgradeLog.java |   5 +-
 .../db/tools/upgrade/TsFileOnlineUpgradeTool.java  | 121 ++++++++++-----------
 2 files changed, 61 insertions(+), 65 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeLog.java b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeLog.java
index 2c580c0..1da0acf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeLog.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeLog.java
@@ -20,12 +20,12 @@ package org.apache.iotdb.db.engine.upgrade;
 
 import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.utils.UpgradeUtils;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,8 +47,7 @@ public class UpgradeLog {
         upgradeLogPath.getParentFile().mkdirs();
       }
       upgradeLogPath.createNewFile();
-      upgradeLogWriter = FSFactoryProducer.getFSFactory()
-          .getBufferedWriter(getUpgradeLogPath(), true);
+      upgradeLogWriter = new BufferedWriter(new FileWriter(getUpgradeLogPath(), true));
       return true;
     } catch (IOException e) {
       logger.error("meet error when create upgrade log, file path:{}",
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index 9fc72d2..db973b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -21,13 +21,11 @@ package org.apache.iotdb.db.tools.upgrade;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -47,14 +45,13 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
 import org.apache.iotdb.tsfile.read.reader.page.PageReader;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.v1.file.metadata.ChunkGroupMetaDataV1;
-import org.apache.iotdb.tsfile.v1.file.metadata.TsDeviceMetadataV1;
 import org.apache.iotdb.tsfile.v1.file.metadata.TsDeviceMetadataIndexV1;
+import org.apache.iotdb.tsfile.v1.file.metadata.TsDeviceMetadataV1;
 import org.apache.iotdb.tsfile.v1.file.metadata.TsFileMetadataV1;
 import org.apache.iotdb.tsfile.v1.file.utils.HeaderUtils;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
@@ -77,7 +74,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
       TSDataType.INT64);
   private Decoder valueDecoder;
   protected String file;
-  
+
   // PartitionId -> TsFileIOWriter 
   private Map<Long, TsFileIOWriter> partitionWriterMap;
 
@@ -101,8 +98,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
    */
   public TsFileOnlineUpgradeTool(String file, boolean loadMetadataSize) throws IOException {
     this.file = file;
-    final java.nio.file.Path path = Paths.get(file);
-    tsFileInput = new LocalTsFileInput(path);
+    tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file);
     partitionWriterMap = new HashMap<>();
     try {
       if (loadMetadataSize) {
@@ -117,11 +113,10 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
   /**
    * upgrade a single tsfile
    *
-   * @param tsfileName old version tsFile's absolute path
+   * @param tsFileName old version tsFile's absolute path
    * @param upgradedResources new version tsFiles' resources
-   * @throws WriteProcessException 
    */
-  public static void upgradeOneTsfile(String tsFileName, List<TsFileResource> upgradedResources) 
+  public static void upgradeOneTsfile(String tsFileName, List<TsFileResource> upgradedResources)
       throws IOException, WriteProcessException {
     try (TsFileOnlineUpgradeTool updater = new TsFileOnlineUpgradeTool(tsFileName)) {
       updater.upgradeFile(upgradedResources);
@@ -129,7 +124,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
   }
 
   /**
-   * 
+   *
    */
   public void loadMetadataSize() throws IOException {
     ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
@@ -255,7 +250,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
         0);
     return uncompressedBuffer;
   }
-  
+
   public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
     return readData(-1, header.getCompressedSize());
   }
@@ -320,9 +315,10 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
 
   /**
    * upgrade file and resource
-   * @throws IOException, WriteProcessException 
+   *
+   * @throws IOException, WriteProcessException
    */
-  public void upgradeFile(List<TsFileResource> upgradedResources) 
+  public void upgradeFile(List<TsFileResource> upgradedResources)
       throws IOException, WriteProcessException {
     File oldTsFile = FSFactoryProducer.getFSFactory().getFile(this.file);
 
@@ -333,7 +329,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
 
     // ChunkGroupOffset -> version
     Map<Long, Long> oldVersionInfo = getVersionInfo();
-    
+
     // start to scan chunks and chunkGroups
     long startOffsetOfChunkGroup = 0;
     boolean newChunkGroup = true;
@@ -357,7 +353,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
             ChunkHeader header = this.readChunkHeader();
             MeasurementSchema measurementSchema = new MeasurementSchema(header.getMeasurementID(),
                 header.getDataType(),
-                header.getEncodingType(), 
+                header.getEncodingType(),
                 header.getCompressionType());
             measurementSchemaList.add(measurementSchema);
             List<PageHeader> pageHeadersInChunk = new ArrayList<>();
@@ -367,8 +363,9 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
               PageHeader pageHeader = readPageHeader(header.getDataType());
               boolean pageInSamePartition = checkIfPageInSameTimePartition(pageHeader);
               pagePartitionInfo.add(pageInSamePartition);
-              ByteBuffer pageData = pageInSamePartition ? 
-                  readCompressedPage(pageHeader) : readPage(pageHeader, header.getCompressionType());
+              ByteBuffer pageData = pageInSamePartition ?
+                  readCompressedPage(pageHeader)
+                  : readPage(pageHeader, header.getCompressionType());
               pageHeadersInChunk.add(pageHeader);
               dataInChunk.add(pageData);
             }
@@ -417,17 +414,14 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
   }
 
   /**
-   *  This method is for rewriting the ChunkGroup which data is in the different time partitions. 
-   *  In this case, we have to decode the data to points, 
-   *  and then rewrite the data points to different chunkWriters,
-   *  finally write chunks to their own upgraded TsFiles
-   * @param pageHeadersInChunkGroup 
-   * @throws PageException 
+   * This method is for rewriting the ChunkGroup which data is in the different time partitions. In
+   * this case, we have to decode the data to points, and then rewrite the data points to different
+   * chunkWriters, finally write chunks to their own upgraded TsFiles
    */
-  private void rewrite(File oldTsFile, String deviceId, List<MeasurementSchema> schemas, 
-      List<List<PageHeader>> pageHeadersInChunkGroup, List<List<ByteBuffer>> dataInChunkGroup, 
-      long versionOfChunkGroup, List<List<Boolean>> pagePartitionInfoInChunkGroup) 
-          throws IOException, PageException {
+  private void rewrite(File oldTsFile, String deviceId, List<MeasurementSchema> schemas,
+      List<List<PageHeader>> pageHeadersInChunkGroup, List<List<ByteBuffer>> dataInChunkGroup,
+      long versionOfChunkGroup, List<List<Boolean>> pagePartitionInfoInChunkGroup)
+      throws IOException, PageException {
     Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>();
     for (int i = 0; i < schemas.size(); i++) {
       MeasurementSchema schema = schemas.get(i);
@@ -438,17 +432,17 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
           .getDecoderByType(schema.getEncodingType(), schema.getType());
       for (int j = 0; j < pageDataInChunk.size(); j++) {
         if (Boolean.TRUE.equals(pagePartitionInfo.get(j))) {
-          writePageInSamePartitionToFile(oldTsFile, schema, pageHeadersInChunk.get(j), 
+          writePageInSamePartitionToFile(oldTsFile, schema, pageHeadersInChunk.get(j),
               pageDataInChunk.get(j), chunkWritersInChunkGroup);
-        }
-        else {
-          writePageInDifferentPartitionsToFiles(oldTsFile, schema, pageDataInChunk.get(j), 
+        } else {
+          writePageInDifferentPartitionsToFiles(oldTsFile, schema, pageDataInChunk.get(j),
               chunkWritersInChunkGroup);
         }
       }
     }
-    
-    for (Entry<Long, Map<MeasurementSchema, ChunkWriterImpl>> entry : chunkWritersInChunkGroup.entrySet()) {
+
+    for (Entry<Long, Map<MeasurementSchema, ChunkWriterImpl>> entry : chunkWritersInChunkGroup
+        .entrySet()) {
       long partitionId = entry.getKey();
       TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partitionId);
       tsFileIOWriter.startChunkGroup(deviceId);
@@ -462,32 +456,34 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
   }
 
   private TsFileIOWriter getOrDefaultTsFileIOWriter(File oldTsFile, long partition) {
-    return partitionWriterMap.computeIfAbsent(partition, k -> 
-      {
-        File partitionDir = FSFactoryProducer.getFSFactory().getFile(oldTsFile.getParent()
-            + File.separator + partition);
-        if (!partitionDir.exists()) {
-          partitionDir.mkdirs();
-        }
-        File newFile = FSFactoryProducer.getFSFactory().getFile(oldTsFile.getParent()
-            + File.separator + partition + File.separator+ oldTsFile.getName());
-        try {
-          if (!newFile.createNewFile()) {
-            logger.error("The TsFile {} has been created ", newFile);
+    return partitionWriterMap.computeIfAbsent(partition, k ->
+        {
+          File partitionDir = FSFactoryProducer.getFSFactory().getFile(oldTsFile.getParent()
+              + File.separator + partition);
+          if (!partitionDir.exists()) {
+            partitionDir.mkdirs();
+          }
+          File newFile = FSFactoryProducer.getFSFactory().getFile(oldTsFile.getParent()
+              + File.separator + partition + File.separator + oldTsFile.getName());
+          try {
+            if (!newFile.createNewFile()) {
+              logger.error("The TsFile {} has been created ", newFile);
+              return null;
+            }
+            return new TsFileIOWriter(newFile);
+          } catch (IOException e) {
+            logger.error("Create new TsFile {} failed ", newFile);
             return null;
           }
-          return new TsFileIOWriter(newFile);
-        } catch (IOException e) {
-          logger.error("Create new TsFile {} failed ", newFile);
-          return null;
         }
-      }
     );
   }
 
-  private void writePageInSamePartitionToFile(File oldTsFile, MeasurementSchema schema, PageHeader pageHeader, 
-      ByteBuffer pageData, Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup) 
-          throws PageException {
+  private void writePageInSamePartitionToFile(File oldTsFile, MeasurementSchema schema,
+      PageHeader pageHeader,
+      ByteBuffer pageData,
+      Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
+      throws PageException {
     long partitionId = StorageEngine.getTimePartition(pageHeader.getStartTime());
     getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
     Map<MeasurementSchema, ChunkWriterImpl> chunkWriters = chunkWritersInChunkGroup
@@ -500,8 +496,9 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
   }
 
   private void writePageInDifferentPartitionsToFiles(File oldTsFile, MeasurementSchema schema,
-      ByteBuffer pageData, Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
-          throws IOException {
+      ByteBuffer pageData,
+      Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
+      throws IOException {
     valueDecoder.reset();
     PageReader pageReader = new PageReader(pageData, schema.getType(), valueDecoder,
         defaultTimeDecoder, null);
@@ -510,7 +507,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
       long time = batchData.currentTime();
       Object value = batchData.currentValue();
       long partitionId = StorageEngine.getTimePartition(time);
-      
+
       Map<MeasurementSchema, ChunkWriterImpl> chunkWriters = chunkWritersInChunkGroup
           .getOrDefault(partitionId, new HashMap<>());
       ChunkWriterImpl chunkWriter = chunkWriters
@@ -546,9 +543,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
   }
 
   /**
-   *  check if the file to be upgraded has correct magic strings and version number
-   *  @param oldTsFile
-   *  @throws IOException 
+   * check if the file to be upgraded has correct magic strings and version number
    */
   private boolean fileCheck(File oldTsFile) throws IOException {
     long fileSize;
@@ -602,12 +597,14 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
     return versionInfo;
   }
 
-  private TsFileResource endFileAndGenerateResource(TsFileIOWriter tsFileIOWriter) throws IOException {
+  private TsFileResource endFileAndGenerateResource(TsFileIOWriter tsFileIOWriter)
+      throws IOException {
     tsFileIOWriter.endFile();
     TsFileResource tsFileResource = new TsFileResource(tsFileIOWriter.getFile());
     Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = tsFileIOWriter
         .getDeviceTimeseriesMetadataMap();
-    for (Map.Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap.entrySet()) {
+    for (Map.Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap
+        .entrySet()) {
       String device = entry.getKey();
       for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
         tsFileResource.updateStartTime(device, timeseriesMetaData.getStatistics().getStartTime());