You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/03/08 07:04:43 UTC

[iotdb] branch master updated: [IOTDB-2692] Fix compaction exception caused by deleted timeseries (#5173)

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b98d25f  [IOTDB-2692] Fix compaction exception caused by deleted timeseries (#5173)
b98d25f is described below

commit b98d25f95bd68d1e2b8df88aeed9809314020ea5
Author: BaiJian <er...@hotmail.com>
AuthorDate: Tue Mar 8 15:03:53 2022 +0800

    [IOTDB-2692] Fix compaction exception caused by deleted timeseries (#5173)
---
 .../db/engine/compaction/CompactionUtils.java      | 35 ++++++++++++++++------
 .../inner/utils/InnerSpaceCompactionUtils.java     | 18 +++++++++--
 .../utils/SingleSeriesCompactionExecutor.java      | 13 ++++----
 .../db/engine/compaction/CompactionUtilsTest.java  | 10 ++++---
 .../InnerSpaceCompactionUtilsNoAlignedTest.java    |  9 ++++--
 .../compaction/utils/CompactionCheckerUtils.java   |  5 ++++
 6 files changed, 65 insertions(+), 25 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 4102f98..9485f1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -60,6 +61,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This tool can be used to perform inner space or cross space compaction of aligned and non aligned
@@ -121,20 +123,30 @@ public class CompactionUtils {
       throws IOException, MetadataException {
     MultiTsFileDeviceIterator.AlignedMeasurmentIterator alignedMeasurmentIterator =
         deviceIterator.iterateAlignedSeries(device);
-    List<String> allMeasurments = alignedMeasurmentIterator.getAllMeasurements();
+    List<String> allMeasurements = alignedMeasurmentIterator.getAllMeasurements();
     List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
-    for (String measurement : allMeasurments) {
+    for (String measurement : allMeasurements) {
       // TODO: use IDTable
-      measurementSchemas.add(
-          IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement)));
+      try {
+        measurementSchemas.add(
+            IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement)));
+      } catch (PathNotExistException e) {
+        logger.info("A deleted path is skipped: {}", e.getMessage());
+      }
     }
-
+    if (measurementSchemas.isEmpty()) {
+      return;
+    }
+    List<String> existedMeasurements =
+        measurementSchemas.stream()
+            .map(IMeasurementSchema::getMeasurementId)
+            .collect(Collectors.toList());
     IBatchReader dataBatchReader =
         constructReader(
             device,
-            allMeasurments,
+            existedMeasurements,
             measurementSchemas,
-            new HashSet<>(allMeasurments),
+            new HashSet<>(existedMeasurements),
             queryContext,
             queryDataSource,
             true);
@@ -163,8 +175,13 @@ public class CompactionUtils {
     Set<String> allMeasurementSet = new HashSet<>(allMeasurements);
     for (String measurement : allMeasurements) {
       List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
-      measurementSchemas.add(
-          IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement)));
+      try {
+        measurementSchemas.add(
+            IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement)));
+      } catch (PathNotExistException e) {
+        logger.info("A deleted path is skipped: {}", e.getMessage());
+        continue;
+      }
 
       IBatchReader dataBatchReader =
           constructReader(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index 432cc54..91f5a9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -31,13 +31,17 @@ import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import org.apache.commons.io.FileUtils;
@@ -108,12 +112,22 @@ public class InnerSpaceCompactionUtils {
     while (seriesIterator.hasNextSeries()) {
       checkThreadInterrupted(targetResource);
       // TODO: we can provide a configuration item to enable concurrent between each series
-      String currentSeries = seriesIterator.nextSeries();
+      PartialPath p = new PartialPath(device, seriesIterator.nextSeries());
+      IMeasurementSchema measurementSchema;
+      // TODO: seriesIterator needs to be refactor.
+      // This statement must be called before next hasNextSeries() called, or it may be trapped in a
+      // dead-loop.
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList =
           seriesIterator.getMetadataListForCurrentSeries();
+      try {
+        measurementSchema = IoTDB.metaManager.getSeriesSchema(p);
+      } catch (PathNotExistException e) {
+        logger.info("A deleted path is skipped: {}", e.getMessage());
+        continue;
+      }
       SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
           new SingleSeriesCompactionExecutor(
-              device, currentSeries, readerAndChunkMetadataList, writer, targetResource);
+              p, measurementSchema, readerAndChunkMetadataList, writer, targetResource);
       compactionExecutorOfCurrentTimeSeries.execute();
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index dd2157d..068ee2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -21,9 +21,7 @@ package org.apache.iotdb.db.engine.compaction.inner.utils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -70,16 +68,15 @@ public class SingleSeriesCompactionExecutor {
       IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
 
   public SingleSeriesCompactionExecutor(
-      String device,
-      String timeSeries,
+      PartialPath series,
+      IMeasurementSchema measurementSchema,
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList,
       TsFileIOWriter fileWriter,
-      TsFileResource targetResource)
-      throws MetadataException {
-    this.device = device;
+      TsFileResource targetResource) {
+    this.device = series.getDevice();
     this.readerAndChunkMetadataList = readerAndChunkMetadataList;
     this.fileWriter = fileWriter;
-    this.schema = IoTDB.metaManager.getSeriesSchema(new PartialPath(device, timeSeries));
+    this.schema = measurementSchema;
     this.chunkWriter = new ChunkWriterImpl(this.schema);
     this.cachedChunk = null;
     this.cachedChunkMetadata = null;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
index affacd7..11ac1e9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
@@ -145,7 +145,8 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
   Total 6 seq files, each file has different nonAligned timeseries.
   First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 99 and 150 ~ 249, value range is  0 ~ 99 and 150 ~ 249.
   Third and Forth file: d0 ~ d2 and s0 ~ s4, time range is 250 ~ 299 and 350 ~ 399, value range is 250 ~ 299 and 350 ~ 399.
-  Fifth and Sixth file: d0 ~ d4 and s0 ~ s4, time range is 600 ~ 649 and 700 ~ 749, value range is 800 ~ 849 and 900 ~ 949.
+  Fifth and Sixth file: d0 ~ d4 and s0 ~ s5, time range is 600 ~ 649 and 700 ~ 749, value range is 800 ~ 849 and 900 ~ 949.
+  Timeseries d[0-4].s5 are deleted before compaction.
   */
   @Test
   public void testSeqInnerSpaceCompactionWithDifferentTimeseries()
@@ -154,7 +155,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
     registerTimeseriesInMManger(5, 5, false);
     createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, true);
     createFiles(2, 3, 5, 50, 250, 250, 50, 50, false, true);
-    createFiles(2, 5, 5, 50, 600, 800, 50, 50, false, true);
+    createFiles(2, 5, 6, 50, 600, 800, 50, 50, false, true);
 
     for (int i = 0; i < 5; i++) {
       for (int j = 0; j < 5; j++) {
@@ -888,7 +889,8 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
   Total 6 seq files, each file has different aligned timeseries, which cause empty page.
   First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 99 and 150 ~ 249, value range is  0 ~ 99 and 150 ~ 249.
   Third and Forth file: d0 ~ d2 and s0 ~ s4, time range is 250 ~ 299 and 350 ~ 399, value range is 250 ~ 299 and 350 ~ 399.
-  Fifth and Sixth file: d0 ~ d4 and s0 ~ s6, time range is 600 ~ 649 and 700 ~ 749, value range is 800 ~ 849 and 900 ~ 949.
+  Fifth and Sixth file: d0 ~ d4 and s0 ~ s7, time range is 600 ~ 649 and 700 ~ 749, value range is 800 ~ 849 and 900 ~ 949.
+  Timeseries d[0-4].s7 are deleted before compaction.
   */
   @Test
   public void testAlignedSeqInnerSpaceCompactionWithDifferentTimeseriesAndEmptyPage()
@@ -898,7 +900,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
     registerTimeseriesInMManger(5, 7, true);
     createFiles(2, 2, 3, 100, 0, 0, 50, 50, true, true);
     createFiles(2, 3, 5, 50, 250, 250, 50, 50, true, true);
-    createFiles(2, 5, 7, 50, 600, 800, 50, 50, true, true);
+    createFiles(2, 5, 8, 50, 600, 800, 50, 50, true, true);
 
     for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
         i < TsFileGeneratorUtils.getAlignDeviceOffset() + 5;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java
index 5f47547..78a8401 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java
@@ -431,6 +431,10 @@ public class InnerSpaceCompactionUtilsNoAlignedTest {
     IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1);
     try {
       List<TsFileResource> sourceFiles = new ArrayList();
+      Set<String> fullPathSetWithDeleted = new HashSet<>(fullPathSet);
+      // we add a deleted timeseries to simulate timeseries is deleted before compaction.
+      String deletedPath = "root.compactionTest.device999.s999";
+      fullPathSetWithDeleted.add(deletedPath);
       int fileNum = 6;
       long pointStep = 300L;
       for (int i = 0; i < fileNum; ++i) {
@@ -442,7 +446,7 @@ public class InnerSpaceCompactionUtilsNoAlignedTest {
             new TsFileResource(new File(SEQ_DIRS, String.format("%d-%d-0-0.tsfile", i + 1, i + 1)));
         sourceFiles.add(resource);
         CompactionFileGeneratorUtils.writeTsFile(
-            fullPathSet, chunkPagePointsNum, i * 1500L, resource);
+            fullPathSetWithDeleted, chunkPagePointsNum, i * 1500L, resource);
       }
       Map<PartialPath, List<TimeValuePair>> originData =
           CompactionCheckerUtils.getDataByQuery(paths, schemaList, sourceFiles, new ArrayList<>());
@@ -476,9 +480,10 @@ public class InnerSpaceCompactionUtilsNoAlignedTest {
       if (curPointNum > 0) {
         chunkPointsArray.add(pointsArray);
       }
-      for (String path : fullPathSet) {
+      for (String path : fullPathSetWithDeleted) {
         chunkPagePointsNumMerged.put(path, chunkPointsArray);
       }
+      chunkPagePointsNumMerged.put(deletedPath, null);
       CompactionCheckerUtils.checkChunkAndPage(chunkPagePointsNumMerged, targetResource);
       Map<PartialPath, List<TimeValuePair>> compactedData =
           CompactionCheckerUtils.getDataByQuery(
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionCheckerUtils.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionCheckerUtils.java
index fd87343..76aba4a 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionCheckerUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionCheckerUtils.java
@@ -65,6 +65,7 @@ import java.util.TreeMap;
 
 import static org.apache.iotdb.db.utils.QueryUtils.modifyChunkMetaData;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 public class CompactionCheckerUtils {
@@ -466,6 +467,10 @@ public class CompactionCheckerUtils {
         String fullPath = chunkPagePointsNumEntry.getKey();
         List<List<Long>> sourceChunkPages = chunkPagePointsNumEntry.getValue();
         List<List<Long>> mergedChunkPages = mergedChunkPagePointsNum.get(fullPath);
+        if (sourceChunkPages == null) {
+          assertNull(mergedChunkPages);
+          continue;
+        }
         for (int i = 0; i < sourceChunkPages.size(); i++) {
           for (int j = 0; j < sourceChunkPages.get(i).size(); j++) {
             assertEquals(sourceChunkPages.get(i).get(j), mergedChunkPages.get(i).get(j));