You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/11/03 02:45:05 UTC

[iotdb] 01/01: add data type checking in single series compaction executor

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4834
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 58a00a19cbb49965b73c9bfbd1c517e270de3166
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Thu Nov 3 10:44:42 2022 +0800

    add data type checking in single series compaction executor
---
 .../db/engine/compaction/CompactionUtils.java      | 27 +++++++++++++
 .../utils/SingleSeriesCompactionExecutor.java      | 44 +++++++++++++++++++++-
 2 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 5e5561b43e..ecaceed585 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -19,15 +19,24 @@
 package org.apache.iotdb.db.engine.compaction;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -217,4 +226,22 @@ public class CompactionUtils {
       }
     }
   }
+
+  /**
+   * Looks up the schema of one time series through the schema fetcher that matches the current
+   * deployment mode (cluster or standalone).
+   *
+   * @param device the device part of the series path
+   * @param measurementId the measurement part of the series path
+   * @return the schema of the first measurement found in the fetched schema tree, or {@code null}
+   *     if the tree contains no measurement
+   * @throws IllegalPathException presumably when device and measurement do not form a legal path
+   */
+  public static IMeasurementSchema fetchSchema(String device, String measurementId)
+      throws IllegalPathException {
+    ISchemaFetcher fetcher;
+    if (IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
+      fetcher = ClusterSchemaFetcher.getInstance();
+    } else {
+      fetcher = StandaloneSchemaFetcher.getInstance();
+    }
+
+    // The pattern tree holds exactly one full path: the series we are interested in.
+    PathPatternTree singlePathTree = new PathPatternTree();
+    singlePathTree.appendFullPath(new PartialPath(device, measurementId));
+    singlePathTree.constructTree();
+
+    ISchemaTree fetchedTree = fetcher.fetchSchema(singlePathTree);
+    if (fetchedTree.getAllMeasurement().isEmpty()) {
+      return null;
+    }
+    MeasurementPath firstMatch = fetchedTree.getAllMeasurement().get(0);
+    return firstMatch.getMeasurementSchema();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index d1d4a366e7..56ee824fea 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -18,9 +18,12 @@
  */
 package org.apache.iotdb.db.engine.compaction.inner.utils;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
 import org.apache.iotdb.db.engine.compaction.constant.CompactionType;
 import org.apache.iotdb.db.engine.compaction.constant.ProcessChunkType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -40,6 +43,8 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.LinkedList;
@@ -47,6 +52,7 @@ import java.util.List;
 
 /** This class is used to compact one series during inner space compaction. */
 public class SingleSeriesCompactionExecutor {
+  private static final Logger log = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   private String device;
   private PartialPath series;
   private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList;
@@ -63,6 +69,7 @@ public class SingleSeriesCompactionExecutor {
   private long minStartTimestamp = Long.MAX_VALUE;
   private long maxEndTimestamp = Long.MIN_VALUE;
   private long pointCountInChunkWriter = 0;
+  private boolean alreadyFetchSchema = false;
 
   private final long targetChunkSize =
       IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -110,7 +117,7 @@ public class SingleSeriesCompactionExecutor {
    * This function execute the compaction of a single time series. Notice, the result of single
    * series compaction may contain more than one chunk.
    */
-  public void execute() throws IOException {
+  public void execute() throws IOException, IllegalPathException {
     while (readerAndChunkMetadataList.size() > 0) {
       Pair<TsFileSequenceReader, List<ChunkMetadata>> readerListPair =
           readerAndChunkMetadataList.removeFirst();
@@ -121,6 +128,20 @@ public class SingleSeriesCompactionExecutor {
         if (this.chunkWriter == null) {
           constructChunkWriterFromReadChunk(currentChunk);
         }
+
+        if (!checkDataType(currentChunk)) {
+          // after fetching the correct schema
+          // the datatype of current chunk is still inconsistent with schema
+          // abort current chunk
+          log.warn(
+              "Abort a chunk from {}, because the datatype is inconsistent, "
+                  + "type of schema is {}, but type of chunk is {}",
+              reader.getFileName(),
+              schema.getType().toString(),
+              currentChunk.getHeader().getDataType().toString());
+          continue;
+        }
+
         CompactionMetricsRecorder.recordReadInfo(
             currentChunk.getHeader().getSerializedSize() + currentChunk.getHeader().getDataSize());
 
@@ -156,6 +177,27 @@ public class SingleSeriesCompactionExecutor {
     targetResource.updateEndTime(device, maxEndTimestamp);
   }
 
+  /**
+   * Checks whether the data type recorded in the chunk header equals the type of the current
+   * schema. On a mismatch, {@code fixSchemaInconsistent()} is called first and the comparison is
+   * repeated against the (possibly replaced) schema.
+   *
+   * @param currentChunk the chunk whose header data type is checked
+   * @return {@code true} if the chunk's data type equals the schema's type
+   */
+  private boolean checkDataType(Chunk currentChunk) throws IllegalPathException {
+    if (currentChunk.getHeader().getDataType() == schema.getType()) {
+      return true;
+    }
+    // the datatype is not consistent: let the schema be corrected, then compare again
+    fixSchemaInconsistent();
+    return currentChunk.getHeader().getDataType() == schema.getType();
+  }
+
+  /**
+   * Fetches the schema of the current series via {@link CompactionUtils#fetchSchema(String,
+   * String)} and, if its data type differs from the current schema's type, switches this executor
+   * to the fetched schema and a new chunk writer built from it. Once a fetch has completed, later
+   * calls return immediately.
+   *
+   * @throws IllegalPathException propagated from the schema fetch
+   */
+  private void fixSchemaInconsistent() throws IllegalPathException {
+    if (alreadyFetchSchema) {
+      return;
+    }
+    IMeasurementSchema correctSchema =
+        CompactionUtils.fetchSchema(device, schema.getMeasurementId());
+    // fetchSchema returns null when no measurement is found for the path; keep the current schema
+    // in that case instead of failing with a NullPointerException.
+    if (correctSchema != null && schema.getType() != correctSchema.getType()) {
+      // NOTE(review): anything still buffered in the previous chunk writer is not carried over to
+      // the new one - confirm this is intended.
+      chunkWriter = new ChunkWriterImpl(correctSchema);
+      schema = correctSchema;
+    }
+    alreadyFetchSchema = true;
+  }
+
   private void constructChunkWriterFromReadChunk(Chunk chunk) {
     ChunkHeader chunkHeader = chunk.getHeader();
     this.schema =