You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/11/03 02:45:05 UTC
[iotdb] 01/01: add data type checking in single series compaction executor
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-4834
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 58a00a19cbb49965b73c9bfbd1c517e270de3166
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Thu Nov 3 10:44:42 2022 +0800
add data type checking in single series compaction executor
---
.../db/engine/compaction/CompactionUtils.java | 27 +++++++++++++
.../utils/SingleSeriesCompactionExecutor.java | 44 +++++++++++++++++++++-
2 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 5e5561b43e..ecaceed585 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -19,15 +19,24 @@
package org.apache.iotdb.db.engine.compaction;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -217,4 +226,22 @@ public class CompactionUtils {
}
}
}
+
+ /**
+  * Fetch the schema of one time series through the schema fetcher that matches the configured
+  * deployment mode (cluster or standalone).
+  *
+  * @param device device part of the series path
+  * @param measurementId measurement part of the series path
+  * @return the schema of the first measurement found for the path, or null if none was found
+  * @throws IllegalPathException declared for the PartialPath built from the two arguments
+  */
+ public static IMeasurementSchema fetchSchema(String device, String measurementId)
+     throws IllegalPathException {
+   ISchemaFetcher fetcher;
+   if (IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
+     fetcher = ClusterSchemaFetcher.getInstance();
+   } else {
+     fetcher = StandaloneSchemaFetcher.getInstance();
+   }
+   // the pattern tree holds exactly one full path: device + measurement
+   PathPatternTree singleSeriesPattern = new PathPatternTree();
+   singleSeriesPattern.appendFullPath(new PartialPath(device, measurementId));
+   singleSeriesPattern.constructTree();
+   ISchemaTree fetchedTree = fetcher.fetchSchema(singleSeriesPattern);
+   if (fetchedTree.getAllMeasurement().isEmpty()) {
+     // nothing matched the requested series
+     return null;
+   }
+   MeasurementPath firstMatch = fetchedTree.getAllMeasurement().get(0);
+   return firstMatch.getMeasurementSchema();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index d1d4a366e7..56ee824fea 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -18,9 +18,12 @@
*/
package org.apache.iotdb.db.engine.compaction.inner.utils;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.constant.CompactionType;
import org.apache.iotdb.db.engine.compaction.constant.ProcessChunkType;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -40,6 +43,8 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.LinkedList;
@@ -47,6 +52,7 @@ import java.util.List;
/** This class is used to compact one series during inner space compaction. */
public class SingleSeriesCompactionExecutor {
+ private static final Logger log = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
private String device;
private PartialPath series;
private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList;
@@ -63,6 +69,7 @@ public class SingleSeriesCompactionExecutor {
private long minStartTimestamp = Long.MAX_VALUE;
private long maxEndTimestamp = Long.MIN_VALUE;
private long pointCountInChunkWriter = 0;
+ private boolean alreadyFetchSchema = false;
private final long targetChunkSize =
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -110,7 +117,7 @@ public class SingleSeriesCompactionExecutor {
* This function execute the compaction of a single time series. Notice, the result of single
* series compaction may contain more than one chunk.
*/
- public void execute() throws IOException {
+ public void execute() throws IOException, IllegalPathException {
while (readerAndChunkMetadataList.size() > 0) {
Pair<TsFileSequenceReader, List<ChunkMetadata>> readerListPair =
readerAndChunkMetadataList.removeFirst();
@@ -121,6 +128,20 @@ public class SingleSeriesCompactionExecutor {
if (this.chunkWriter == null) {
constructChunkWriterFromReadChunk(currentChunk);
}
+
+ if (!checkDataType(currentChunk)) {
+ // after fetching the correct schema
+ // the datatype of current chunk is still inconsistent with schema
+ // abort current chunk
+ log.warn(
+ "Abort a chunk from {}, because the datatype is inconsistent, "
+ + "type of schema is {}, but type of chunk is {}",
+ reader.getFileName(),
+ schema.getType().toString(),
+ currentChunk.getHeader().getDataType().toString());
+ continue;
+ }
+
CompactionMetricsRecorder.recordReadInfo(
currentChunk.getHeader().getSerializedSize() + currentChunk.getHeader().getDataSize());
@@ -156,6 +177,27 @@ public class SingleSeriesCompactionExecutor {
targetResource.updateEndTime(device, maxEndTimestamp);
}
+ /**
+  * Check whether the data type recorded in the chunk header equals the type of the schema used
+  * by this executor. On a mismatch, fixSchemaInconsistent() is invoked first and the comparison
+  * is then repeated against the (possibly replaced) schema.
+  *
+  * @param currentChunk the chunk whose header data type is checked
+  * @return true if the chunk's data type equals the schema's type after the optional fix
+  * @throws IllegalPathException propagated from fixSchemaInconsistent()
+  */
+ private boolean checkDataType(Chunk currentChunk) throws IllegalPathException {
+   if (currentChunk.getHeader().getDataType() == schema.getType()) {
+     // already consistent, no schema fetching needed
+     return true;
+   }
+   fixSchemaInconsistent();
+   // compare again, the schema may have been replaced by the fetched one
+   return currentChunk.getHeader().getDataType() == schema.getType();
+ }
+
+ /**
+  * Fetch the schema of the current series via CompactionUtils.fetchSchema and, if its type
+  * differs from the schema currently in use, switch both the schema and the chunk writer to the
+  * fetched one. The fetch is attempted at most once per executor; later calls return
+  * immediately.
+  *
+  * <p>CompactionUtils.fetchSchema returns null when no measurement is found for the path. In
+  * that case the current schema and chunk writer are kept unchanged instead of failing with a
+  * NullPointerException.
+  *
+  * @throws IllegalPathException propagated from CompactionUtils.fetchSchema
+  */
+ private void fixSchemaInconsistent() throws IllegalPathException {
+   if (alreadyFetchSchema) {
+     return;
+   }
+   IMeasurementSchema correctSchema =
+       CompactionUtils.fetchSchema(device, schema.getMeasurementId());
+   // correctSchema is null if the series was not found; nothing to correct against then
+   if (correctSchema != null && schema.getType() != correctSchema.getType()) {
+     // NOTE(review): the previous chunkWriter is replaced without being flushed here; confirm
+     // that dropping whatever it may still buffer is intended
+     chunkWriter = new ChunkWriterImpl(correctSchema);
+     schema = correctSchema;
+   }
+   alreadyFetchSchema = true;
+ }
+
private void constructChunkWriterFromReadChunk(Chunk chunk) {
ChunkHeader chunkHeader = chunk.getHeader();
this.schema =