You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/06/13 02:00:14 UTC

[incubator-iotdb] branch master updated: [IOTDB-765] Failed to get upgrade.txt file when using HDFS storage (#1357)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 051ef4d  [IOTDB-765] Failed to get upgrade.txt file when using HDFS storage (#1357)
051ef4d is described below

commit 051ef4dd9cb418e99dc1a37a16443cf418b98180
Author: Zesong Sun <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sat Jun 13 10:00:04 2020 +0800

    [IOTDB-765] Failed to get upgrade.txt file when using HDFS storage (#1357)
---
 .../apache/iotdb/db/engine/upgrade/UpgradeLog.java |   5 +-
 .../db/tools/upgrade/TsFileOnlineUpgradeTool.java  | 121 ++++++++++-----------
 2 files changed, 61 insertions(+), 65 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeLog.java b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeLog.java
index 2c580c0..1da0acf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeLog.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeLog.java
@@ -20,12 +20,12 @@ package org.apache.iotdb.db.engine.upgrade;
 
 import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.utils.UpgradeUtils;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,8 +47,7 @@ public class UpgradeLog {
         upgradeLogPath.getParentFile().mkdirs();
       }
       upgradeLogPath.createNewFile();
-      upgradeLogWriter = FSFactoryProducer.getFSFactory()
-          .getBufferedWriter(getUpgradeLogPath(), true);
+      upgradeLogWriter = new BufferedWriter(new FileWriter(getUpgradeLogPath(), true));
       return true;
     } catch (IOException e) {
       logger.error("meet error when create upgrade log, file path:{}",
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index 9fc72d2..db973b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -21,13 +21,11 @@ package org.apache.iotdb.db.tools.upgrade;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -47,14 +45,13 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
 import org.apache.iotdb.tsfile.read.reader.page.PageReader;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.v1.file.metadata.ChunkGroupMetaDataV1;
-import org.apache.iotdb.tsfile.v1.file.metadata.TsDeviceMetadataV1;
 import org.apache.iotdb.tsfile.v1.file.metadata.TsDeviceMetadataIndexV1;
+import org.apache.iotdb.tsfile.v1.file.metadata.TsDeviceMetadataV1;
 import org.apache.iotdb.tsfile.v1.file.metadata.TsFileMetadataV1;
 import org.apache.iotdb.tsfile.v1.file.utils.HeaderUtils;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
@@ -77,7 +74,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
       TSDataType.INT64);
   private Decoder valueDecoder;
   protected String file;
-  
+
   // PartitionId -> TsFileIOWriter 
   private Map<Long, TsFileIOWriter> partitionWriterMap;
 
@@ -101,8 +98,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
    */
   public TsFileOnlineUpgradeTool(String file, boolean loadMetadataSize) throws IOException {
     this.file = file;
-    final java.nio.file.Path path = Paths.get(file);
-    tsFileInput = new LocalTsFileInput(path);
+    tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file);
     partitionWriterMap = new HashMap<>();
     try {
       if (loadMetadataSize) {
@@ -117,11 +113,10 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
   /**
    * upgrade a single tsfile
    *
-   * @param tsfileName old version tsFile's absolute path
+   * @param tsFileName old version tsFile's absolute path
    * @param upgradedResources new version tsFiles' resources
-   * @throws WriteProcessException 
    */
-  public static void upgradeOneTsfile(String tsFileName, List<TsFileResource> upgradedResources) 
+  public static void upgradeOneTsfile(String tsFileName, List<TsFileResource> upgradedResources)
       throws IOException, WriteProcessException {
     try (TsFileOnlineUpgradeTool updater = new TsFileOnlineUpgradeTool(tsFileName)) {
       updater.upgradeFile(upgradedResources);
@@ -129,7 +124,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
   }
 
   /**
-   * 
+   *
    */
   public void loadMetadataSize() throws IOException {
     ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
@@ -255,7 +250,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
         0);
     return uncompressedBuffer;
   }
-  
+
   public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
     return readData(-1, header.getCompressedSize());
   }
@@ -320,9 +315,10 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
 
   /**
    * upgrade file and resource
-   * @throws IOException, WriteProcessException 
+   *
+   * @throws IOException, WriteProcessException
    */
-  public void upgradeFile(List<TsFileResource> upgradedResources) 
+  public void upgradeFile(List<TsFileResource> upgradedResources)
       throws IOException, WriteProcessException {
     File oldTsFile = FSFactoryProducer.getFSFactory().getFile(this.file);
 
@@ -333,7 +329,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
 
     // ChunkGroupOffset -> version
     Map<Long, Long> oldVersionInfo = getVersionInfo();
-    
+
     // start to scan chunks and chunkGroups
     long startOffsetOfChunkGroup = 0;
     boolean newChunkGroup = true;
@@ -357,7 +353,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
             ChunkHeader header = this.readChunkHeader();
             MeasurementSchema measurementSchema = new MeasurementSchema(header.getMeasurementID(),
                 header.getDataType(),
-                header.getEncodingType(), 
+                header.getEncodingType(),
                 header.getCompressionType());
             measurementSchemaList.add(measurementSchema);
             List<PageHeader> pageHeadersInChunk = new ArrayList<>();
@@ -367,8 +363,9 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
               PageHeader pageHeader = readPageHeader(header.getDataType());
               boolean pageInSamePartition = checkIfPageInSameTimePartition(pageHeader);
               pagePartitionInfo.add(pageInSamePartition);
-              ByteBuffer pageData = pageInSamePartition ? 
-                  readCompressedPage(pageHeader) : readPage(pageHeader, header.getCompressionType());
+              ByteBuffer pageData = pageInSamePartition ?
+                  readCompressedPage(pageHeader)
+                  : readPage(pageHeader, header.getCompressionType());
               pageHeadersInChunk.add(pageHeader);
               dataInChunk.add(pageData);
             }
@@ -417,17 +414,14 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
   }
 
   /**
-   *  This method is for rewriting the ChunkGroup which data is in the different time partitions. 
-   *  In this case, we have to decode the data to points, 
-   *  and then rewrite the data points to different chunkWriters,
-   *  finally write chunks to their own upgraded TsFiles
-   * @param pageHeadersInChunkGroup 
-   * @throws PageException 
+   * This method rewrites a ChunkGroup whose data spans different time partitions. In this case,
+   * we have to decode the data into points, rewrite the data points to different chunkWriters,
+   * and finally write the chunks to their own upgraded TsFiles.
    */
-  private void rewrite(File oldTsFile, String deviceId, List<MeasurementSchema> schemas, 
-      List<List<PageHeader>> pageHeadersInChunkGroup, List<List<ByteBuffer>> dataInChunkGroup, 
-      long versionOfChunkGroup, List<List<Boolean>> pagePartitionInfoInChunkGroup) 
-          throws IOException, PageException {
+  private void rewrite(File oldTsFile, String deviceId, List<MeasurementSchema> schemas,
+      List<List<PageHeader>> pageHeadersInChunkGroup, List<List<ByteBuffer>> dataInChunkGroup,
+      long versionOfChunkGroup, List<List<Boolean>> pagePartitionInfoInChunkGroup)
+      throws IOException, PageException {
     Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>();
     for (int i = 0; i < schemas.size(); i++) {
       MeasurementSchema schema = schemas.get(i);
@@ -438,17 +432,17 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
           .getDecoderByType(schema.getEncodingType(), schema.getType());
       for (int j = 0; j < pageDataInChunk.size(); j++) {
         if (Boolean.TRUE.equals(pagePartitionInfo.get(j))) {
-          writePageInSamePartitionToFile(oldTsFile, schema, pageHeadersInChunk.get(j), 
+          writePageInSamePartitionToFile(oldTsFile, schema, pageHeadersInChunk.get(j),
               pageDataInChunk.get(j), chunkWritersInChunkGroup);
-        }
-        else {
-          writePageInDifferentPartitionsToFiles(oldTsFile, schema, pageDataInChunk.get(j), 
+        } else {
+          writePageInDifferentPartitionsToFiles(oldTsFile, schema, pageDataInChunk.get(j),
               chunkWritersInChunkGroup);
         }
       }
     }
-    
-    for (Entry<Long, Map<MeasurementSchema, ChunkWriterImpl>> entry : chunkWritersInChunkGroup.entrySet()) {
+
+    for (Entry<Long, Map<MeasurementSchema, ChunkWriterImpl>> entry : chunkWritersInChunkGroup
+        .entrySet()) {
       long partitionId = entry.getKey();
       TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partitionId);
       tsFileIOWriter.startChunkGroup(deviceId);
@@ -462,32 +456,34 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
   }
 
   private TsFileIOWriter getOrDefaultTsFileIOWriter(File oldTsFile, long partition) {
-    return partitionWriterMap.computeIfAbsent(partition, k -> 
-      {
-        File partitionDir = FSFactoryProducer.getFSFactory().getFile(oldTsFile.getParent()
-            + File.separator + partition);
-        if (!partitionDir.exists()) {
-          partitionDir.mkdirs();
-        }
-        File newFile = FSFactoryProducer.getFSFactory().getFile(oldTsFile.getParent()
-            + File.separator + partition + File.separator+ oldTsFile.getName());
-        try {
-          if (!newFile.createNewFile()) {
-            logger.error("The TsFile {} has been created ", newFile);
+    return partitionWriterMap.computeIfAbsent(partition, k ->
+        {
+          File partitionDir = FSFactoryProducer.getFSFactory().getFile(oldTsFile.getParent()
+              + File.separator + partition);
+          if (!partitionDir.exists()) {
+            partitionDir.mkdirs();
+          }
+          File newFile = FSFactoryProducer.getFSFactory().getFile(oldTsFile.getParent()
+              + File.separator + partition + File.separator + oldTsFile.getName());
+          try {
+            if (!newFile.createNewFile()) {
+              logger.error("The TsFile {} has been created ", newFile);
+              return null;
+            }
+            return new TsFileIOWriter(newFile);
+          } catch (IOException e) {
+            logger.error("Create new TsFile {} failed ", newFile);
             return null;
           }
-          return new TsFileIOWriter(newFile);
-        } catch (IOException e) {
-          logger.error("Create new TsFile {} failed ", newFile);
-          return null;
         }
-      }
     );
   }
 
-  private void writePageInSamePartitionToFile(File oldTsFile, MeasurementSchema schema, PageHeader pageHeader, 
-      ByteBuffer pageData, Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup) 
-          throws PageException {
+  private void writePageInSamePartitionToFile(File oldTsFile, MeasurementSchema schema,
+      PageHeader pageHeader,
+      ByteBuffer pageData,
+      Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
+      throws PageException {
     long partitionId = StorageEngine.getTimePartition(pageHeader.getStartTime());
     getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
     Map<MeasurementSchema, ChunkWriterImpl> chunkWriters = chunkWritersInChunkGroup
@@ -500,8 +496,9 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
   }
 
   private void writePageInDifferentPartitionsToFiles(File oldTsFile, MeasurementSchema schema,
-      ByteBuffer pageData, Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
-          throws IOException {
+      ByteBuffer pageData,
+      Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
+      throws IOException {
     valueDecoder.reset();
     PageReader pageReader = new PageReader(pageData, schema.getType(), valueDecoder,
         defaultTimeDecoder, null);
@@ -510,7 +507,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
       long time = batchData.currentTime();
       Object value = batchData.currentValue();
       long partitionId = StorageEngine.getTimePartition(time);
-      
+
       Map<MeasurementSchema, ChunkWriterImpl> chunkWriters = chunkWritersInChunkGroup
           .getOrDefault(partitionId, new HashMap<>());
       ChunkWriterImpl chunkWriter = chunkWriters
@@ -546,9 +543,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
   }
 
   /**
-   *  check if the file to be upgraded has correct magic strings and version number
-   *  @param oldTsFile
-   *  @throws IOException 
+   * check if the file to be upgraded has correct magic strings and version number
    */
   private boolean fileCheck(File oldTsFile) throws IOException {
     long fileSize;
@@ -602,12 +597,14 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
     return versionInfo;
   }
 
-  private TsFileResource endFileAndGenerateResource(TsFileIOWriter tsFileIOWriter) throws IOException {
+  private TsFileResource endFileAndGenerateResource(TsFileIOWriter tsFileIOWriter)
+      throws IOException {
     tsFileIOWriter.endFile();
     TsFileResource tsFileResource = new TsFileResource(tsFileIOWriter.getFile());
     Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = tsFileIOWriter
         .getDeviceTimeseriesMetadataMap();
-    for (Map.Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap.entrySet()) {
+    for (Map.Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap
+        .entrySet()) {
       String device = entry.getKey();
       for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
         tsFileResource.updateStartTime(device, timeseriesMetaData.getStatistics().getStartTime());