You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/01 09:07:59 UTC

[iotdb] branch ty-mpp updated: add SchemaDriver

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty-mpp by this push:
     new 552f893  add SchemaDriver
552f893 is described below

commit 552f89372b378026322a932d58e02b0718bb037f
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Apr 1 17:04:24 2022 +0800

    add SchemaDriver
---
 .../mpp/execution/{Driver.java => DataDriver.java} |  35 +++---
 .../{DriverContext.java => DataDriverContext.java} |  16 +--
 .../iotdb/db/mpp/execution/DriverContext.java      |  38 +-----
 .../mpp/execution/FragmentInstanceExecution.java   |   4 +-
 .../db/mpp/execution/FragmentInstanceManager.java  |  22 +++-
 .../iotdb/db/mpp/execution/SchemaDriver.java       | 136 +++++++++++++++++++++
 .../db/mpp/execution/SchemaDriverContext.java      |  36 ++++++
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |  34 ++++--
 8 files changed, 245 insertions(+), 76 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 91623a4..b1e06ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -51,13 +51,13 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
 
 @NotThreadSafe
-public class Driver implements ExecFragmentInstance {
+public class DataDriver implements ExecFragmentInstance {
 
-  private static final Logger logger = LoggerFactory.getLogger(Driver.class);
+  private static final Logger logger = LoggerFactory.getLogger(DataDriver.class);
 
   private final Operator root;
   private final ISinkHandle sinkHandle;
-  private final DriverContext driverContext;
+  private final DataDriverContext dataDriverContext;
 
   private boolean init;
   private boolean closed;
@@ -69,10 +69,10 @@ public class Driver implements ExecFragmentInstance {
 
   private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
 
-  public Driver(Operator root, ISinkHandle sinkHandle, DriverContext driverContext) {
+  public DataDriver(Operator root, ISinkHandle sinkHandle, DataDriverContext dataDriverContext) {
     this.root = root;
     this.sinkHandle = sinkHandle;
-    this.driverContext = driverContext;
+    this.dataDriverContext = dataDriverContext;
   }
 
   @Override
@@ -83,7 +83,8 @@ public class Driver implements ExecFragmentInstance {
     try {
       return root != null && root.isFinished();
     } catch (Throwable t) {
-      logger.error("Failed to query whether the driver {} is finished", driverContext.getId(), t);
+      logger.error(
+          "Failed to query whether the data driver {} is finished", dataDriverContext.getId(), t);
       close();
       return true;
     }
@@ -100,7 +101,9 @@ public class Driver implements ExecFragmentInstance {
         initialize();
       } catch (Throwable t) {
         logger.error(
-            "Failed to do the initialization for fragment instance {} ", driverContext.getId(), t);
+            "Failed to do the initialization for fragment instance {} ",
+            dataDriverContext.getId(),
+            t);
         close();
         return NOT_BLOCKED;
       }
@@ -123,7 +126,7 @@ public class Driver implements ExecFragmentInstance {
         }
       } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
     } catch (Throwable t) {
-      logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
+      logger.error("Failed to execute fragment instance {}", dataDriverContext.getId(), t);
       close();
     }
     return NOT_BLOCKED;
@@ -131,7 +134,7 @@ public class Driver implements ExecFragmentInstance {
 
   @Override
   public FragmentInstanceId getInfo() {
-    return driverContext.getId();
+    return dataDriverContext.getId();
   }
 
   @Override
@@ -152,7 +155,7 @@ public class Driver implements ExecFragmentInstance {
         sinkHandle.close();
       }
     } catch (Throwable t) {
-      logger.error("Failed to closed driver {}", driverContext.getId(), t);
+      logger.error("Failed to closed driver {}", dataDriverContext.getId(), t);
     } finally {
       removeUsedFilesForQuery();
     }
@@ -163,7 +166,7 @@ public class Driver implements ExecFragmentInstance {
    * we should change all the blocked lock operation into tryLock
    */
   private void initialize() throws QueryProcessException {
-    List<SourceOperator> sourceOperators = driverContext.getSourceOperators();
+    List<SourceOperator> sourceOperators = dataDriverContext.getSourceOperators();
     if (sourceOperators != null && !sourceOperators.isEmpty()) {
       QueryDataSource dataSource = initQueryDataSourceCache();
       sourceOperators.forEach(
@@ -186,11 +189,11 @@ public class Driver implements ExecFragmentInstance {
    * QueryDataSource needed for this query
    */
   public QueryDataSource initQueryDataSourceCache() throws QueryProcessException {
-    VirtualStorageGroupProcessor dataRegion = driverContext.getDataRegion();
+    VirtualStorageGroupProcessor dataRegion = dataDriverContext.getDataRegion();
     dataRegion.readLock();
     try {
       List<PartialPath> pathList =
-          driverContext.getPaths().stream()
+          dataDriverContext.getPaths().stream()
               .map(IDTable::translateQueryPath)
               .collect(Collectors.toList());
       // when all the selected series are under the same device, the QueryDataSource will be
@@ -202,8 +205,8 @@ public class Driver implements ExecFragmentInstance {
           dataRegion.query(
               pathList,
               selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
-              driverContext.getFragmentInstanceContext(),
-              driverContext.getTimeFilter());
+              dataDriverContext.getFragmentInstanceContext(),
+              dataDriverContext.getTimeFilter());
 
       // used files should be added before mergeLock is unlocked, or they may be deleted by
       // running merge
@@ -211,7 +214,7 @@ public class Driver implements ExecFragmentInstance {
 
       return dataSource;
     } finally {
-      driverContext.getDataRegion().readUnlock();
+      dataDriverContext.getDataRegion().readUnlock();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriverContext.java
similarity index 82%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriverContext.java
index ea11b12..52113e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriverContext.java
@@ -20,40 +20,30 @@ package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.operator.source.SourceOperator;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.util.List;
 
-public class DriverContext {
-  private final FragmentInstanceContext fragmentInstanceContext;
+public class DataDriverContext extends DriverContext {
   private final List<PartialPath> paths;
   private final Filter timeFilter;
   private final VirtualStorageGroupProcessor dataRegion;
   private final List<SourceOperator> sourceOperators;
 
-  public DriverContext(
+  public DataDriverContext(
       FragmentInstanceContext fragmentInstanceContext,
       List<PartialPath> paths,
       Filter timeFilter,
       VirtualStorageGroupProcessor dataRegion,
       List<SourceOperator> sourceOperators) {
-    this.fragmentInstanceContext = fragmentInstanceContext;
+    super(fragmentInstanceContext);
     this.paths = paths;
     this.timeFilter = timeFilter;
     this.dataRegion = dataRegion;
     this.sourceOperators = sourceOperators;
   }
 
-  public FragmentInstanceId getId() {
-    return fragmentInstanceContext.getId();
-  }
-
-  public FragmentInstanceContext getFragmentInstanceContext() {
-    return fragmentInstanceContext;
-  }
-
   public List<PartialPath> getPaths() {
     return paths;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index ea11b12..fddc3a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -18,32 +18,14 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
-import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.operator.source.SourceOperator;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
-import java.util.List;
 
 public class DriverContext {
+
   private final FragmentInstanceContext fragmentInstanceContext;
-  private final List<PartialPath> paths;
-  private final Filter timeFilter;
-  private final VirtualStorageGroupProcessor dataRegion;
-  private final List<SourceOperator> sourceOperators;
 
-  public DriverContext(
-      FragmentInstanceContext fragmentInstanceContext,
-      List<PartialPath> paths,
-      Filter timeFilter,
-      VirtualStorageGroupProcessor dataRegion,
-      List<SourceOperator> sourceOperators) {
+  public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
     this.fragmentInstanceContext = fragmentInstanceContext;
-    this.paths = paths;
-    this.timeFilter = timeFilter;
-    this.dataRegion = dataRegion;
-    this.sourceOperators = sourceOperators;
   }
 
   public FragmentInstanceId getId() {
@@ -53,20 +35,4 @@ public class DriverContext {
   public FragmentInstanceContext getFragmentInstanceContext() {
     return fragmentInstanceContext;
   }
-
-  public List<PartialPath> getPaths() {
-    return paths;
-  }
-
-  public Filter getTimeFilter() {
-    return timeFilter;
-  }
-
-  public VirtualStorageGroupProcessor getDataRegion() {
-    return dataRegion;
-  }
-
-  public List<SourceOperator> getSourceOperators() {
-    return sourceOperators;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
index 2a46ae8..1791e3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
@@ -32,7 +32,7 @@ public class FragmentInstanceExecution {
   private final FragmentInstanceId instanceId;
   private final FragmentInstanceContext context;
 
-  private final Driver driver;
+  private final ExecFragmentInstance driver;
 
   private FragmentInstanceState state;
 
@@ -42,7 +42,7 @@ public class FragmentInstanceExecution {
       IFragmentInstanceScheduler scheduler,
       FragmentInstanceId instanceId,
       FragmentInstanceContext context,
-      Driver driver) {
+      ExecFragmentInstance driver) {
     this.scheduler = scheduler;
     this.instanceId = instanceId;
     this.context = context;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index f8471ab..91a31c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler;
 import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
@@ -57,7 +58,7 @@ public class FragmentInstanceManager {
               FragmentInstanceContext context =
                   instanceContext.computeIfAbsent(instanceId, FragmentInstanceContext::new);
 
-              Driver driver =
+              DataDriver driver =
                   planner.plan(
                       instance.getFragment().getRoot(),
                       context,
@@ -69,6 +70,25 @@ public class FragmentInstanceManager {
     return execution.getInstanceInfo();
   }
 
+  public FragmentInstanceInfo execSchemaQueryFragmentInstance(
+      FragmentInstance instance, SchemaRegion schemaRegion) {
+    FragmentInstanceId instanceId = instance.getId();
+
+    FragmentInstanceExecution execution =
+        instanceExecution.computeIfAbsent(
+            instanceId,
+            id -> {
+              FragmentInstanceContext context =
+                  instanceContext.computeIfAbsent(instanceId, FragmentInstanceContext::new);
+
+              SchemaDriver driver =
+                  planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
+              return new FragmentInstanceExecution(scheduler, instanceId, context, driver);
+            });
+
+    return execution.getInstanceInfo();
+  }
+
   /**
    * Gets the info for the specified fragment instance.
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
new file mode 100644
index 0000000..e9df0f7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.airlift.units.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
+
+@NotThreadSafe
+public class SchemaDriver implements ExecFragmentInstance {
+
+  private static final Logger logger = LoggerFactory.getLogger(DataDriver.class);
+
+  private final Operator root;
+  private final ISinkHandle sinkHandle;
+  private final SchemaDriverContext driverContext;
+
+  private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
+
+  public SchemaDriver(Operator root, ISinkHandle sinkHandle, SchemaDriverContext driverContext) {
+    this.root = root;
+    this.sinkHandle = sinkHandle;
+    this.driverContext = driverContext;
+  }
+
+  @Override
+  public boolean isFinished() {
+    try {
+      return root != null && root.isFinished();
+    } catch (Throwable t) {
+      logger.error(
+          "Failed to query whether the schema driver {} is finished", driverContext.getId(), t);
+      return true;
+    }
+  }
+
+  @Override
+  public ListenableFuture<Void> processFor(Duration duration) {
+    // if the driver is blocked we don't need to continue
+    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
+    if (!blockedFuture.isDone()) {
+      return blockedFuture;
+    }
+
+    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
+
+    long start = System.nanoTime();
+    try {
+      do {
+        ListenableFuture<Void> future = processInternal();
+        if (!future.isDone()) {
+          return updateDriverBlockedFuture(future);
+        }
+      } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
+    } catch (Throwable t) {
+      logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
+      close();
+    }
+    return NOT_BLOCKED;
+  }
+
+  private ListenableFuture<Void> processInternal() throws IOException {
+    ListenableFuture<Void> blocked = root.isBlocked();
+    if (!blocked.isDone()) {
+      return blocked;
+    }
+    blocked = sinkHandle.isFull();
+    if (!blocked.isDone()) {
+      return blocked;
+    }
+    if (root.hasNext()) {
+      TsBlock tsBlock = root.next();
+      if (tsBlock != null && !tsBlock.isEmpty()) {
+        sinkHandle.send(tsBlock);
+      }
+    }
+    return NOT_BLOCKED;
+  }
+
+  private ListenableFuture<Void> updateDriverBlockedFuture(
+      ListenableFuture<Void> sourceBlockedFuture) {
+    // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
+    // or any of the operators gets a memory revocation request
+    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
+    driverBlockedFuture.set(newDriverBlockedFuture);
+    sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
+
+    // TODO Although we don't have memory management for operator now, we should consider it for
+    // future
+    // it's possible that memory revoking is requested for some operator
+    // before we update driverBlockedFuture above and we don't want to miss that
+    // notification, so we check to see whether that's the case before returning.
+
+    return newDriverBlockedFuture;
+  }
+
+  @Override
+  public FragmentInstanceId getInfo() {
+    return driverContext.getId();
+  }
+
+  @Override
+  public void close() {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java
new file mode 100644
index 0000000..162bd0a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
+
+public class SchemaDriverContext extends DriverContext {
+
+  private final SchemaRegion schemaRegion;
+
+  public SchemaDriverContext(
+      FragmentInstanceContext fragmentInstanceContext, SchemaRegion schemaRegion) {
+    super(fragmentInstanceContext);
+    this.schemaRegion = schemaRegion;
+  }
+
+  public SchemaRegion getSchemaRegion() {
+    return schemaRegion;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index da4f158..ece3255 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -20,11 +20,14 @@ package org.apache.iotdb.db.mpp.sql.planner;
 
 import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
 import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
 import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
-import org.apache.iotdb.db.mpp.execution.Driver;
-import org.apache.iotdb.db.mpp.execution.DriverContext;
+import org.apache.iotdb.db.mpp.execution.DataDriver;
+import org.apache.iotdb.db.mpp.execution.DataDriverContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.SchemaDriver;
+import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
@@ -68,7 +71,7 @@ public class LocalExecutionPlanner {
     return InstanceHolder.INSTANCE;
   }
 
-  public Driver plan(
+  public DataDriver plan(
       PlanNode plan,
       FragmentInstanceContext instanceContext,
       Filter timeFilter,
@@ -77,19 +80,33 @@ public class LocalExecutionPlanner {
 
     Operator root = plan.accept(new Visitor(), context);
 
-    DriverContext driverContext =
-        new DriverContext(
+    DataDriverContext dataDriverContext =
+        new DataDriverContext(
             instanceContext,
             context.getPaths(),
             timeFilter,
             dataRegion,
             context.getSourceOperators());
-    instanceContext.setDriverContext(driverContext);
-    return new Driver(root, context.getSinkHandle(), driverContext);
+    instanceContext.setDriverContext(dataDriverContext);
+    return new DataDriver(root, context.getSinkHandle(), dataDriverContext);
+  }
+
+  public SchemaDriver plan(
+      PlanNode plan, FragmentInstanceContext instanceContext, SchemaRegion schemaRegion) {
+
+    SchemaDriverContext schemaDriverContext =
+        new SchemaDriverContext(instanceContext, schemaRegion);
+    instanceContext.setDriverContext(schemaDriverContext);
+
+    LocalExecutionPlanContext context = new LocalExecutionPlanContext(instanceContext);
+
+    Operator root = plan.accept(new Visitor(), context);
+
+    return new SchemaDriver(root, context.getSinkHandle(), schemaDriverContext);
   }
 
   /** This Visitor is responsible for transferring PlanNode Tree to Operator Tree */
-  private class Visitor extends PlanVisitor<Operator, LocalExecutionPlanContext> {
+  private static class Visitor extends PlanVisitor<Operator, LocalExecutionPlanContext> {
 
     @Override
     public Operator visitPlan(PlanNode node, LocalExecutionPlanContext context) {
@@ -195,6 +212,7 @@ public class LocalExecutionPlanner {
 
     @Override
     public Operator visitExchange(ExchangeNode node, LocalExecutionPlanContext context) {
+      // TODO(jackie tien) create SourceHandle here
       return super.visitExchange(node, context);
     }