You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/03/08 07:04:43 UTC
[iotdb] branch master updated: [IOTDB-2692] Fix compaction exception caused by deleted timeseries (#5173)
This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b98d25f [IOTDB-2692] Fix compaction exception caused by deleted timeseries (#5173)
b98d25f is described below
commit b98d25f95bd68d1e2b8df88aeed9809314020ea5
Author: BaiJian <er...@hotmail.com>
AuthorDate: Tue Mar 8 15:03:53 2022 +0800
[IOTDB-2692] Fix compaction exception caused by deleted timeseries (#5173)
---
.../db/engine/compaction/CompactionUtils.java | 35 ++++++++++++++++------
.../inner/utils/InnerSpaceCompactionUtils.java | 18 +++++++++--
.../utils/SingleSeriesCompactionExecutor.java | 13 ++++----
.../db/engine/compaction/CompactionUtilsTest.java | 10 ++++---
.../InnerSpaceCompactionUtilsNoAlignedTest.java | 9 ++++--
.../compaction/utils/CompactionCheckerUtils.java | 5 ++++
6 files changed, 65 insertions(+), 25 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 4102f98..9485f1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -60,6 +61,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* This tool can be used to perform inner space or cross space compaction of aligned and non aligned
@@ -121,20 +123,30 @@ public class CompactionUtils {
throws IOException, MetadataException {
MultiTsFileDeviceIterator.AlignedMeasurmentIterator alignedMeasurmentIterator =
deviceIterator.iterateAlignedSeries(device);
- List<String> allMeasurments = alignedMeasurmentIterator.getAllMeasurements();
+ List<String> allMeasurements = alignedMeasurmentIterator.getAllMeasurements();
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
- for (String measurement : allMeasurments) {
+ for (String measurement : allMeasurements) {
// TODO: use IDTable
- measurementSchemas.add(
- IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement)));
+ try {
+ measurementSchemas.add(
+ IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement)));
+ } catch (PathNotExistException e) {
+ logger.info("A deleted path is skipped: {}", e.getMessage());
+ }
}
-
+ if (measurementSchemas.isEmpty()) {
+ return;
+ }
+ List<String> existedMeasurements =
+ measurementSchemas.stream()
+ .map(IMeasurementSchema::getMeasurementId)
+ .collect(Collectors.toList());
IBatchReader dataBatchReader =
constructReader(
device,
- allMeasurments,
+ existedMeasurements,
measurementSchemas,
- new HashSet<>(allMeasurments),
+ new HashSet<>(existedMeasurements),
queryContext,
queryDataSource,
true);
@@ -163,8 +175,13 @@ public class CompactionUtils {
Set<String> allMeasurementSet = new HashSet<>(allMeasurements);
for (String measurement : allMeasurements) {
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
- measurementSchemas.add(
- IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement)));
+ try {
+ measurementSchemas.add(
+ IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement)));
+ } catch (PathNotExistException e) {
+ logger.info("A deleted path is skipped: {}", e.getMessage());
+ continue;
+ }
IBatchReader dataBatchReader =
constructReader(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index 432cc54..91f5a9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -31,13 +31,17 @@ import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.apache.commons.io.FileUtils;
@@ -108,12 +112,22 @@ public class InnerSpaceCompactionUtils {
while (seriesIterator.hasNextSeries()) {
checkThreadInterrupted(targetResource);
// TODO: we can provide a configuration item to enable concurrent between each series
- String currentSeries = seriesIterator.nextSeries();
+ PartialPath p = new PartialPath(device, seriesIterator.nextSeries());
+ IMeasurementSchema measurementSchema;
+ // TODO: seriesIterator needs to be refactor.
+ // This statement must be called before next hasNextSeries() called, or it may be trapped in a
+ // dead-loop.
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList =
seriesIterator.getMetadataListForCurrentSeries();
+ try {
+ measurementSchema = IoTDB.metaManager.getSeriesSchema(p);
+ } catch (PathNotExistException e) {
+ logger.info("A deleted path is skipped: {}", e.getMessage());
+ continue;
+ }
SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
new SingleSeriesCompactionExecutor(
- device, currentSeries, readerAndChunkMetadataList, writer, targetResource);
+ p, measurementSchema, readerAndChunkMetadataList, writer, targetResource);
compactionExecutorOfCurrentTimeSeries.execute();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index dd2157d..068ee2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -21,9 +21,7 @@ package org.apache.iotdb.db.engine.compaction.inner.utils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -70,16 +68,15 @@ public class SingleSeriesCompactionExecutor {
IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
public SingleSeriesCompactionExecutor(
- String device,
- String timeSeries,
+ PartialPath series,
+ IMeasurementSchema measurementSchema,
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList,
TsFileIOWriter fileWriter,
- TsFileResource targetResource)
- throws MetadataException {
- this.device = device;
+ TsFileResource targetResource) {
+ this.device = series.getDevice();
this.readerAndChunkMetadataList = readerAndChunkMetadataList;
this.fileWriter = fileWriter;
- this.schema = IoTDB.metaManager.getSeriesSchema(new PartialPath(device, timeSeries));
+ this.schema = measurementSchema;
this.chunkWriter = new ChunkWriterImpl(this.schema);
this.cachedChunk = null;
this.cachedChunkMetadata = null;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
index affacd7..11ac1e9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
@@ -145,7 +145,8 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
Total 6 seq files, each file has different nonAligned timeseries.
First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 99 and 150 ~ 249, value range is 0 ~ 99 and 150 ~ 249.
Third and Forth file: d0 ~ d2 and s0 ~ s4, time range is 250 ~ 299 and 350 ~ 399, value range is 250 ~ 299 and 350 ~ 399.
- Fifth and Sixth file: d0 ~ d4 and s0 ~ s4, time range is 600 ~ 649 and 700 ~ 749, value range is 800 ~ 849 and 900 ~ 949.
+ Fifth and Sixth file: d0 ~ d4 and s0 ~ s5, time range is 600 ~ 649 and 700 ~ 749, value range is 800 ~ 849 and 900 ~ 949.
+ Timeseries d[0-4].s5 are deleted before compaction.
*/
@Test
public void testSeqInnerSpaceCompactionWithDifferentTimeseries()
@@ -154,7 +155,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
registerTimeseriesInMManger(5, 5, false);
createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, true);
createFiles(2, 3, 5, 50, 250, 250, 50, 50, false, true);
- createFiles(2, 5, 5, 50, 600, 800, 50, 50, false, true);
+ createFiles(2, 5, 6, 50, 600, 800, 50, 50, false, true);
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 5; j++) {
@@ -888,7 +889,8 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
Total 6 seq files, each file has different aligned timeseries, which cause empty page.
First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 99 and 150 ~ 249, value range is 0 ~ 99 and 150 ~ 249.
Third and Forth file: d0 ~ d2 and s0 ~ s4, time range is 250 ~ 299 and 350 ~ 399, value range is 250 ~ 299 and 350 ~ 399.
- Fifth and Sixth file: d0 ~ d4 and s0 ~ s6, time range is 600 ~ 649 and 700 ~ 749, value range is 800 ~ 849 and 900 ~ 949.
+ Fifth and Sixth file: d0 ~ d4 and s0 ~ s7, time range is 600 ~ 649 and 700 ~ 749, value range is 800 ~ 849 and 900 ~ 949.
+ Timeseries d[0-4].s7 are deleted before compaction.
*/
@Test
public void testAlignedSeqInnerSpaceCompactionWithDifferentTimeseriesAndEmptyPage()
@@ -898,7 +900,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
registerTimeseriesInMManger(5, 7, true);
createFiles(2, 2, 3, 100, 0, 0, 50, 50, true, true);
createFiles(2, 3, 5, 50, 250, 250, 50, 50, true, true);
- createFiles(2, 5, 7, 50, 600, 800, 50, 50, true, true);
+ createFiles(2, 5, 8, 50, 600, 800, 50, 50, true, true);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 5;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java
index 5f47547..78a8401 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsNoAlignedTest.java
@@ -431,6 +431,10 @@ public class InnerSpaceCompactionUtilsNoAlignedTest {
IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1);
try {
List<TsFileResource> sourceFiles = new ArrayList();
+ Set<String> fullPathSetWithDeleted = new HashSet<>(fullPathSet);
+ // we add a deleted timeseries to simulate timeseries is deleted before compaction.
+ String deletedPath = "root.compactionTest.device999.s999";
+ fullPathSetWithDeleted.add(deletedPath);
int fileNum = 6;
long pointStep = 300L;
for (int i = 0; i < fileNum; ++i) {
@@ -442,7 +446,7 @@ public class InnerSpaceCompactionUtilsNoAlignedTest {
new TsFileResource(new File(SEQ_DIRS, String.format("%d-%d-0-0.tsfile", i + 1, i + 1)));
sourceFiles.add(resource);
CompactionFileGeneratorUtils.writeTsFile(
- fullPathSet, chunkPagePointsNum, i * 1500L, resource);
+ fullPathSetWithDeleted, chunkPagePointsNum, i * 1500L, resource);
}
Map<PartialPath, List<TimeValuePair>> originData =
CompactionCheckerUtils.getDataByQuery(paths, schemaList, sourceFiles, new ArrayList<>());
@@ -476,9 +480,10 @@ public class InnerSpaceCompactionUtilsNoAlignedTest {
if (curPointNum > 0) {
chunkPointsArray.add(pointsArray);
}
- for (String path : fullPathSet) {
+ for (String path : fullPathSetWithDeleted) {
chunkPagePointsNumMerged.put(path, chunkPointsArray);
}
+ chunkPagePointsNumMerged.put(deletedPath, null);
CompactionCheckerUtils.checkChunkAndPage(chunkPagePointsNumMerged, targetResource);
Map<PartialPath, List<TimeValuePair>> compactedData =
CompactionCheckerUtils.getDataByQuery(
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionCheckerUtils.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionCheckerUtils.java
index fd87343..76aba4a 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionCheckerUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionCheckerUtils.java
@@ -65,6 +65,7 @@ import java.util.TreeMap;
import static org.apache.iotdb.db.utils.QueryUtils.modifyChunkMetaData;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
public class CompactionCheckerUtils {
@@ -466,6 +467,10 @@ public class CompactionCheckerUtils {
String fullPath = chunkPagePointsNumEntry.getKey();
List<List<Long>> sourceChunkPages = chunkPagePointsNumEntry.getValue();
List<List<Long>> mergedChunkPages = mergedChunkPagePointsNum.get(fullPath);
+ if (sourceChunkPages == null) {
+ assertNull(mergedChunkPages);
+ continue;
+ }
for (int i = 0; i < sourceChunkPages.size(); i++) {
for (int j = 0; j < sourceChunkPages.get(i).size(); j++) {
assertEquals(sourceChunkPages.get(i).get(j), mergedChunkPages.get(i).get(j));