You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/23 10:50:10 UTC

[iotdb] 02/02: fix ut

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c275c9b1db8e8229715e48082cd24683ae787fde
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon May 23 18:49:59 2022 +0800

    fix ut
---
 .../operator/LastCacheScanOperatorTest.java        |   6 +-
 .../operator/LastQueryMergeOperatorTest.java       | 166 ++++++++++++++++++++-
 .../SeriesAggregationScanOperatorTest.java         |   1 -
 .../operator/UpdateLastCacheOperatorTest.java      |  23 ++-
 4 files changed, 180 insertions(+), 16 deletions(-)

diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
index 4c9bd25876..72ee86fb65 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
 import org.junit.Test;
 
 import java.util.concurrent.ExecutorService;
@@ -56,7 +57,6 @@ public class LastCacheScanOperatorTest {
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, SeriesScanOperator.class.getSimpleName());
 
-
       TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(6);
 
       LastQueryUtil.appendLastValue(builder, 1, "root.sg.d.s1", "true", "BOOLEAN");
@@ -68,7 +68,9 @@ public class LastCacheScanOperatorTest {
 
       TsBlock tsBlock = builder.build();
 
-      LastCacheScanOperator lastCacheScanOperator = new LastCacheScanOperator(fragmentInstanceContext.getOperatorContexts().get(0), planNodeId1, tsBlock);
+      LastCacheScanOperator lastCacheScanOperator =
+          new LastCacheScanOperator(
+              fragmentInstanceContext.getOperatorContexts().get(0), planNodeId1, tsBlock);
 
       assertTrue(lastCacheScanOperator.isBlocked().isDone());
       assertTrue(lastCacheScanOperator.hasNext());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
index 49f4e55a26..03fa1a1584 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
@@ -34,13 +32,18 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -115,7 +118,6 @@ public class LastQueryMergeOperatorTest {
       fragmentInstanceContext.addOperatorContext(
           5, planNodeId5, LastQueryMergeOperator.class.getSimpleName());
 
-
       SeriesAggregationScanOperator seriesAggregationScanOperator1 =
           new SeriesAggregationScanOperator(
               planNodeId1,
@@ -129,7 +131,14 @@ public class LastQueryMergeOperatorTest {
       seriesAggregationScanOperator1.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
-      UpdateLastCacheOperator updateLastCacheOperator1 =  new UpdateLastCacheOperator(fragmentInstanceContext.getOperatorContexts().get(1), seriesAggregationScanOperator1, measurementPath1, measurementPath1.getSeriesType(), null, false);
+      UpdateLastCacheOperator updateLastCacheOperator1 =
+          new UpdateLastCacheOperator(
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              seriesAggregationScanOperator1,
+              measurementPath1,
+              measurementPath1.getSeriesType(),
+              null,
+              false);
 
       SeriesAggregationScanOperator seriesAggregationScanOperator2 =
           new SeriesAggregationScanOperator(
@@ -144,27 +153,41 @@ public class LastQueryMergeOperatorTest {
       seriesAggregationScanOperator2.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
-      UpdateLastCacheOperator updateLastCacheOperator2 =  new UpdateLastCacheOperator(fragmentInstanceContext.getOperatorContexts().get(3), seriesAggregationScanOperator2, measurementPath2, measurementPath2.getSeriesType(), null, false);
+      UpdateLastCacheOperator updateLastCacheOperator2 =
+          new UpdateLastCacheOperator(
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              seriesAggregationScanOperator2,
+              measurementPath2,
+              measurementPath2.getSeriesType(),
+              null,
+              false);
 
-      LastQueryMergeOperator lastQueryMergeOperator = new LastQueryMergeOperator(fragmentInstanceContext.getOperatorContexts().get(4), ImmutableList.of(updateLastCacheOperator1, updateLastCacheOperator2));
+      LastQueryMergeOperator lastQueryMergeOperator =
+          new LastQueryMergeOperator(
+              fragmentInstanceContext.getOperatorContexts().get(4),
+              ImmutableList.of(updateLastCacheOperator1, updateLastCacheOperator2));
 
       int count = 0;
       while (!lastQueryMergeOperator.isFinished()) {
         assertTrue(lastQueryMergeOperator.isBlocked().isDone());
         assertTrue(lastQueryMergeOperator.hasNext());
         TsBlock result = lastQueryMergeOperator.next();
+        if (result == null) {
+          continue;
+        }
         assertEquals(3, result.getValueColumnCount());
 
         for (int i = 0; i < result.getPositionCount(); i++) {
           assertEquals(499, result.getTimeByIndex(i));
-          assertEquals(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor" + count, result.getColumn(0).getBinary(i).toString());
+          assertEquals(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor" + count,
+              result.getColumn(0).getBinary(i).toString());
           assertEquals("10499", result.getColumn(1).getBinary(i).toString());
           assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(i).toString());
           count++;
         }
       }
 
-
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -173,5 +196,132 @@ public class LastQueryMergeOperatorTest {
 
   @Test
   public void testUpdateLastCacheOperatorTestWithCachedValue() {
+    try {
+      List<Aggregator> aggregators1 = LastQueryUtil.createAggregators(TSDataType.INT32);
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+      List<Aggregator> aggregators2 = LastQueryUtil.createAggregators(TSDataType.INT32);
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32);
+      Set<String> allSensors = Sets.newHashSet("sensor0", "sensor1");
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, SeriesAggregationScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, UpdateLastCacheOperator.class.getSimpleName());
+
+      PlanNodeId planNodeId3 = new PlanNodeId("3");
+      fragmentInstanceContext.addOperatorContext(
+          3, planNodeId3, SeriesAggregationScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId4 = new PlanNodeId("4");
+      fragmentInstanceContext.addOperatorContext(
+          4, planNodeId4, UpdateLastCacheOperator.class.getSimpleName());
+
+      PlanNodeId planNodeId5 = new PlanNodeId("5");
+      fragmentInstanceContext.addOperatorContext(
+          5, planNodeId4, LastCacheScanOperator.class.getSimpleName());
+
+      PlanNodeId planNodeId6 = new PlanNodeId("6");
+      fragmentInstanceContext.addOperatorContext(
+          6, planNodeId6, LastQueryMergeOperator.class.getSimpleName());
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator1 =
+          new SeriesAggregationScanOperator(
+              planNodeId1,
+              measurementPath1,
+              allSensors,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              aggregators1,
+              null,
+              false,
+              null);
+      seriesAggregationScanOperator1.initQueryDataSource(
+          new QueryDataSource(seqResources, unSeqResources));
+
+      UpdateLastCacheOperator updateLastCacheOperator1 =
+          new UpdateLastCacheOperator(
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              seriesAggregationScanOperator1,
+              measurementPath1,
+              measurementPath1.getSeriesType(),
+              null,
+              false);
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator2 =
+          new SeriesAggregationScanOperator(
+              planNodeId3,
+              measurementPath2,
+              allSensors,
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              aggregators2,
+              null,
+              false,
+              null);
+      seriesAggregationScanOperator2.initQueryDataSource(
+          new QueryDataSource(seqResources, unSeqResources));
+
+      UpdateLastCacheOperator updateLastCacheOperator2 =
+          new UpdateLastCacheOperator(
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              seriesAggregationScanOperator2,
+              measurementPath2,
+              measurementPath2.getSeriesType(),
+              null,
+              false);
+
+      TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(6);
+
+      LastQueryUtil.appendLastValue(
+          builder, 499, SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor2", "10499", "INT32");
+      LastQueryUtil.appendLastValue(
+          builder, 499, SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor3", "10499", "INT32");
+      LastQueryUtil.appendLastValue(
+          builder, 499, SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor4", "10499", "INT32");
+
+      TsBlock tsBlock = builder.build();
+
+      LastCacheScanOperator lastCacheScanOperator =
+          new LastCacheScanOperator(
+              fragmentInstanceContext.getOperatorContexts().get(4), planNodeId5, tsBlock);
+
+      LastQueryMergeOperator lastQueryMergeOperator =
+          new LastQueryMergeOperator(
+              fragmentInstanceContext.getOperatorContexts().get(5),
+              ImmutableList.of(
+                  updateLastCacheOperator1, updateLastCacheOperator2, lastCacheScanOperator));
+
+      int count = 0;
+      while (!lastQueryMergeOperator.isFinished()) {
+        assertTrue(lastQueryMergeOperator.isBlocked().isDone());
+        assertTrue(lastQueryMergeOperator.hasNext());
+        TsBlock result = lastQueryMergeOperator.next();
+        if (result == null) {
+          continue;
+        }
+        assertEquals(3, result.getValueColumnCount());
+
+        for (int i = 0; i < result.getPositionCount(); i++) {
+          assertEquals(499, result.getTimeByIndex(i));
+          assertEquals(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor" + count,
+              result.getColumn(0).getBinary(i).toString());
+          assertEquals("10499", result.getColumn(1).getBinary(i).toString());
+          assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(i).toString());
+          count++;
+        }
+      }
+
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
index c0c1adec30..7e95c4ac2c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
-import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
index a6af1d1f34..b78cb7b1ac 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator;
 
-import com.google.common.collect.Sets;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
@@ -42,6 +41,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -96,7 +97,9 @@ public class UpdateLastCacheOperatorTest {
       assertEquals(3, result.getValueColumnCount());
 
       assertEquals(499, result.getTimeByIndex(0));
-      assertEquals(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", result.getColumn(0).getBinary(0).toString());
+      assertEquals(
+          SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0",
+          result.getColumn(0).getBinary(0).toString());
       assertEquals("10499", result.getColumn(1).getBinary(0).toString());
       assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(0).toString());
 
@@ -124,7 +127,9 @@ public class UpdateLastCacheOperatorTest {
       assertEquals(3, result.getValueColumnCount());
 
       assertEquals(499, result.getTimeByIndex(0));
-      assertEquals(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", result.getColumn(0).getBinary(0).toString());
+      assertEquals(
+          SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0",
+          result.getColumn(0).getBinary(0).toString());
       assertEquals("10499", result.getColumn(1).getBinary(0).toString());
       assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(0).toString());
 
@@ -152,7 +157,9 @@ public class UpdateLastCacheOperatorTest {
       assertEquals(3, result.getValueColumnCount());
 
       assertEquals(120, result.getTimeByIndex(0));
-      assertEquals(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", result.getColumn(0).getBinary(0).toString());
+      assertEquals(
+          SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0",
+          result.getColumn(0).getBinary(0).toString());
       assertEquals("20120", result.getColumn(1).getBinary(0).toString());
       assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(0).toString());
 
@@ -201,6 +208,12 @@ public class UpdateLastCacheOperatorTest {
     seriesAggregationScanOperator.initQueryDataSource(
         new QueryDataSource(seqResources, unSeqResources));
 
-    return new UpdateLastCacheOperator(fragmentInstanceContext.getOperatorContexts().get(1), seriesAggregationScanOperator, measurementPath, measurementPath.getSeriesType(), null, false);
+    return new UpdateLastCacheOperator(
+        fragmentInstanceContext.getOperatorContexts().get(1),
+        seriesAggregationScanOperator,
+        measurementPath,
+        measurementPath.getSeriesType(),
+        null,
+        false);
   }
 }