Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/10/17 07:43:23 UTC

[iotdb] branch IOTDB-4636 updated (5836e07d6d -> 36acc7eb15)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a change to branch IOTDB-4636
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 5836e07d6d Add detailed error messages while query is time out (#7632)
     new 34d3d9b841 check before compacting aligned timeseries
     new 36acc7eb15 add test

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../impl/ReadChunkCompactionPerformer.java         | 21 +++++-
 .../ReadChunkCompactionPerformerAlignedTest.java   | 79 ++++++++++++++++++++++
 2 files changed, 98 insertions(+), 2 deletions(-)


[iotdb] 01/02: check before compacting aligned timeseries

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4636
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 34d3d9b841edf3c2e8596e89de1186838f1c12c2
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Mon Oct 17 15:32:37 2022 +0800

    check before compacting aligned timeseries
---
 .../performer/impl/ReadChunkCompactionPerformer.java       | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
index 582cc82e15..a730730964 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
@@ -78,13 +78,11 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
         String device = deviceInfo.left;
         boolean aligned = deviceInfo.right;
 
-        writer.startChunkGroup(device);
         if (aligned) {
           compactAlignedSeries(device, targetResource, writer, deviceIterator);
         } else {
           compactNotAlignedSeries(device, targetResource, writer, deviceIterator);
         }
-        writer.endChunkGroup();
       }
 
       for (TsFileResource tsFileResource : seqFiles) {
@@ -120,10 +118,20 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
     checkThreadInterrupted();
     LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList =
         deviceIterator.getReaderAndChunkMetadataForCurrentAlignedSeries();
+    boolean anyChunkExists = false;
+    for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair :
+        readerAndChunkMetadataList) {
+      anyChunkExists = anyChunkExists || !readerListPair.right.isEmpty();
+    }
+    if (!anyChunkExists) {
+      return;
+    }
+    writer.startChunkGroup(device);
     AlignedSeriesCompactionExecutor compactionExecutor =
         new AlignedSeriesCompactionExecutor(
             device, targetResource, readerAndChunkMetadataList, writer);
     compactionExecutor.execute();
+    writer.endChunkGroup();
   }
 
   private void checkThreadInterrupted() throws InterruptedException {
@@ -140,6 +148,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
       TsFileIOWriter writer,
       MultiTsFileDeviceIterator deviceIterator)
       throws IOException, MetadataException, InterruptedException {
+    writer.startChunkGroup(device);
     MultiTsFileDeviceIterator.MeasurementIterator seriesIterator =
         deviceIterator.iterateNotAlignedSeries(device, true);
     while (seriesIterator.hasNextSeries()) {
@@ -155,6 +164,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
           new SingleSeriesCompactionExecutor(p, readerAndChunkMetadataList, writer, targetResource);
       compactionExecutorOfCurrentTimeSeries.execute();
     }
+    writer.endChunkGroup();
   }
 
   @Override

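A note on the patch above: startChunkGroup/endChunkGroup are moved out of the shared per-device path, and for aligned series the chunk group is only opened after confirming that at least one source file actually carries chunk metadata for the device. The standalone sketch below illustrates that emptiness check; it is a simplified rendering rather than IoTDB code (Object stands in for AlignedChunkMetadata, and every name apart from the startChunkGroup/endChunkGroup idea is illustrative).

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class EmptyAlignedDeviceGuardSketch {

  // Stand-in for the List<AlignedChunkMetadata> side of each
  // Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> in the patch.
  static boolean anyChunkExists(List<List<Object>> chunkMetadataPerSourceFile) {
    for (List<Object> chunkMetadataList : chunkMetadataPerSourceFile) {
      if (!chunkMetadataList.isEmpty()) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    List<List<Object>> allEmpty = new ArrayList<>();
    allEmpty.add(new ArrayList<>());
    allEmpty.add(new ArrayList<>());

    List<List<Object>> oneNonEmpty = new ArrayList<>();
    oneNonEmpty.add(new ArrayList<>());
    oneNonEmpty.add(Collections.singletonList(new Object()));

    // false: the device has no chunks in any source file, so the performer
    // returns early and never calls startChunkGroup(device) / endChunkGroup().
    System.out.println(anyChunkExists(allEmpty));
    // true: at least one source file has chunks, so compaction proceeds as before.
    System.out.println(anyChunkExists(oneNonEmpty));
  }
}
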

[iotdb] 02/02: add test

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4636
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 36acc7eb15e7361deb569719534d2e40412502e2
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Mon Oct 17 15:43:10 2022 +0800

    add test
---
 .../impl/ReadChunkCompactionPerformer.java         | 19 ++++--
 .../ReadChunkCompactionPerformerAlignedTest.java   | 79 ++++++++++++++++++++++
 2 files changed, 92 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
index a730730964..dbd4b1831a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
@@ -118,12 +118,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
     checkThreadInterrupted();
     LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList =
         deviceIterator.getReaderAndChunkMetadataForCurrentAlignedSeries();
-    boolean anyChunkExists = false;
-    for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair :
-        readerAndChunkMetadataList) {
-      anyChunkExists = anyChunkExists || !readerListPair.right.isEmpty();
-    }
-    if (!anyChunkExists) {
+    if (!checkAlignedSeriesExists(readerAndChunkMetadataList)) {
       return;
     }
     writer.startChunkGroup(device);
@@ -142,6 +137,18 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
     }
   }
 
+  private boolean checkAlignedSeriesExists(
+      LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
+          readerAndChunkMetadataList) {
+    for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair :
+        readerAndChunkMetadataList) {
+      if (!readerListPair.right.isEmpty()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   private void compactNotAlignedSeries(
       String device,
       TsFileResource targetResource,
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java
index 8c2fbf2ae4..d39aaf75cf 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java
@@ -547,4 +547,83 @@ public class ReadChunkCompactionPerformerAlignedTest {
             new ArrayList<>());
     CompactionCheckerUtils.validDataByValueList(originData, compactedData);
   }
+
+  @Test
+  public void testAlignedTsFileWithEmptyChunkGroup() throws Exception {
+    List<String> devices = new ArrayList<>();
+    devices.add(storageGroup + ".d" + 0);
+    for (int i = 1; i < 5; ++i) {
+      devices.add(devices.get(i - 1) + ".d" + i);
+    }
+    boolean[] aligned = new boolean[] {false, true, false, true, false};
+    List<IMeasurementSchema> schemas = new ArrayList<>();
+    schemas.add(new MeasurementSchema("s0", TSDataType.DOUBLE));
+    schemas.add(new MeasurementSchema("s1", TSDataType.FLOAT));
+    schemas.add(new MeasurementSchema("s2", TSDataType.INT64));
+    schemas.add(new MeasurementSchema("s3", TSDataType.INT32));
+    schemas.add(new MeasurementSchema("s4", TSDataType.TEXT));
+    schemas.add(new MeasurementSchema("s5", TSDataType.BOOLEAN));
+
+    TestUtilsForAlignedSeries.registerTimeSeries(
+        storageGroup,
+        devices.toArray(new String[] {}),
+        schemas.toArray(new IMeasurementSchema[] {}),
+        aligned);
+
+    boolean[] randomNull = new boolean[] {true, false, true, false, true};
+    int timeInterval = 500;
+    Random random = new Random(5);
+    List<TsFileResource> resources = new ArrayList<>();
+    for (int i = 1; i < 30; i++) {
+      TsFileResource resource =
+          new TsFileResource(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", i, i)));
+      TestUtilsForAlignedSeries.writeTsFile(
+          devices.toArray(new String[0]),
+          schemas.toArray(new IMeasurementSchema[0]),
+          resource,
+          aligned,
+          timeInterval * i,
+          timeInterval * (i + 1),
+          randomNull);
+      resources.add(resource);
+    }
+    TsFileResource resource =
+        new TsFileResource(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", 30, 30)));
+    // the start time and the end time are the same,
+    // so this writes a TsFile with an empty chunk group
+    TestUtilsForAlignedSeries.writeTsFile(
+        devices.toArray(new String[0]),
+        schemas.toArray(new IMeasurementSchema[0]),
+        resource,
+        aligned,
+        timeInterval * (30 + 1),
+        timeInterval * (30 + 1),
+        randomNull);
+    resources.add(resource);
+    TsFileResource targetResource = new TsFileResource(new File(dataDirectory, "1-1-1-0.tsfile"));
+    List<PartialPath> fullPaths = new ArrayList<>();
+    List<IMeasurementSchema> iMeasurementSchemas = new ArrayList<>();
+    List<String> measurementIds = new ArrayList<>();
+    schemas.forEach(
+        (e) -> {
+          measurementIds.add(e.getMeasurementId());
+        });
+    for (String device : devices) {
+      iMeasurementSchemas.addAll(schemas);
+      fullPaths.add(new AlignedPath(device, measurementIds, schemas));
+    }
+    Map<PartialPath, List<TimeValuePair>> originData =
+        CompactionCheckerUtils.getDataByQuery(
+            fullPaths, iMeasurementSchemas, resources, new ArrayList<>());
+    ICompactionPerformer performer = new ReadChunkCompactionPerformer(resources, targetResource);
+    performer.setSummary(new CompactionTaskSummary());
+    performer.perform();
+    Map<PartialPath, List<TimeValuePair>> compactedData =
+        CompactionCheckerUtils.getDataByQuery(
+            fullPaths,
+            iMeasurementSchemas,
+            Collections.singletonList(targetResource),
+            new ArrayList<>());
+    CompactionCheckerUtils.validDataByValueList(originData, compactedData);
+  }
 }
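
On the new test: the last of the 30 source files is written with identical start and end times, so (per the comment in the test) the aligned device ends up in that file with an empty chunk group; the test then verifies that compacting all 30 files still reproduces the original query results. The toy sketch below only illustrates that degenerate time window; the names are illustrative, and it assumes the test utility fills a half-open [startTime, endTime) range, which is an assumption rather than something the patch states.

public class EmptyWindowSketch {

  // Assumed behaviour: one point per timestamp over the half-open window
  // [startTime, endTime); with equal bounds nothing is written at all.
  static long pointsInWindow(long startTime, long endTime) {
    long points = 0;
    for (long t = startTime; t < endTime; t++) {
      points++;
    }
    return points;
  }

  public static void main(String[] args) {
    long timeInterval = 500;
    // Files 1..29 each cover a real window and contribute chunks.
    System.out.println(pointsInWindow(timeInterval * 29, timeInterval * 30)); // 500
    // The 30th file uses the same value for start and end, so nothing is
    // written and its chunk group for the device stays empty.
    System.out.println(pointsInWindow(timeInterval * 31, timeInterval * 31)); // 0
  }
}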