You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/11/03 02:45:04 UTC

[iotdb] branch IOTDB-4834 created (now 58a00a19cb)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a change to branch IOTDB-4834
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 58a00a19cb add data type checking in single series compaction executor

This branch includes the following new commits:

     new 58a00a19cb add data type checking in single series compaction executor

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: add data type checking in single series compaction executor

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4834
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 58a00a19cbb49965b73c9bfbd1c517e270de3166
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Thu Nov 3 10:44:42 2022 +0800

    add data type checking in single series compaction executor
---
 .../db/engine/compaction/CompactionUtils.java      | 27 +++++++++++++
 .../utils/SingleSeriesCompactionExecutor.java      | 44 +++++++++++++++++++++-
 2 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 5e5561b43e..ecaceed585 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -19,15 +19,24 @@
 package org.apache.iotdb.db.engine.compaction;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -217,4 +226,22 @@ public class CompactionUtils {
       }
     }
   }
+
+  public static IMeasurementSchema fetchSchema(String device, String measurementId)
+      throws IllegalPathException {
+    ISchemaFetcher schemaFetcher =
+        IoTDBDescriptor.getInstance().getConfig().isClusterMode()
+            ? ClusterSchemaFetcher.getInstance()
+            : StandaloneSchemaFetcher.getInstance();
+    PathPatternTree patternTree = new PathPatternTree();
+    patternTree.appendFullPath(new PartialPath(device, measurementId));
+    patternTree.constructTree();
+    ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree);
+    if (!schemaTree.getAllMeasurement().isEmpty()) {
+      MeasurementPath path = schemaTree.getAllMeasurement().get(0);
+      return path.getMeasurementSchema();
+    } else {
+      return null;
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index d1d4a366e7..56ee824fea 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -18,9 +18,12 @@
  */
 package org.apache.iotdb.db.engine.compaction.inner.utils;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
 import org.apache.iotdb.db.engine.compaction.constant.CompactionType;
 import org.apache.iotdb.db.engine.compaction.constant.ProcessChunkType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -40,6 +43,8 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.LinkedList;
@@ -47,6 +52,7 @@ import java.util.List;
 
 /** This class is used to compact one series during inner space compaction. */
 public class SingleSeriesCompactionExecutor {
+  private static final Logger log = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   private String device;
   private PartialPath series;
   private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList;
@@ -63,6 +69,7 @@ public class SingleSeriesCompactionExecutor {
   private long minStartTimestamp = Long.MAX_VALUE;
   private long maxEndTimestamp = Long.MIN_VALUE;
   private long pointCountInChunkWriter = 0;
+  private boolean alreadyFetchSchema = false;
 
   private final long targetChunkSize =
       IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -110,7 +117,7 @@ public class SingleSeriesCompactionExecutor {
    * This function execute the compaction of a single time series. Notice, the result of single
    * series compaction may contain more than one chunk.
    */
-  public void execute() throws IOException {
+  public void execute() throws IOException, IllegalPathException {
     while (readerAndChunkMetadataList.size() > 0) {
       Pair<TsFileSequenceReader, List<ChunkMetadata>> readerListPair =
           readerAndChunkMetadataList.removeFirst();
@@ -121,6 +128,20 @@ public class SingleSeriesCompactionExecutor {
         if (this.chunkWriter == null) {
           constructChunkWriterFromReadChunk(currentChunk);
         }
+
+        if (!checkDataType(currentChunk)) {
+          // after fetching the correct schema
+          // the datatype of current chunk is still inconsistent with schema
+          // abort current chunk
+          log.warn(
+              "Abort a chunk from {}, because the datatype is inconsistent, "
+                  + "type of schema is {}, but type of chunk is {}",
+              reader.getFileName(),
+              schema.getType().toString(),
+              currentChunk.getHeader().getDataType().toString());
+          continue;
+        }
+
         CompactionMetricsRecorder.recordReadInfo(
             currentChunk.getHeader().getSerializedSize() + currentChunk.getHeader().getDataSize());
 
@@ -156,6 +177,27 @@ public class SingleSeriesCompactionExecutor {
     targetResource.updateEndTime(device, maxEndTimestamp);
   }
 
+  private boolean checkDataType(Chunk currentChunk) throws IllegalPathException {
+    if (currentChunk.getHeader().getDataType() != schema.getType()) {
+      // the datatype is not consistent
+      fixSchemaInconsistent();
+    }
+    return currentChunk.getHeader().getDataType() == schema.getType();
+  }
+
+  private void fixSchemaInconsistent() throws IllegalPathException {
+    if (alreadyFetchSchema) {
+      return;
+    }
+    IMeasurementSchema correctSchema =
+        CompactionUtils.fetchSchema(device, schema.getMeasurementId());
+    if (schema.getType() != correctSchema.getType()) {
+      chunkWriter = new ChunkWriterImpl(correctSchema);
+      schema = correctSchema;
+    }
+    alreadyFetchSchema = true;
+  }
+
   private void constructChunkWriterFromReadChunk(Chunk chunk) {
     ChunkHeader chunkHeader = chunk.getHeader();
     this.schema =