You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/01 09:07:59 UTC
[iotdb] branch ty-mpp updated: add SchemaDriver
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty-mpp by this push:
new 552f893 add SchemaDriver
552f893 is described below
commit 552f89372b378026322a932d58e02b0718bb037f
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Apr 1 17:04:24 2022 +0800
add SchemaDriver
---
.../mpp/execution/{Driver.java => DataDriver.java} | 35 +++---
.../{DriverContext.java => DataDriverContext.java} | 16 +--
.../iotdb/db/mpp/execution/DriverContext.java | 38 +-----
.../mpp/execution/FragmentInstanceExecution.java | 4 +-
.../db/mpp/execution/FragmentInstanceManager.java | 22 +++-
.../iotdb/db/mpp/execution/SchemaDriver.java | 136 +++++++++++++++++++++
.../db/mpp/execution/SchemaDriverContext.java | 36 ++++++
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 34 ++++--
8 files changed, 245 insertions(+), 76 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 91623a4..b1e06ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -51,13 +51,13 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
@NotThreadSafe
-public class Driver implements ExecFragmentInstance {
+public class DataDriver implements ExecFragmentInstance {
- private static final Logger logger = LoggerFactory.getLogger(Driver.class);
+ private static final Logger logger = LoggerFactory.getLogger(DataDriver.class);
private final Operator root;
private final ISinkHandle sinkHandle;
- private final DriverContext driverContext;
+ private final DataDriverContext dataDriverContext;
private boolean init;
private boolean closed;
@@ -69,10 +69,10 @@ public class Driver implements ExecFragmentInstance {
private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
- public Driver(Operator root, ISinkHandle sinkHandle, DriverContext driverContext) {
+ public DataDriver(Operator root, ISinkHandle sinkHandle, DataDriverContext dataDriverContext) {
this.root = root;
this.sinkHandle = sinkHandle;
- this.driverContext = driverContext;
+ this.dataDriverContext = dataDriverContext;
}
@Override
@@ -83,7 +83,8 @@ public class Driver implements ExecFragmentInstance {
try {
return root != null && root.isFinished();
} catch (Throwable t) {
- logger.error("Failed to query whether the driver {} is finished", driverContext.getId(), t);
+ logger.error(
+ "Failed to query whether the data driver {} is finished", dataDriverContext.getId(), t);
close();
return true;
}
@@ -100,7 +101,9 @@ public class Driver implements ExecFragmentInstance {
initialize();
} catch (Throwable t) {
logger.error(
- "Failed to do the initialization for fragment instance {} ", driverContext.getId(), t);
+ "Failed to do the initialization for fragment instance {} ",
+ dataDriverContext.getId(),
+ t);
close();
return NOT_BLOCKED;
}
@@ -123,7 +126,7 @@ public class Driver implements ExecFragmentInstance {
}
} while (System.nanoTime() - start < maxRuntime && !root.isFinished());
} catch (Throwable t) {
- logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
+ logger.error("Failed to execute fragment instance {}", dataDriverContext.getId(), t);
close();
}
return NOT_BLOCKED;
@@ -131,7 +134,7 @@ public class Driver implements ExecFragmentInstance {
@Override
public FragmentInstanceId getInfo() {
- return driverContext.getId();
+ return dataDriverContext.getId();
}
@Override
@@ -152,7 +155,7 @@ public class Driver implements ExecFragmentInstance {
sinkHandle.close();
}
} catch (Throwable t) {
- logger.error("Failed to closed driver {}", driverContext.getId(), t);
+ logger.error("Failed to closed driver {}", dataDriverContext.getId(), t);
} finally {
removeUsedFilesForQuery();
}
@@ -163,7 +166,7 @@ public class Driver implements ExecFragmentInstance {
* we should change all the blocked lock operation into tryLock
*/
private void initialize() throws QueryProcessException {
- List<SourceOperator> sourceOperators = driverContext.getSourceOperators();
+ List<SourceOperator> sourceOperators = dataDriverContext.getSourceOperators();
if (sourceOperators != null && !sourceOperators.isEmpty()) {
QueryDataSource dataSource = initQueryDataSourceCache();
sourceOperators.forEach(
@@ -186,11 +189,11 @@ public class Driver implements ExecFragmentInstance {
* QueryDataSource needed for this query
*/
public QueryDataSource initQueryDataSourceCache() throws QueryProcessException {
- VirtualStorageGroupProcessor dataRegion = driverContext.getDataRegion();
+ VirtualStorageGroupProcessor dataRegion = dataDriverContext.getDataRegion();
dataRegion.readLock();
try {
List<PartialPath> pathList =
- driverContext.getPaths().stream()
+ dataDriverContext.getPaths().stream()
.map(IDTable::translateQueryPath)
.collect(Collectors.toList());
// when all the selected series are under the same device, the QueryDataSource will be
@@ -202,8 +205,8 @@ public class Driver implements ExecFragmentInstance {
dataRegion.query(
pathList,
selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
- driverContext.getFragmentInstanceContext(),
- driverContext.getTimeFilter());
+ dataDriverContext.getFragmentInstanceContext(),
+ dataDriverContext.getTimeFilter());
// used files should be added before mergeLock is unlocked, or they may be deleted by
// running merge
@@ -211,7 +214,7 @@ public class Driver implements ExecFragmentInstance {
return dataSource;
} finally {
- driverContext.getDataRegion().readUnlock();
+ dataDriverContext.getDataRegion().readUnlock();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriverContext.java
similarity index 82%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriverContext.java
index ea11b12..52113e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriverContext.java
@@ -20,40 +20,30 @@ package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.operator.source.SourceOperator;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.util.List;
-public class DriverContext {
- private final FragmentInstanceContext fragmentInstanceContext;
+public class DataDriverContext extends DriverContext {
private final List<PartialPath> paths;
private final Filter timeFilter;
private final VirtualStorageGroupProcessor dataRegion;
private final List<SourceOperator> sourceOperators;
- public DriverContext(
+ public DataDriverContext(
FragmentInstanceContext fragmentInstanceContext,
List<PartialPath> paths,
Filter timeFilter,
VirtualStorageGroupProcessor dataRegion,
List<SourceOperator> sourceOperators) {
- this.fragmentInstanceContext = fragmentInstanceContext;
+ super(fragmentInstanceContext);
this.paths = paths;
this.timeFilter = timeFilter;
this.dataRegion = dataRegion;
this.sourceOperators = sourceOperators;
}
- public FragmentInstanceId getId() {
- return fragmentInstanceContext.getId();
- }
-
- public FragmentInstanceContext getFragmentInstanceContext() {
- return fragmentInstanceContext;
- }
-
public List<PartialPath> getPaths() {
return paths;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index ea11b12..fddc3a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -18,32 +18,14 @@
*/
package org.apache.iotdb.db.mpp.execution;
-import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
-import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.operator.source.SourceOperator;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
-import java.util.List;
public class DriverContext {
+
private final FragmentInstanceContext fragmentInstanceContext;
- private final List<PartialPath> paths;
- private final Filter timeFilter;
- private final VirtualStorageGroupProcessor dataRegion;
- private final List<SourceOperator> sourceOperators;
- public DriverContext(
- FragmentInstanceContext fragmentInstanceContext,
- List<PartialPath> paths,
- Filter timeFilter,
- VirtualStorageGroupProcessor dataRegion,
- List<SourceOperator> sourceOperators) {
+ public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
this.fragmentInstanceContext = fragmentInstanceContext;
- this.paths = paths;
- this.timeFilter = timeFilter;
- this.dataRegion = dataRegion;
- this.sourceOperators = sourceOperators;
}
public FragmentInstanceId getId() {
@@ -53,20 +35,4 @@ public class DriverContext {
public FragmentInstanceContext getFragmentInstanceContext() {
return fragmentInstanceContext;
}
-
- public List<PartialPath> getPaths() {
- return paths;
- }
-
- public Filter getTimeFilter() {
- return timeFilter;
- }
-
- public VirtualStorageGroupProcessor getDataRegion() {
- return dataRegion;
- }
-
- public List<SourceOperator> getSourceOperators() {
- return sourceOperators;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
index 2a46ae8..1791e3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
@@ -32,7 +32,7 @@ public class FragmentInstanceExecution {
private final FragmentInstanceId instanceId;
private final FragmentInstanceContext context;
- private final Driver driver;
+ private final ExecFragmentInstance driver;
private FragmentInstanceState state;
@@ -42,7 +42,7 @@ public class FragmentInstanceExecution {
IFragmentInstanceScheduler scheduler,
FragmentInstanceId instanceId,
FragmentInstanceContext context,
- Driver driver) {
+ ExecFragmentInstance driver) {
this.scheduler = scheduler;
this.instanceId = instanceId;
this.context = context;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index f8471ab..91a31c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler;
import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
@@ -57,7 +58,7 @@ public class FragmentInstanceManager {
FragmentInstanceContext context =
instanceContext.computeIfAbsent(instanceId, FragmentInstanceContext::new);
- Driver driver =
+ DataDriver driver =
planner.plan(
instance.getFragment().getRoot(),
context,
@@ -69,6 +70,25 @@ public class FragmentInstanceManager {
return execution.getInstanceInfo();
}
+ public FragmentInstanceInfo execSchemaQueryFragmentInstance(
+ FragmentInstance instance, SchemaRegion schemaRegion) {
+ FragmentInstanceId instanceId = instance.getId();
+
+ FragmentInstanceExecution execution =
+ instanceExecution.computeIfAbsent(
+ instanceId,
+ id -> {
+ FragmentInstanceContext context =
+ instanceContext.computeIfAbsent(instanceId, FragmentInstanceContext::new);
+
+ SchemaDriver driver =
+ planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
+ return new FragmentInstanceExecution(scheduler, instanceId, context, driver);
+ });
+
+ return execution.getInstanceInfo();
+ }
+
/**
* Gets the info for the specified fragment instance.
*
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
new file mode 100644
index 0000000..e9df0f7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.airlift.units.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
+
+@NotThreadSafe
+public class SchemaDriver implements ExecFragmentInstance {
+
+ private static final Logger logger = LoggerFactory.getLogger(DataDriver.class);
+
+ private final Operator root;
+ private final ISinkHandle sinkHandle;
+ private final SchemaDriverContext driverContext;
+
+ private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
+
+ public SchemaDriver(Operator root, ISinkHandle sinkHandle, SchemaDriverContext driverContext) {
+ this.root = root;
+ this.sinkHandle = sinkHandle;
+ this.driverContext = driverContext;
+ }
+
+ @Override
+ public boolean isFinished() {
+ try {
+ return root != null && root.isFinished();
+ } catch (Throwable t) {
+ logger.error(
+ "Failed to query whether the schema driver {} is finished", driverContext.getId(), t);
+ return true;
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> processFor(Duration duration) {
+ // if the driver is blocked we don't need to continue
+ SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
+ if (!blockedFuture.isDone()) {
+ return blockedFuture;
+ }
+
+ long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
+
+ long start = System.nanoTime();
+ try {
+ do {
+ ListenableFuture<Void> future = processInternal();
+ if (!future.isDone()) {
+ return updateDriverBlockedFuture(future);
+ }
+ } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
+ } catch (Throwable t) {
+ logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
+ close();
+ }
+ return NOT_BLOCKED;
+ }
+
+ private ListenableFuture<Void> processInternal() throws IOException {
+ ListenableFuture<Void> blocked = root.isBlocked();
+ if (!blocked.isDone()) {
+ return blocked;
+ }
+ blocked = sinkHandle.isFull();
+ if (!blocked.isDone()) {
+ return blocked;
+ }
+ if (root.hasNext()) {
+ TsBlock tsBlock = root.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ sinkHandle.send(tsBlock);
+ }
+ }
+ return NOT_BLOCKED;
+ }
+
+ private ListenableFuture<Void> updateDriverBlockedFuture(
+ ListenableFuture<Void> sourceBlockedFuture) {
+ // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
+ // or any of the operators gets a memory revocation request
+ SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
+ driverBlockedFuture.set(newDriverBlockedFuture);
+ sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
+
+ // TODO Although we don't have memory management for operator now, we should consider it for
+ // future
+ // it's possible that memory revoking is requested for some operator
+ // before we update driverBlockedFuture above and we don't want to miss that
+ // notification, so we check to see whether that's the case before returning.
+
+ return newDriverBlockedFuture;
+ }
+
+ @Override
+ public FragmentInstanceId getInfo() {
+ return driverContext.getId();
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java
new file mode 100644
index 0000000..162bd0a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
+
+public class SchemaDriverContext extends DriverContext {
+
+ private final SchemaRegion schemaRegion;
+
+ public SchemaDriverContext(
+ FragmentInstanceContext fragmentInstanceContext, SchemaRegion schemaRegion) {
+ super(fragmentInstanceContext);
+ this.schemaRegion = schemaRegion;
+ }
+
+ public SchemaRegion getSchemaRegion() {
+ return schemaRegion;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index da4f158..ece3255 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -20,11 +20,14 @@ package org.apache.iotdb.db.mpp.sql.planner;
import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
-import org.apache.iotdb.db.mpp.execution.Driver;
-import org.apache.iotdb.db.mpp.execution.DriverContext;
+import org.apache.iotdb.db.mpp.execution.DataDriver;
+import org.apache.iotdb.db.mpp.execution.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.SchemaDriver;
+import org.apache.iotdb.db.mpp.execution.SchemaDriverContext;
import org.apache.iotdb.db.mpp.operator.Operator;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
@@ -68,7 +71,7 @@ public class LocalExecutionPlanner {
return InstanceHolder.INSTANCE;
}
- public Driver plan(
+ public DataDriver plan(
PlanNode plan,
FragmentInstanceContext instanceContext,
Filter timeFilter,
@@ -77,19 +80,33 @@ public class LocalExecutionPlanner {
Operator root = plan.accept(new Visitor(), context);
- DriverContext driverContext =
- new DriverContext(
+ DataDriverContext dataDriverContext =
+ new DataDriverContext(
instanceContext,
context.getPaths(),
timeFilter,
dataRegion,
context.getSourceOperators());
- instanceContext.setDriverContext(driverContext);
- return new Driver(root, context.getSinkHandle(), driverContext);
+ instanceContext.setDriverContext(dataDriverContext);
+ return new DataDriver(root, context.getSinkHandle(), dataDriverContext);
+ }
+
+ public SchemaDriver plan(
+ PlanNode plan, FragmentInstanceContext instanceContext, SchemaRegion schemaRegion) {
+
+ SchemaDriverContext schemaDriverContext =
+ new SchemaDriverContext(instanceContext, schemaRegion);
+ instanceContext.setDriverContext(schemaDriverContext);
+
+ LocalExecutionPlanContext context = new LocalExecutionPlanContext(instanceContext);
+
+ Operator root = plan.accept(new Visitor(), context);
+
+ return new SchemaDriver(root, context.getSinkHandle(), schemaDriverContext);
}
/** This Visitor is responsible for transferring PlanNode Tree to Operator Tree */
- private class Visitor extends PlanVisitor<Operator, LocalExecutionPlanContext> {
+ private static class Visitor extends PlanVisitor<Operator, LocalExecutionPlanContext> {
@Override
public Operator visitPlan(PlanNode node, LocalExecutionPlanContext context) {
@@ -195,6 +212,7 @@ public class LocalExecutionPlanner {
@Override
public Operator visitExchange(ExchangeNode node, LocalExecutionPlanContext context) {
+ // TODO(jackie tien) create SourceHandle here
return super.visitExchange(node, context);
}