Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/10/18 02:34:38 UTC

[iotdb] branch master updated: [IOTDB-4636] Fix IndexOutOfBoundsException when compacting aligned series (#7638)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cb85a9286 [IOTDB-4636] Fix IndexOutOfBoundsException when compacting aligned series (#7638)
2cb85a9286 is described below

commit 2cb85a92869c400561a4ef317e84f2fa201b9ec4
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Tue Oct 18 10:34:32 2022 +0800

    [IOTDB-4636] Fix IndexOutOfBoundsException when compacting aligned series (#7638)
---
 .../impl/ReadChunkCompactionPerformer.java         | 21 +++++-
 .../ReadChunkCompactionPerformerAlignedTest.java   | 79 ++++++++++++++++++++++
 2 files changed, 98 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
index 582cc82e15..dbd4b1831a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
@@ -78,13 +78,11 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
         String device = deviceInfo.left;
         boolean aligned = deviceInfo.right;
 
-        writer.startChunkGroup(device);
         if (aligned) {
           compactAlignedSeries(device, targetResource, writer, deviceIterator);
         } else {
           compactNotAlignedSeries(device, targetResource, writer, deviceIterator);
         }
-        writer.endChunkGroup();
       }
 
       for (TsFileResource tsFileResource : seqFiles) {
@@ -120,10 +118,15 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
     checkThreadInterrupted();
     LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList =
         deviceIterator.getReaderAndChunkMetadataForCurrentAlignedSeries();
+    if (!checkAlignedSeriesExists(readerAndChunkMetadataList)) {
+      return;
+    }
+    writer.startChunkGroup(device);
     AlignedSeriesCompactionExecutor compactionExecutor =
         new AlignedSeriesCompactionExecutor(
             device, targetResource, readerAndChunkMetadataList, writer);
     compactionExecutor.execute();
+    writer.endChunkGroup();
   }
 
   private void checkThreadInterrupted() throws InterruptedException {
@@ -134,12 +137,25 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
     }
   }
 
+  private boolean checkAlignedSeriesExists(
+      LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
+          readerAndChunkMetadataList) {
+    for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair :
+        readerAndChunkMetadataList) {
+      if (!readerListPair.right.isEmpty()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   private void compactNotAlignedSeries(
       String device,
       TsFileResource targetResource,
       TsFileIOWriter writer,
       MultiTsFileDeviceIterator deviceIterator)
       throws IOException, MetadataException, InterruptedException {
+    writer.startChunkGroup(device);
     MultiTsFileDeviceIterator.MeasurementIterator seriesIterator =
         deviceIterator.iterateNotAlignedSeries(device, true);
     while (seriesIterator.hasNextSeries()) {
@@ -155,6 +171,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
           new SingleSeriesCompactionExecutor(p, readerAndChunkMetadataList, writer, targetResource);
       compactionExecutorOfCurrentTimeSeries.execute();
     }
+    writer.endChunkGroup();
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java
index 8c2fbf2ae4..d39aaf75cf 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java
@@ -547,4 +547,83 @@ public class ReadChunkCompactionPerformerAlignedTest {
             new ArrayList<>());
     CompactionCheckerUtils.validDataByValueList(originData, compactedData);
   }
+
+  @Test
+  public void testAlignedTsFileWithEmptyChunkGroup() throws Exception {
+    List<String> devices = new ArrayList<>();
+    devices.add(storageGroup + ".d" + 0);
+    for (int i = 1; i < 5; ++i) {
+      devices.add(devices.get(i - 1) + ".d" + i);
+    }
+    boolean[] aligned = new boolean[] {false, true, false, true, false};
+    List<IMeasurementSchema> schemas = new ArrayList<>();
+    schemas.add(new MeasurementSchema("s0", TSDataType.DOUBLE));
+    schemas.add(new MeasurementSchema("s1", TSDataType.FLOAT));
+    schemas.add(new MeasurementSchema("s2", TSDataType.INT64));
+    schemas.add(new MeasurementSchema("s3", TSDataType.INT32));
+    schemas.add(new MeasurementSchema("s4", TSDataType.TEXT));
+    schemas.add(new MeasurementSchema("s5", TSDataType.BOOLEAN));
+
+    TestUtilsForAlignedSeries.registerTimeSeries(
+        storageGroup,
+        devices.toArray(new String[] {}),
+        schemas.toArray(new IMeasurementSchema[] {}),
+        aligned);
+
+    boolean[] randomNull = new boolean[] {true, false, true, false, true};
+    int timeInterval = 500;
+    Random random = new Random(5);
+    List<TsFileResource> resources = new ArrayList<>();
+    for (int i = 1; i < 30; i++) {
+      TsFileResource resource =
+          new TsFileResource(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", i, i)));
+      TestUtilsForAlignedSeries.writeTsFile(
+          devices.toArray(new String[0]),
+          schemas.toArray(new IMeasurementSchema[0]),
+          resource,
+          aligned,
+          timeInterval * i,
+          timeInterval * (i + 1),
+          randomNull);
+      resources.add(resource);
+    }
+    TsFileResource resource =
+        new TsFileResource(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", 30, 30)));
+        // the start time and end time are the same,
+        // so this writes a TsFile with an empty chunk group
+    TestUtilsForAlignedSeries.writeTsFile(
+        devices.toArray(new String[0]),
+        schemas.toArray(new IMeasurementSchema[0]),
+        resource,
+        aligned,
+        timeInterval * (30 + 1),
+        timeInterval * (30 + 1),
+        randomNull);
+    resources.add(resource);
+    TsFileResource targetResource = new TsFileResource(new File(dataDirectory, "1-1-1-0.tsfile"));
+    List<PartialPath> fullPaths = new ArrayList<>();
+    List<IMeasurementSchema> iMeasurementSchemas = new ArrayList<>();
+    List<String> measurementIds = new ArrayList<>();
+    schemas.forEach(
+        (e) -> {
+          measurementIds.add(e.getMeasurementId());
+        });
+    for (String device : devices) {
+      iMeasurementSchemas.addAll(schemas);
+      fullPaths.add(new AlignedPath(device, measurementIds, schemas));
+    }
+    Map<PartialPath, List<TimeValuePair>> originData =
+        CompactionCheckerUtils.getDataByQuery(
+            fullPaths, iMeasurementSchemas, resources, new ArrayList<>());
+    ICompactionPerformer performer = new ReadChunkCompactionPerformer(resources, targetResource);
+    performer.setSummary(new CompactionTaskSummary());
+    performer.perform();
+    Map<PartialPath, List<TimeValuePair>> compactedData =
+        CompactionCheckerUtils.getDataByQuery(
+            fullPaths,
+            iMeasurementSchemas,
+            Collections.singletonList(targetResource),
+            new ArrayList<>());
+    CompactionCheckerUtils.validDataByValueList(originData, compactedData);
+  }
 }
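
In short, the fix stops the device loop in ReadChunkCompactionPerformer from calling writer.startChunkGroup(device)/writer.endChunkGroup() unconditionally. Each compaction path now opens and closes its own chunk group, and the aligned path first checks, via the new checkAlignedSeriesExists, that at least one source file contributes aligned chunk metadata; a device whose source chunk metadata is entirely empty is skipped instead of getting an empty chunk group written to the target file. Below is a minimal, self-contained sketch of that guard pattern; StubWriter and the List<List<String>> metadata lists are hypothetical stand-ins for TsFileIOWriter and the per-file AlignedChunkMetadata lists, not IoTDB classes.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch of the guard used by the fix: only open a chunk group once it is
// known that the device has at least one chunk to compact.
public class EmptyChunkGroupGuardSketch {

  static class StubWriter {
    void startChunkGroup(String device) {
      System.out.println("startChunkGroup(" + device + ")");
    }

    void endChunkGroup() {
      System.out.println("endChunkGroup()");
    }
  }

  // Mirrors checkAlignedSeriesExists: true if any source file contributes chunks.
  static boolean anyChunksExist(List<List<String>> chunkMetadataPerFile) {
    for (List<String> chunkMetadata : chunkMetadataPerFile) {
      if (!chunkMetadata.isEmpty()) {
        return true;
      }
    }
    return false;
  }

  static void compactDevice(
      String device, List<List<String>> chunkMetadataPerFile, StubWriter writer) {
    if (!anyChunksExist(chunkMetadataPerFile)) {
      // Skip the device entirely; an empty chunk group is never started.
      return;
    }
    writer.startChunkGroup(device);
    // ... compact the chunks of this device ...
    writer.endChunkGroup();
  }

  public static void main(String[] args) {
    StubWriter writer = new StubWriter();
    // A device with data: the chunk group is opened and closed.
    List<List<String>> withChunks =
        Arrays.asList(Arrays.asList("chunk-1"), Collections.<String>emptyList());
    compactDevice("root.sg.d1", withChunks, writer);
    // A device whose source files hold no aligned chunks: nothing is written.
    List<List<String>> withoutChunks =
        Arrays.asList(Collections.<String>emptyList(), Collections.<String>emptyList());
    compactDevice("root.sg.d2", withoutChunks, writer);
  }
}

The new testAlignedTsFileWithEmptyChunkGroup test exercises exactly this case: the last source file is written with identical start and end times, so it contains an empty chunk group, and the check at the end verifies that the compacted target file still reproduces the original data.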