You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/04/01 11:52:04 UTC

[GitHub] [iotdb] jixuan1989 commented on a change in pull request #2864: [IOTDB-1230]Support cross multi time partition when loading one TsFile

jixuan1989 commented on a change in pull request #2864:
URL: https://github.com/apache/iotdb/pull/2864#discussion_r605593290



##########
File path: server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
##########
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class TsFileRewriteTool implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(TsFileRewriteTool.class);
+
+  protected TsFileSequenceReader reader; // sequence reader over the TsFile being rewritten; opened in the constructor
+  protected File oldTsFile; // file obtained from the resource passed to the constructor
+  protected List<Modification> oldModification; // loaded in the constructor only when a mods file exists for oldTsFile
+  protected Iterator<Modification> modsIterator; // iterator over oldModification; assigned together with it
+
+  /** new tsFile writer -> modification file associated with that writer */
+  protected Map<TsFileIOWriter, ModificationFile> fileModificationMap;
+
+  protected Deletion currentMod;
+  protected Decoder defaultTimeDecoder =
+      Decoder.getDecoderByType(
+          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+          TSDataType.INT64); // INT64 decoder selected by the configured time encoder
+  protected Decoder valueDecoder;
+
+  /** PartitionId -> TsFileIOWriter */
+  protected Map<Long, TsFileIOWriter> partitionWriterMap;
+
+  /** Maximum index of plans executed within this TsFile. */
+  protected long maxPlanIndex = Long.MIN_VALUE;
+
+  /** Minimum index of plans executed within this TsFile. */
+  protected long minPlanIndex = Long.MAX_VALUE;
+
+  /**
+   * Create a file reader of the given file. The reader will read the real data and rewrite to some
+   * new tsFiles. If a modification file exists next to the TsFile, its modifications are loaded too.
+   * @param resourceToBeRewritten resource whose TsFile (and modification file, if present) is read
+   * @throws IOException If some I/O error occurs
+   */
+  public TsFileRewriteTool(TsFileResource resourceToBeRewritten) throws IOException {
+    oldTsFile = resourceToBeRewritten.getTsFile();
+    String file = oldTsFile.getAbsolutePath();
+    reader = new TsFileSequenceReader(file); // released again by close()
+    partitionWriterMap = new HashMap<>();
+    if (FSFactoryProducer.getFSFactory().getFile(file + ModificationFile.FILE_SUFFIX).exists()) { // mods file present?
+      oldModification = (List<Modification>) resourceToBeRewritten.getModFile().getModifications(); // NOTE(review): unchecked cast, assumes getModifications() really returns a List; confirm
+      modsIterator = oldModification.iterator();
+      fileModificationMap = new HashMap<>(); // only initialised on this branch; otherwise the field keeps its default
+    }
+  }
+
+  /**
+   * Rewrite an old file to the latest version by delegating to {@link #parseAndRewriteFile(List)}.
+   *
+   * @param resourceToBeRewritten the TsFile resource to be rewritten
+   * @param rewrittenResources the rewritten files
+   */
+  public static void rewriteTsFile(
+      TsFileResource resourceToBeRewritten, List<TsFileResource> rewrittenResources)
+      throws IOException, WriteProcessException {
+    try (TsFileRewriteTool rewriteTool = new TsFileRewriteTool(resourceToBeRewritten)) { // tool is closed on exit, including on exceptions
+      rewriteTool.parseAndRewriteFile(rewrittenResources);
+    }
+  }
+
+  @Override
+  public void close() throws IOException { // AutoCloseable hook, used by try-with-resources in rewriteTsFile
+    this.reader.close(); // releases the TsFileSequenceReader opened in the constructor
+  }
+
+  /**
+   * Parse the old files and generate some new files according to the time partition interval.
+   *
+   * @throws IOException WriteProcessException
+   */
+  @SuppressWarnings({"squid:S3776", "deprecation"}) // Suppress high Cognitive Complexity warning
+  public void parseAndRewriteFile(List<TsFileResource> rewrittenResources)
+      throws IOException, WriteProcessException {
+    // check if the TsFile has correct header
+    if (!fileCheck()) {
+      return;
+    }
+    int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES;
+    reader.position(headerLength);
+    // start to scan chunks and chunkGroups
+    List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
+    List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>();
+    List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>();
+    byte marker;
+    List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
+    String lastChunkGroupDeviceId = null;
+    try {
+      while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:

Review comment:
   If the chunk contains only one page, then there is no pageStatistics in the page header.
   
   Lines 170 and 171 have already indicated that in the call to PageHeader's deserialize method:
   ```
   PageHeader pageHeader =
                     reader.readPageHeader(dataType, header.getChunkType() == MetaMarker.CHUNK_HEADER);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org