You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/07/07 00:51:43 UTC

[iotdb] branch rel/0.12 updated: [IOTDB-3762][To rel/0.12]add test of cross compaction (#6600)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 369aa369e8 [IOTDB-3762][To rel/0.12]add test of cross compaction (#6600)
369aa369e8 is described below

commit 369aa369e81c4f10d726b6be77178a6725ab9fe0
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Thu Jul 7 08:51:38 2022 +0800

    [IOTDB-3762][To rel/0.12]add test of cross compaction (#6600)
---
 .../iotdb/db/engine/merge/MergeNewSeriesTest.java  | 210 ++++++++++++++++++++-
 1 file changed, 207 insertions(+), 3 deletions(-)

diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeNewSeriesTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeNewSeriesTest.java
index 4b1e8942a7..57273a890e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeNewSeriesTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeNewSeriesTest.java
@@ -25,11 +25,17 @@ import org.apache.iotdb.db.engine.merge.task.MergeTask;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -47,12 +53,9 @@ public class MergeNewSeriesTest extends MergeTest {
 
   @Override
   public void setUp() throws IOException, WriteProcessException, MetadataException {
-    measurementNum = 1;
     deviceNum = 2;
     seqFileNum = 2;
-    // unseq files are manually prepared because they will have new time series
     unseqFileNum = 0;
-    super.setUp();
   }
 
   @Override
@@ -81,6 +84,8 @@ public class MergeNewSeriesTest extends MergeTest {
 
   @Test
   public void testFullMerge() throws Exception {
+    measurementNum = 1;
+    super.setUp();
     MergeTask mergeTask =
         new MergeTask(
             new MergeResource(seqResources, unseqResources),
@@ -112,4 +117,203 @@ public class MergeNewSeriesTest extends MergeTest {
     queryAndCheck(
         deviceIds[1], unseqSchemas[0], resources, checkResultFunc(0), checkResultCntFunc(0));
   }
+
+  /**
+   * seq file1: d0.s0[0,99], d1.s0[0,99]<br>
+   * seq file2: d0.s0[100,199], d1.s0[100,199], d1.s2[100,199]<br>
+   * unseq file1: d0.s1[0,99]<br>
+   * unseq file2: d1.s1[100,199], d1.s2[50,150]<br>
+   * target file1: d0.s0[0,99], d0.s1[0,99], d1.s0[0,99], d1.s2[50,99]<br>
+   * target file2: d0.s0[100,199], d1.s0[100,199], d1.s1[100,199], d1.s2[100,199]
+   *
+   * <p>d0.s1 and d1.s1 are in unseq files but not in seq files, and d1.s2 is not in the first seq
+   * file.
+   */
+  @Test
+  public void testNewSeriesInUnseqFiles() throws Exception {
+    measurementNum = 3;
+    super.setUp();
+    List<TsFileResource> seqTsFileResources = new ArrayList<>();
+    List<TsFileResource> unseqTsFileResources = new ArrayList<>();
+    TsFileResource seq1 = prepareResource(0);
+    seqTsFileResources.add(seq1);
+    TsFileResource seq2 = prepareResource(1);
+    seqTsFileResources.add(seq2);
+    MeasurementSchema[] seqSchemas = new MeasurementSchema[1];
+    seqSchemas[0] = toMeasurementSchema(0);
+    // prepare seq file1
+    prepareFile(seq1, 0, 100, 0, Arrays.copyOfRange(deviceIds, 0, 2), seqSchemas);
+
+    // prepare seq file2
+    TsFileWriter fileWriter = new TsFileWriter(seq2.getTsFile());
+    for (String deviceId : Arrays.copyOfRange(deviceIds, 0, 2)) {
+      for (MeasurementSchema measurementSchema : measurementSchemas) {
+        fileWriter.registerTimeseries(
+            new Path(deviceId, measurementSchema.getMeasurementId()), measurementSchema);
+      }
+    }
+    for (long i = 100; i < 200; i++) {
+      for (int deviceIndex = 0; deviceIndex < deviceIds.length; deviceIndex++) {
+        String deviceId = deviceIds[deviceIndex];
+        TSRecord record = new TSRecord(i, deviceId);
+        if (deviceIndex == 0) {
+          record.addTuple(
+              DataPoint.getDataPoint(
+                  measurementSchemas[0].getType(),
+                  measurementSchemas[0].getMeasurementId(),
+                  String.valueOf(i)));
+        } else {
+          record.addTuple(
+              DataPoint.getDataPoint(
+                  measurementSchemas[0].getType(),
+                  measurementSchemas[0].getMeasurementId(),
+                  String.valueOf(i)));
+          record.addTuple(
+              DataPoint.getDataPoint(
+                  measurementSchemas[2].getType(),
+                  measurementSchemas[2].getMeasurementId(),
+                  String.valueOf(i)));
+        }
+        fileWriter.write(record);
+        seq2.updateStartTime(deviceId, i);
+        seq2.updateEndTime(deviceId, i);
+      }
+      if ((i + 1) % flushInterval == 0) {
+        fileWriter.flushAllChunkGroups();
+      }
+    }
+    fileWriter.close();
+
+    TsFileResource unseq1 = prepareResource(2);
+    unseqTsFileResources.add(unseq1);
+    TsFileResource unseq2 = prepareResource(3);
+    unseqTsFileResources.add(unseq2);
+
+    // prepare unseq file1
+    prepareFile(unseq1, 0, 100, 0, Arrays.copyOfRange(deviceIds, 0, 1), unseqSchemas);
+
+    // prepare unseq file2
+    fileWriter = new TsFileWriter(unseq2.getTsFile());
+    for (String deviceId : Arrays.copyOfRange(deviceIds, 1, 2)) {
+      for (MeasurementSchema measurementSchema : Arrays.copyOfRange(measurementSchemas, 1, 3)) {
+        fileWriter.registerTimeseries(
+            new Path(deviceId, measurementSchema.getMeasurementId()), measurementSchema);
+      }
+    }
+    for (long i = 100; i < 200; i++) {
+      for (String deviceId : Arrays.copyOfRange(deviceIds, 1, 2)) {
+        TSRecord record = new TSRecord(i, deviceId);
+        record.addTuple(
+            DataPoint.getDataPoint(
+                measurementSchemas[1].getType(),
+                measurementSchemas[1].getMeasurementId(),
+                String.valueOf(i)));
+        fileWriter.write(record);
+        unseq2.updateStartTime(deviceId, i);
+        unseq2.updateEndTime(deviceId, i);
+      }
+      if ((i + 1) % flushInterval == 0) {
+        fileWriter.flushAllChunkGroups();
+      }
+    }
+    for (long i = 50; i < 151; i++) {
+      for (String deviceId : Arrays.copyOfRange(deviceIds, 1, 2)) {
+        TSRecord record = new TSRecord(i, deviceId);
+        record.addTuple(
+            DataPoint.getDataPoint(
+                measurementSchemas[2].getType(),
+                measurementSchemas[2].getMeasurementId(),
+                String.valueOf(i)));
+        fileWriter.write(record);
+        unseq2.updateStartTime(deviceId, i);
+        unseq2.updateEndTime(deviceId, i);
+      }
+      if ((i + 1) % flushInterval == 0) {
+        fileWriter.flushAllChunkGroups();
+      }
+    }
+    fileWriter.close();
+
+    MergeTask mergeTask =
+        new MergeTask(
+            new MergeResource(seqTsFileResources, unseqTsFileResources),
+            TestConstant.OUTPUT_DATA_DIR,
+            (k, v, l) -> {},
+            "test",
+            true,
+            1,
+            MERGE_TEST_SG);
+    mergeTask.call();
+
+    Assert.assertEquals(0, seqTsFileResources.get(0).getStartTime(deviceIds[0]));
+    Assert.assertEquals(99, seqTsFileResources.get(0).getEndTime(deviceIds[0]));
+    Assert.assertEquals(0, seqTsFileResources.get(0).getStartTime(deviceIds[1]));
+    Assert.assertEquals(99, seqTsFileResources.get(0).getEndTime(deviceIds[1]));
+    Assert.assertEquals(100, seqTsFileResources.get(1).getStartTime(deviceIds[0]));
+    Assert.assertEquals(199, seqTsFileResources.get(1).getEndTime(deviceIds[0]));
+    Assert.assertEquals(100, seqTsFileResources.get(1).getStartTime(deviceIds[1]));
+    Assert.assertEquals(199, seqTsFileResources.get(1).getEndTime(deviceIds[1]));
+
+    // query root.MergeTest.d0.s0 from target1
+    List<TsFileResource> resources = Collections.singletonList(seqTsFileResources.get(0));
+    queryAndCheck(
+        deviceIds[0],
+        measurementSchemas[0],
+        resources,
+        checkResultFunc(0),
+        checkResultCntFunc(100));
+
+    // query root.MergeTest.d0.s1 from target1
+    queryAndCheck(
+        deviceIds[0],
+        measurementSchemas[1],
+        resources,
+        checkResultFunc(0),
+        checkResultCntFunc(100));
+
+    // query root.MergeTest.d1.s0 from target1
+    queryAndCheck(
+        deviceIds[1],
+        measurementSchemas[0],
+        resources,
+        checkResultFunc(0),
+        checkResultCntFunc(100));
+
+    // query root.MergeTest.d1.s2 from target1
+    queryAndCheck(
+        deviceIds[1], measurementSchemas[2], resources, checkResultFunc(0), checkResultCntFunc(50));
+
+    // query root.MergeTest.d0.s0 from target2
+    resources = Collections.singletonList(seqTsFileResources.get(1));
+    queryAndCheck(
+        deviceIds[0],
+        measurementSchemas[0],
+        resources,
+        checkResultFunc(0),
+        checkResultCntFunc(100));
+
+    // query root.MergeTest.d1.s0 from target2
+    queryAndCheck(
+        deviceIds[1],
+        measurementSchemas[0],
+        resources,
+        checkResultFunc(0),
+        checkResultCntFunc(100));
+
+    // query root.MergeTest.d1.s1 from target2
+    queryAndCheck(
+        deviceIds[1],
+        measurementSchemas[1],
+        resources,
+        checkResultFunc(0),
+        checkResultCntFunc(100));
+
+    // query root.MergeTest.d1.s2 from target2
+    queryAndCheck(
+        deviceIds[1],
+        measurementSchemas[2],
+        resources,
+        checkResultFunc(0),
+        checkResultCntFunc(100));
+  }
 }