You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/23 10:04:05 UTC

[iotdb] branch LastOperator updated: add test

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/LastOperator by this push:
     new 3496d28b86 add test
3496d28b86 is described below

commit 3496d28b86843ec1ef3a82f4e07b41afec280f88
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon May 23 18:03:52 2022 +0800

    add test
---
 .../db/mpp/execution/operator/LastQueryUtil.java   |   1 +
 .../operator/LastCacheScanOperatorTest.java        |  91 +++++++++
 .../operator/LastQueryMergeOperatorTest.java       | 177 ++++++++++++++++++
 .../SeriesAggregationScanOperatorTest.java         |   2 +-
 .../operator/UpdateLastCacheOperatorTest.java      | 206 +++++++++++++++++++++
 5 files changed, 476 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
index 57f1f083a3..19e9571171 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
@@ -57,6 +57,7 @@ public class LastQueryUtil {
     builder.getColumnBuilder(1).writeBinary(new Binary(lastValue));
     // dataType
     builder.getColumnBuilder(2).writeBinary(new Binary(dataType));
+    builder.declarePosition();
   }
 
   public static boolean satisfyFilter(Filter filter, TimeValuePair tvPair) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
new file mode 100644
index 0000000000..4c9bd25876
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Checks that {@link LastCacheScanOperator} emits its pre-built TsBlock exactly once. */
+public class LastCacheScanOperatorTest {
+  /** Builds a six-row last-value block and verifies the operator hands it back unchanged. */
+  @Test
+  public void batchTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      // Register the context under the operator that is actually being tested.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, LastCacheScanOperator.class.getSimpleName());
+
+      TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(6);
+      LastQueryUtil.appendLastValue(builder, 1, "root.sg.d.s1", "true", "BOOLEAN");
+      LastQueryUtil.appendLastValue(builder, 2, "root.sg.d.s2", "2", "INT32");
+      LastQueryUtil.appendLastValue(builder, 3, "root.sg.d.s3", "3", "INT64");
+      LastQueryUtil.appendLastValue(builder, 4, "root.sg.d.s4", "4.4", "FLOAT");
+      LastQueryUtil.appendLastValue(builder, 3, "root.sg.d.s5", "3.3", "DOUBLE");
+      LastQueryUtil.appendLastValue(builder, 1, "root.sg.d.s6", "peace", "TEXT");
+      TsBlock tsBlock = builder.build();
+      // Guard against an empty block, which would turn the row loop below into a no-op.
+      assertEquals(6, tsBlock.getPositionCount());
+      LastCacheScanOperator lastCacheScanOperator =
+          new LastCacheScanOperator(
+              fragmentInstanceContext.getOperatorContexts().get(0), planNodeId1, tsBlock);
+      assertTrue(lastCacheScanOperator.isBlocked().isDone());
+      assertTrue(lastCacheScanOperator.hasNext());
+      TsBlock result = lastCacheScanOperator.next();
+      assertEquals(tsBlock.getPositionCount(), result.getPositionCount());
+      assertEquals(tsBlock.getValueColumnCount(), result.getValueColumnCount());
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertEquals(tsBlock.getTimeByIndex(i), result.getTimeByIndex(i));
+        for (int j = 0; j < tsBlock.getValueColumnCount(); j++) {
+          assertEquals(tsBlock.getColumn(j).getBinary(i), result.getColumn(j).getBinary(i));
+        }
+      }
+      assertFalse(lastCacheScanOperator.hasNext());
+      assertTrue(lastCacheScanOperator.isFinished());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
new file mode 100644
index 0000000000..49f4e55a26
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests {@link LastQueryMergeOperator} over two {@link UpdateLastCacheOperator} children. */
+public class LastQueryMergeOperatorTest {
+
+  private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.LastQueryMergeOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+  private ExecutorService instanceNotificationExecutor;
+
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG);
+    this.instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+    instanceNotificationExecutor.shutdown();
+  }
+
+  /** Merges the last values of sensor0 and sensor1 of device0: one row each, in that order. */
+  @Test
+  public void testLastQueryMergeOperatorTestWithoutCachedValue() throws IllegalPathException {
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    // The test relies on getOperatorContexts() following this registration order (index 0..4).
+    PlanNodeId planNodeId1 = new PlanNodeId("1");
+    fragmentInstanceContext.addOperatorContext(
+        1, planNodeId1, SeriesAggregationScanOperator.class.getSimpleName());
+    PlanNodeId planNodeId2 = new PlanNodeId("2");
+    fragmentInstanceContext.addOperatorContext(
+        2, planNodeId2, UpdateLastCacheOperator.class.getSimpleName());
+    PlanNodeId planNodeId3 = new PlanNodeId("3");
+    fragmentInstanceContext.addOperatorContext(
+        3, planNodeId3, SeriesAggregationScanOperator.class.getSimpleName());
+    PlanNodeId planNodeId4 = new PlanNodeId("4");
+    fragmentInstanceContext.addOperatorContext(
+        4, planNodeId4, UpdateLastCacheOperator.class.getSimpleName());
+    PlanNodeId planNodeId5 = new PlanNodeId("5");
+    fragmentInstanceContext.addOperatorContext(
+        5, planNodeId5, LastQueryMergeOperator.class.getSimpleName());
+
+    UpdateLastCacheOperator updateLastCacheOperator1 =
+        newLastValueOperator(fragmentInstanceContext, planNodeId1, 0, "sensor0");
+    UpdateLastCacheOperator updateLastCacheOperator2 =
+        newLastValueOperator(fragmentInstanceContext, planNodeId3, 2, "sensor1");
+    LastQueryMergeOperator lastQueryMergeOperator =
+        new LastQueryMergeOperator(
+            fragmentInstanceContext.getOperatorContexts().get(4),
+            ImmutableList.of(updateLastCacheOperator1, updateLastCacheOperator2));
+
+    int count = 0;
+    while (!lastQueryMergeOperator.isFinished()) {
+      assertTrue(lastQueryMergeOperator.isBlocked().isDone());
+      assertTrue(lastQueryMergeOperator.hasNext());
+      TsBlock result = lastQueryMergeOperator.next();
+      assertEquals(3, result.getValueColumnCount());
+
+      for (int i = 0; i < result.getPositionCount(); i++) {
+        assertEquals(499, result.getTimeByIndex(i));
+        assertEquals(
+            SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor" + count,
+            result.getColumn(0).getBinary(i).toString());
+        assertEquals("10499", result.getColumn(1).getBinary(i).toString());
+        assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(i).toString());
+        count++;
+      }
+    }
+    // Without this check the test would also pass if the operator never produced a row.
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testUpdateLastCacheOperatorTestWithCachedValue() {
+    // TODO: not implemented yet; this empty test always passes and verifies nothing.
+  }
+
+  /**
+   * Wraps a new SeriesAggregationScanOperator for {@code device0.<sensor>} in an
+   * UpdateLastCacheOperator, using operator contexts {@code scanIndex} and {@code scanIndex + 1}.
+   */
+  private UpdateLastCacheOperator newLastValueOperator(
+      FragmentInstanceContext context, PlanNodeId scanPlanNodeId, int scanIndex, String sensor)
+      throws IllegalPathException {
+    MeasurementPath measurementPath =
+        new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0." + sensor, TSDataType.INT32);
+    Set<String> allSensors = Sets.newHashSet("sensor0", "sensor1");
+    List<Aggregator> aggregators = LastQueryUtil.createAggregators(TSDataType.INT32);
+
+    SeriesAggregationScanOperator scanOperator =
+        new SeriesAggregationScanOperator(
+            scanPlanNodeId,
+            measurementPath,
+            allSensors,
+            context.getOperatorContexts().get(scanIndex),
+            aggregators,
+            null,
+            false,
+            null);
+    scanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+    return new UpdateLastCacheOperator(
+        context.getOperatorContexts().get(scanIndex + 1),
+        scanOperator,
+        measurementPath,
+        measurementPath.getSeriesType(),
+        null,
+        false);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
index 0c62a46328..c0c1adec30 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -479,7 +479,7 @@ public class SeriesAggregationScanOperatorTest {
         createFragmentInstanceContext(instanceId, stateMachine);
     PlanNodeId planNodeId = new PlanNodeId("1");
     fragmentInstanceContext.addOperatorContext(
-        1, planNodeId, SeriesScanOperator.class.getSimpleName());
+        1, planNodeId, SeriesAggregationScanOperator.class.getSimpleName());
 
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         new SeriesAggregationScanOperator(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
new file mode 100644
index 0000000000..a6af1d1f34
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import com.google.common.collect.Sets;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class UpdateLastCacheOperatorTest {
+
+  private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.UpdateLastCacheOperator";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+  private ExecutorService instanceNotificationExecutor;
+
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG);
+    this.instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+    instanceNotificationExecutor.shutdown();
+  }
+
+  @Test
+  public void testUpdateLastCacheOperatorTestWithoutTimeFilter() {
+    try {
+      List<Aggregator> aggregators = LastQueryUtil.createAggregators(TSDataType.INT32);
+      UpdateLastCacheOperator updateLastCacheOperator =
+          initUpdateLastCacheOperator(aggregators, null, false, null);
+
+      assertTrue(updateLastCacheOperator.isBlocked().isDone());
+      assertTrue(updateLastCacheOperator.hasNext());
+      TsBlock result = updateLastCacheOperator.next();
+      assertEquals(1, result.getPositionCount());
+      assertEquals(3, result.getValueColumnCount());
+
+      assertEquals(499, result.getTimeByIndex(0));
+      assertEquals(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", result.getColumn(0).getBinary(0).toString());
+      assertEquals("10499", result.getColumn(1).getBinary(0).toString());
+      assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(0).toString());
+
+      assertFalse(updateLastCacheOperator.hasNext());
+      assertTrue(updateLastCacheOperator.isFinished());
+
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  @Test
+  public void testUpdateLastCacheOperatorTestWithTimeFilter1() {
+    try {
+      List<Aggregator> aggregators = LastQueryUtil.createAggregators(TSDataType.INT32);
+      Filter timeFilter = TimeFilter.gtEq(200);
+      UpdateLastCacheOperator updateLastCacheOperator =
+          initUpdateLastCacheOperator(aggregators, timeFilter, false, null);
+
+      assertTrue(updateLastCacheOperator.isBlocked().isDone());
+      assertTrue(updateLastCacheOperator.hasNext());
+      TsBlock result = updateLastCacheOperator.next();
+      assertEquals(1, result.getPositionCount());
+      assertEquals(3, result.getValueColumnCount());
+
+      assertEquals(499, result.getTimeByIndex(0));
+      assertEquals(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", result.getColumn(0).getBinary(0).toString());
+      assertEquals("10499", result.getColumn(1).getBinary(0).toString());
+      assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(0).toString());
+
+      assertFalse(updateLastCacheOperator.hasNext());
+      assertTrue(updateLastCacheOperator.isFinished());
+
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  @Test
+  public void testUpdateLastCacheOperatorTestWithTimeFilter2() {
+    try {
+      List<Aggregator> aggregators = LastQueryUtil.createAggregators(TSDataType.INT32);
+      Filter timeFilter = TimeFilter.ltEq(120);
+      UpdateLastCacheOperator updateLastCacheOperator =
+          initUpdateLastCacheOperator(aggregators, timeFilter, false, null);
+
+      assertTrue(updateLastCacheOperator.isBlocked().isDone());
+      assertTrue(updateLastCacheOperator.hasNext());
+      TsBlock result = updateLastCacheOperator.next();
+      assertEquals(1, result.getPositionCount());
+      assertEquals(3, result.getValueColumnCount());
+
+      assertEquals(120, result.getTimeByIndex(0));
+      assertEquals(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", result.getColumn(0).getBinary(0).toString());
+      assertEquals("20120", result.getColumn(1).getBinary(0).toString());
+      assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(0).toString());
+
+      assertFalse(updateLastCacheOperator.hasNext());
+      assertTrue(updateLastCacheOperator.isFinished());
+
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  public UpdateLastCacheOperator initUpdateLastCacheOperator(
+      List<Aggregator> aggregators,
+      Filter timeFilter,
+      boolean ascending,
+      GroupByTimeParameter groupByTimeParameter)
+      throws IllegalPathException {
+    MeasurementPath measurementPath =
+        new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+    Set<String> allSensors = Sets.newHashSet("sensor0");
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    PlanNodeId planNodeId1 = new PlanNodeId("1");
+    fragmentInstanceContext.addOperatorContext(
+        1, planNodeId1, SeriesAggregationScanOperator.class.getSimpleName());
+    PlanNodeId planNodeId2 = new PlanNodeId("2");
+    fragmentInstanceContext.addOperatorContext(
+        2, planNodeId2, UpdateLastCacheOperator.class.getSimpleName());
+
+    SeriesAggregationScanOperator seriesAggregationScanOperator =
+        new SeriesAggregationScanOperator(
+            planNodeId1,
+            measurementPath,
+            allSensors,
+            fragmentInstanceContext.getOperatorContexts().get(0),
+            aggregators,
+            timeFilter,
+            ascending,
+            groupByTimeParameter);
+    seriesAggregationScanOperator.initQueryDataSource(
+        new QueryDataSource(seqResources, unSeqResources));
+
+    return new UpdateLastCacheOperator(fragmentInstanceContext.getOperatorContexts().get(1), seriesAggregationScanOperator, measurementPath, measurementPath.getSeriesType(), null, false);
+  }
+}