You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/02 03:40:40 UTC

[iotdb] branch ty-mpp updated: add DataDriverTest

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty-mpp by this push:
     new f3698d8  add DataDriverTest
f3698d8 is described below

commit f3698d899f8a511354ac142773ba0cc6a4423f3e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sat Apr 2 11:39:47 2022 +0800

    add DataDriverTest
---
 .../apache/iotdb/db/mpp/buffer/StubSinkHandle.java |  67 ++++++++++++
 .../apache/iotdb/db/mpp/execution/DataDriver.java  |  47 +++++----
 .../iotdb/db/mpp/execution/DriverContext.java      |   8 ++
 .../db/mpp/execution/FragmentInstanceContext.java  |  29 +++++-
 .../mpp/execution/FragmentInstanceExecution.java   |  24 +++--
 .../db/mpp/execution/FragmentInstanceManager.java  |  19 +++-
 .../iotdb/db/mpp/execution/SchemaDriver.java       |   8 +-
 .../org/apache/iotdb/db/mpp/operator/Operator.java |   1 +
 .../mpp/schedule/FragmentInstanceTaskExecutor.java |   2 +-
 .../DataDriverTest.java}                           | 116 ++++++++++++++-------
 .../iotdb/db/mpp/operator/LimitOperatorTest.java   |   6 +-
 .../db/mpp/operator/SeriesScanOperatorTest.java    |   6 +-
 .../db/mpp/operator/TimeJoinOperatorTest.java      |   6 +-
 ...est.java => FragmentInstanceSchedulerTest.java} |   2 +-
 14 files changed, 262 insertions(+), 79 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
new file mode 100644
index 0000000..b920eb0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+
+public class StubSinkHandle implements ISinkHandle {
+  // Already-completed future returned by every isFull() call; shared by all instances.
+  private static final ListenableFuture<Void> NOT_BLOCKED = immediateVoidFuture();
+  // Blocks handed to either send() overload, in call order; emptied by close() and abort().
+  private final List<TsBlock> tsBlocks = new ArrayList<>();
+
+  @Override
+  public ListenableFuture<Void> isFull() {
+    return NOT_BLOCKED;
+  }
+
+  @Override
+  public void send(TsBlock tsBlock) {
+    tsBlocks.add(tsBlock);
+  }
+
+  @Override
+  public void send(int partition, TsBlock tsBlock) {
+    tsBlocks.add(tsBlock); // the partition index is ignored by this stub
+  }
+
+  @Override
+  public void setNoMoreTsBlocks() {}
+
+  @Override
+  public void close() {
+    tsBlocks.clear();
+  }
+
+  @Override
+  public void abort() {
+    tsBlocks.clear();
+  }
+
+  public List<TsBlock> getTsBlocks() {
+    return tsBlocks; // live backing list, not a copy
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 4d96322..0900fa6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -57,7 +58,7 @@ public class DataDriver implements ExecFragmentInstance {
 
   private final Operator root;
   private final ISinkHandle sinkHandle;
-  private final DataDriverContext dataDriverContext;
+  private final DataDriverContext driverContext;
 
   private boolean init;
   private boolean closed;
@@ -69,10 +70,12 @@ public class DataDriver implements ExecFragmentInstance {
 
   private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
 
-  public DataDriver(Operator root, ISinkHandle sinkHandle, DataDriverContext dataDriverContext) {
+  public DataDriver(Operator root, ISinkHandle sinkHandle, DataDriverContext driverContext) {
     this.root = root;
     this.sinkHandle = sinkHandle;
-    this.dataDriverContext = dataDriverContext;
+    this.driverContext = driverContext;
+    this.closedFilePaths = new HashSet<>();
+    this.unClosedFilePaths = new HashSet<>();
     // initially the driverBlockedFuture is not blocked (it is completed)
     SettableFuture<Void> future = SettableFuture.create();
     future.set(null);
@@ -81,14 +84,17 @@ public class DataDriver implements ExecFragmentInstance {
 
   @Override
   public boolean isFinished() {
-    if (closed) {
-      return true;
-    }
     try {
-      return root != null && root.isFinished();
+      boolean isFinished =
+          closed || (driverBlockedFuture.get().isDone() && root != null && root.isFinished());
+      if (isFinished) {
+        driverContext.finish();
+      }
+      return isFinished;
     } catch (Throwable t) {
       logger.error(
-          "Failed to query whether the data driver {} is finished", dataDriverContext.getId(), t);
+          "Failed to query whether the data driver {} is finished", driverContext.getId(), t);
+      driverContext.failed(t);
       close();
       return true;
     }
@@ -105,9 +111,8 @@ public class DataDriver implements ExecFragmentInstance {
         initialize();
       } catch (Throwable t) {
         logger.error(
-            "Failed to do the initialization for fragment instance {} ",
-            dataDriverContext.getId(),
-            t);
+            "Failed to do the initialization for fragment instance {} ", driverContext.getId(), t);
+        driverContext.failed(t);
         close();
         return NOT_BLOCKED;
       }
@@ -130,7 +135,8 @@ public class DataDriver implements ExecFragmentInstance {
         }
       } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
     } catch (Throwable t) {
-      logger.error("Failed to execute fragment instance {}", dataDriverContext.getId(), t);
+      logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
+      driverContext.failed(t);
       close();
     }
     return NOT_BLOCKED;
@@ -138,7 +144,7 @@ public class DataDriver implements ExecFragmentInstance {
 
   @Override
   public FragmentInstanceId getInfo() {
-    return dataDriverContext.getId();
+    return driverContext.getId();
   }
 
   @Override
@@ -159,7 +165,8 @@ public class DataDriver implements ExecFragmentInstance {
         sinkHandle.close();
       }
     } catch (Throwable t) {
-      logger.error("Failed to closed driver {}", dataDriverContext.getId(), t);
+      logger.error("Failed to closed driver {}", driverContext.getId(), t);
+      driverContext.failed(t);
     } finally {
       removeUsedFilesForQuery();
     }
@@ -170,7 +177,7 @@ public class DataDriver implements ExecFragmentInstance {
    * we should change all the blocked lock operation into tryLock
    */
   private void initialize() throws QueryProcessException {
-    List<SourceOperator> sourceOperators = dataDriverContext.getSourceOperators();
+    List<SourceOperator> sourceOperators = driverContext.getSourceOperators();
     if (sourceOperators != null && !sourceOperators.isEmpty()) {
       QueryDataSource dataSource = initQueryDataSourceCache();
       sourceOperators.forEach(
@@ -193,11 +200,11 @@ public class DataDriver implements ExecFragmentInstance {
    * QueryDataSource needed for this query
    */
   public QueryDataSource initQueryDataSourceCache() throws QueryProcessException {
-    VirtualStorageGroupProcessor dataRegion = dataDriverContext.getDataRegion();
+    VirtualStorageGroupProcessor dataRegion = driverContext.getDataRegion();
     dataRegion.readLock();
     try {
       List<PartialPath> pathList =
-          dataDriverContext.getPaths().stream()
+          driverContext.getPaths().stream()
               .map(IDTable::translateQueryPath)
               .collect(Collectors.toList());
       // when all the selected series are under the same device, the QueryDataSource will be
@@ -209,8 +216,8 @@ public class DataDriver implements ExecFragmentInstance {
           dataRegion.query(
               pathList,
               selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
-              dataDriverContext.getFragmentInstanceContext(),
-              dataDriverContext.getTimeFilter());
+              driverContext.getFragmentInstanceContext(),
+              driverContext.getTimeFilter());
 
       // used files should be added before mergeLock is unlocked, or they may be deleted by
       // running merge
@@ -218,7 +225,7 @@ public class DataDriver implements ExecFragmentInstance {
 
       return dataSource;
     } finally {
-      dataDriverContext.getDataRegion().readUnlock();
+      dataRegion.readUnlock();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index fddc3a5..2168c24 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -35,4 +35,12 @@ public class DriverContext {
   public FragmentInstanceContext getFragmentInstanceContext() {
     return fragmentInstanceContext;
   }
+
+  public void failed(Throwable cause) {
+    fragmentInstanceContext.failed(cause); // forwards the cause to the fragment instance context
+  }
+
+  public void finish() {
+    fragmentInstanceContext.finish(); // forwards completion to the fragment instance context
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index fc3c82d..550a293 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -25,12 +25,13 @@ import org.apache.iotdb.db.query.context.QueryContext;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
 public class FragmentInstanceContext extends QueryContext {
 
-  private FragmentInstanceId id;
+  private final FragmentInstanceId id;
 
   // TODO if we split one fragment instance into multiple pipelines to run, we need to replace it
   // with CopyOnWriteArrayList or some other thread safe data structure
@@ -39,6 +40,9 @@ public class FragmentInstanceContext extends QueryContext {
 
   private DriverContext driverContext;
 
+  // TODO we may use StateMachine<FragmentInstanceState> to replace it
+  private final AtomicReference<FragmentInstanceState> state;
+
   //    private final GcMonitor gcMonitor;
   //    private final AtomicLong startNanos = new AtomicLong();
   //    private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -47,8 +51,10 @@ public class FragmentInstanceContext extends QueryContext {
   //    private final AtomicLong endFullGcCount = new AtomicLong(-1);
   //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
 
-  public FragmentInstanceContext(FragmentInstanceId id) {
+  public FragmentInstanceContext(
+      FragmentInstanceId id, AtomicReference<FragmentInstanceState> state) {
     this.id = id;
+    this.state = state;
   }
 
   public OperatorContext addOperatorContext(
@@ -83,4 +89,23 @@ public class FragmentInstanceContext extends QueryContext {
   public void setDriverContext(DriverContext driverContext) {
     this.driverContext = driverContext;
   }
+
+  public void failed(Throwable cause) {
+    state.set(FragmentInstanceState.FAILED); // unconditional; the cause itself is not stored
+  }
+
+  public void cancel() {
+    state.set(FragmentInstanceState.CANCELED); // unconditional: replaces any earlier state
+  }
+
+  public void abort() {
+    state.set(FragmentInstanceState.ABORTED); // unconditional: replaces any earlier state
+  }
+
+  public void finish() {
+    // Move to FINISHED only if no done state was recorded yet. One atomic update instead of
+    // get()-then-set(), so a done state set concurrently by another thread is not overwritten.
+    state.updateAndGet(
+        current -> current.isDone() ? current : FragmentInstanceState.FINISHED);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
index 1791e3f..4bfb458 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
 
 import com.google.common.collect.ImmutableList;
 
+import java.util.concurrent.atomic.AtomicReference;
+
 import static java.util.Objects.requireNonNull;
 
 public class FragmentInstanceExecution {
@@ -34,7 +36,8 @@ public class FragmentInstanceExecution {
 
   private final ExecFragmentInstance driver;
 
-  private FragmentInstanceState state;
+  // TODO we may use StateMachine<FragmentInstanceState> to replace it
+  private final AtomicReference<FragmentInstanceState> state;
 
   private long lastHeartbeat;
 
@@ -42,11 +45,14 @@ public class FragmentInstanceExecution {
       IFragmentInstanceScheduler scheduler,
       FragmentInstanceId instanceId,
       FragmentInstanceContext context,
-      ExecFragmentInstance driver) {
+      ExecFragmentInstance driver,
+      AtomicReference<FragmentInstanceState> state) {
     this.scheduler = scheduler;
     this.instanceId = instanceId;
     this.context = context;
     this.driver = driver;
+    this.state = state;
+    state.set(FragmentInstanceState.RUNNING);
     scheduler.submitFragmentInstances(instanceId.getQueryId(), ImmutableList.of(driver));
   }
 
@@ -59,27 +65,23 @@ public class FragmentInstanceExecution {
   }
 
   public FragmentInstanceState getInstanceState() {
-    return state;
-  }
-
-  public void setState(FragmentInstanceState state) {
-    this.state = state;
+    return state.get();
   }
 
   public FragmentInstanceInfo getInstanceInfo() {
-    return new FragmentInstanceInfo(state);
+    return new FragmentInstanceInfo(state.get());
   }
 
   public void failed(Throwable cause) {
     requireNonNull(cause, "cause is null");
-    // TODO
+    context.failed(cause);
   }
 
   public void cancel() {
-    // TODO
+    context.cancel();
   }
 
   public void abort() {
-    // TODO
+    context.abort();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 91a31c6..4e9c61b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Objects.requireNonNull;
 
@@ -55,8 +56,13 @@ public class FragmentInstanceManager {
         instanceExecution.computeIfAbsent(
             instanceId,
             id -> {
+              AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
+              state.set(FragmentInstanceState.PLANNED);
+
               FragmentInstanceContext context =
-                  instanceContext.computeIfAbsent(instanceId, FragmentInstanceContext::new);
+                  instanceContext.computeIfAbsent(
+                      instanceId,
+                      fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
 
               DataDriver driver =
                   planner.plan(
@@ -64,7 +70,7 @@ public class FragmentInstanceManager {
                       context,
                       instance.getTimeFilter(),
                       dataRegion);
-              return new FragmentInstanceExecution(scheduler, instanceId, context, driver);
+              return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
             });
 
     return execution.getInstanceInfo();
@@ -78,12 +84,17 @@ public class FragmentInstanceManager {
         instanceExecution.computeIfAbsent(
             instanceId,
             id -> {
+              AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
+              state.set(FragmentInstanceState.PLANNED);
+
               FragmentInstanceContext context =
-                  instanceContext.computeIfAbsent(instanceId, FragmentInstanceContext::new);
+                  instanceContext.computeIfAbsent(
+                      instanceId,
+                      fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
 
               SchemaDriver driver =
                   planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
-              return new FragmentInstanceExecution(scheduler, instanceId, context, driver);
+              return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
             });
 
     return execution.getInstanceInfo();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index efb5484..844381b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -62,10 +62,15 @@ public class SchemaDriver implements ExecFragmentInstance {
   @Override
   public boolean isFinished() {
     try {
-      return root != null && root.isFinished();
+      boolean isFinished = driverBlockedFuture.get().isDone() && root != null && root.isFinished();
+      if (isFinished) {
+        driverContext.finish();
+      }
+      return isFinished;
     } catch (Throwable t) {
       logger.error(
           "Failed to query whether the schema driver {} is finished", driverContext.getId(), t);
+      driverContext.failed(t);
       return true;
     }
   }
@@ -90,6 +95,7 @@ public class SchemaDriver implements ExecFragmentInstance {
       } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
     } catch (Throwable t) {
       logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
+      driverContext.failed(t);
       close();
     }
     return NOT_BLOCKED;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index 0821424..af18fb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
 
 public interface Operator extends AutoCloseable {
+
   ListenableFuture<Void> NOT_BLOCKED = immediateVoidFuture();
 
   OperatorContext getOperatorContext();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
index db9cee9..7f0ac66 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
@@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit;
 /** the worker thread of {@link FragmentInstanceTask} */
 public class FragmentInstanceTaskExecutor extends AbstractExecutor {
 
-  private static final Duration EXECUTION_TIME_SLICE = new Duration(100, TimeUnit.MILLISECONDS);
+  public static final Duration EXECUTION_TIME_SLICE = new Duration(100, TimeUnit.MILLISECONDS);
 
   // As the callback is lightweight enough, there's no need to use another one thread to execute.
   private static final Executor listeningExecutor = MoreExecutors.directExecutor();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
similarity index 57%
copy from server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
copy to server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index e901ab6..c5f621b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -16,17 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.operator;
+package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.buffer.StubSinkHandle;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
@@ -39,9 +42,12 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -49,14 +55,17 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor.EXECUTION_TIME_SLICE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class LimitOperatorTest {
+public class DataDriverTest {
 
-  private static final String TIME_JOIN_OPERATOR_TEST_SG = "root.LimitOperatorTest";
+  private static final String DATA_DRIVER_TEST_SG = "root.DataDriverTest";
   private final List<String> deviceIds = new ArrayList<>();
   private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
 
@@ -66,7 +75,7 @@ public class LimitOperatorTest {
   @Before
   public void setUp() throws MetadataException, IOException, WriteProcessException {
     SeriesReaderTestUtil.setUp(
-        measurementSchemas, deviceIds, seqResources, unSeqResources, TIME_JOIN_OPERATOR_TEST_SG);
+        measurementSchemas, deviceIds, seqResources, unSeqResources, DATA_DRIVER_TEST_SG);
   }
 
   @After
@@ -78,14 +87,16 @@ public class LimitOperatorTest {
   public void batchTest() {
     try {
       MeasurementPath measurementPath1 =
-          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+          new MeasurementPath(DATA_DRIVER_TEST_SG + ".device0.sensor0", TSDataType.INT32);
       Set<String> allSensors = new HashSet<>();
       allSensors.add("sensor0");
       allSensors.add("sensor1");
       QueryId queryId = new QueryId("stub_query");
+      AtomicReference<FragmentInstanceState> state =
+          new AtomicReference<>(FragmentInstanceState.RUNNING);
       FragmentInstanceContext fragmentInstanceContext =
           new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"));
+              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
       fragmentInstanceContext.addOperatorContext(
           1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
       fragmentInstanceContext.addOperatorContext(
@@ -103,10 +114,9 @@ public class LimitOperatorTest {
               null,
               null,
               true);
-      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
 
       MeasurementPath measurementPath2 =
-          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32);
+          new MeasurementPath(DATA_DRIVER_TEST_SG + ".device0.sensor1", TSDataType.INT32);
       SeriesScanOperator seriesScanOperator2 =
           new SeriesScanOperator(
               measurementPath2,
@@ -116,7 +126,6 @@ public class LimitOperatorTest {
               null,
               null,
               true);
-      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
 
       TimeJoinOperator timeJoinOperator =
           new TimeJoinOperator(
@@ -128,37 +137,72 @@ public class LimitOperatorTest {
       LimitOperator limitOperator =
           new LimitOperator(
               fragmentInstanceContext.getOperatorContexts().get(3), 250, timeJoinOperator);
-      int count = 0;
-      while (limitOperator.hasNext()) {
-        TsBlock tsBlock = limitOperator.next();
-        assertEquals(2, tsBlock.getValueColumnCount());
-        assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
-        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
-        if (count < 12) {
-          assertEquals(20, tsBlock.getPositionCount());
-        } else {
-          assertEquals(10, tsBlock.getPositionCount());
+
+      VirtualStorageGroupProcessor dataRegion = Mockito.mock(VirtualStorageGroupProcessor.class);
+
+      List<PartialPath> pathList = ImmutableList.of(measurementPath1, measurementPath2);
+      String deviceId = DATA_DRIVER_TEST_SG + ".device0";
+
+      Mockito.when(dataRegion.query(pathList, deviceId, fragmentInstanceContext, null))
+          .thenReturn(new QueryDataSource(seqResources, unSeqResources));
+
+      DataDriverContext driverContext =
+          new DataDriverContext(
+              fragmentInstanceContext,
+              pathList,
+              null,
+              dataRegion,
+              ImmutableList.of(seriesScanOperator1, seriesScanOperator2));
+
+      StubSinkHandle sinkHandle = new StubSinkHandle();
+
+      try (ExecFragmentInstance dataDriver =
+          new DataDriver(limitOperator, sinkHandle, driverContext)) {
+        assertEquals(fragmentInstanceContext.getId(), dataDriver.getInfo());
+
+        assertFalse(dataDriver.isFinished());
+
+        while (!dataDriver.isFinished()) {
+          assertEquals(FragmentInstanceState.RUNNING, state.get());
+          ListenableFuture<Void> blocked = dataDriver.processFor(EXECUTION_TIME_SLICE);
+          assertTrue(blocked.isDone());
         }
-        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
-          long expectedTime = i + 20L * count;
-          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
-          if (expectedTime < 200) {
-            assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i));
-            assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
-          } else if (expectedTime < 260
-              || (expectedTime >= 300 && expectedTime < 380)
-              || expectedTime >= 400) {
-            assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i));
-            assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+
+        assertEquals(FragmentInstanceState.FINISHED, state.get());
+
+        List<TsBlock> result = sinkHandle.getTsBlocks();
+        assertEquals(13, result.size());
+
+        for (int i = 0; i < 13; i++) {
+          TsBlock tsBlock = result.get(i);
+          assertEquals(2, tsBlock.getValueColumnCount());
+          assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
+          assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+
+          if (i < 12) {
+            assertEquals(20, tsBlock.getPositionCount());
           } else {
-            assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i));
-            assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
+            assertEquals(10, tsBlock.getPositionCount());
+          }
+          for (int j = 0; j < tsBlock.getPositionCount(); j++) {
+            long expectedTime = j + 20L * i;
+            assertEquals(expectedTime, tsBlock.getTimeByIndex(j));
+            if (expectedTime < 200) {
+              assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(j));
+              assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(j));
+            } else if (expectedTime < 260
+                || (expectedTime >= 300 && expectedTime < 380)
+                || expectedTime >= 400) {
+              assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(j));
+              assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(j));
+            } else {
+              assertEquals(expectedTime, tsBlock.getColumn(0).getInt(j));
+              assertEquals(expectedTime, tsBlock.getColumn(1).getInt(j));
+            }
           }
         }
-        count++;
       }
-      assertEquals(13, count);
-    } catch (IOException | IllegalPathException e) {
+    } catch (IllegalPathException | QueryProcessException e) {
       e.printStackTrace();
       fail();
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index e901ab6..f1e71d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
@@ -49,6 +50,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -83,9 +85,11 @@ public class LimitOperatorTest {
       allSensors.add("sensor0");
       allSensors.add("sensor1");
       QueryId queryId = new QueryId("stub_query");
+      AtomicReference<FragmentInstanceState> state =
+          new AtomicReference<>(FragmentInstanceState.RUNNING);
       FragmentInstanceContext fragmentInstanceContext =
           new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"));
+              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
       fragmentInstanceContext.addOperatorContext(
           1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
       fragmentInstanceContext.addOperatorContext(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index 1fd496e..db5b568 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
@@ -45,6 +46,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -77,9 +79,11 @@ public class SeriesScanOperatorTest {
       Set<String> allSensors = new HashSet<>();
       allSensors.add("sensor0");
       QueryId queryId = new QueryId("stub_query");
+      AtomicReference<FragmentInstanceState> state =
+          new AtomicReference<>(FragmentInstanceState.RUNNING);
       FragmentInstanceContext fragmentInstanceContext =
           new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"));
+              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
       fragmentInstanceContext.addOperatorContext(
           1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
       SeriesScanOperator seriesScanOperator =
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index a22554f..2e49975 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -48,6 +49,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
 
@@ -79,9 +81,11 @@ public class TimeJoinOperatorTest {
       allSensors.add("sensor0");
       allSensors.add("sensor1");
       QueryId queryId = new QueryId("stub_query");
+      AtomicReference<FragmentInstanceState> state =
+          new AtomicReference<>(FragmentInstanceState.RUNNING);
       FragmentInstanceContext fragmentInstanceContext =
           new FragmentInstanceContext(
-              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"));
+              new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
       fragmentInstanceContext.addOperatorContext(
           1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
       fragmentInstanceContext.addOperatorContext(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManagerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManagerTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
index c79d376..7ecf8d1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
@@ -36,7 +36,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-public class FragmentInstanceManagerTest {
+public class FragmentInstanceSchedulerTest {
 
   private final FragmentInstanceScheduler manager = FragmentInstanceScheduler.getInstance();