You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/14 05:53:41 UTC

[iotdb] 09/11: align class name

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e6d78c0a621eebac0f21fd976e0656d58c5242fc
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Mar 10 19:40:12 2022 +0800

    align class name
---
 .../distribution/operator/ExecutableOperator.java  | 34 -------------
 .../distribution/operator/SeriesScanNode.java      | 57 ----------------------
 .../operator/internal/DeviceMergeOperator.java     | 43 ----------------
 .../operator/internal/FillOperator.java            | 25 ----------
 .../operator/internal/FilterInternalOperator.java  | 26 ----------
 .../operator/internal/GroupByLevelOperator.java    | 34 -------------
 .../operator/internal/InternalOperator.java        |  7 ---
 .../operator/internal/LimitOperator.java           | 24 ---------
 .../operator/internal/OffsetOperator.java          | 25 ----------
 .../operator/internal/SeriesAggregateOperator.java | 33 -------------
 .../operator/internal/SortOperator.java            | 23 ---------
 .../operator/internal/TimeJoinOperator.java        | 33 -------------
 .../operator/internal/WithoutOperator.java         | 25 ----------
 .../operator/source/CsvSourceOperator.java         | 41 ----------------
 .../distribution/{operator => plan}/PlanNode.java  |  8 +--
 .../process}/DeviceMergeNode.java                  | 18 ++-----
 .../{operator => plan/process}/FillNode.java       | 15 ++----
 .../process/FilterNode.java}                       | 15 ++----
 .../process}/GroupByLevelNode.java                 | 15 ++----
 .../{operator => plan/process}/LimitNode.java      | 15 ++----
 .../{operator => plan/process}/OffsetNode.java     | 15 ++----
 .../distribution/plan/process/ProcessNode.java     |  6 +++
 .../process}/SeriesAggregateNode.java              | 21 ++++----
 .../{operator => plan/process}/SortNode.java       | 16 ++----
 .../{operator => plan/process}/TimeJoinNode.java   | 15 ++----
 .../{operator => plan/process}/WithoutNode.java    | 15 ++----
 .../sink/CsvSinkNode.java}                         | 18 ++-----
 .../sink/SinkNode.java}                            |  9 ++--
 .../sink/ThriftSinkNode.java}                      | 17 +------
 .../source/CsvSourceNode.java}                     | 14 +-----
 .../source/SeriesScanNode.java}                    | 25 ++--------
 .../source/SourceNode.java}                        |  7 ++-
 32 files changed, 64 insertions(+), 630 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java
deleted file mode 100644
index 00a7a6a..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
-
-public abstract class ExecutableOperator<T> extends TreeNode<ExecutableOperator<?>> {
-
-  // Resource control, runtime control...
-
-  // Judge whether current operator has more result
-  public abstract boolean hasNext();
-
-  // Get next result batch of this operator
-  // Return null if there is no more result to return
-  public abstract T getNextBatch();
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanNode.java
deleted file mode 100644
index 1c8ac3d..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanNode.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
-/**
- * SeriesScanOperator is responsible for read data and pre-aggregated statistic for a specific series.
- * When reading data, the SeriesScanOperator can read the raw data batch by batch. And also, it can leverage
- * the filter and other info to decrease the result set.
- * Besides, the SeriesScanOperator can read the pre-aggregated statistic in TsFile. And return the statistic with
- * a fix time range one by one. If the time range is narrower than the smallest pre-aggregated statistic or has overlap
- * with pre-aggregated statistic, the SeriesScanOperator will read the raw data and calculate the aggregation result for
- * specific time range.
- *
- * Children type: []
- */
-public class SeriesScanNode extends PlanNode<SeriesBatchData> {
-
-    // The path of the target series which will be scanned.
-    private Path seriesPath;
-
-    // The order to traverse the data.
-    // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
-    // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
-    private TraversalOrder scanOrder = TraversalOrder.TIMESTAMP_ASC;
-
-    // Filter data in current series.
-    private Filter filter;
-
-    // Limit for result set. The default value is -1, which means no limit
-    private int limit = -1;
-
-    // offset for result set. The default value is 0
-    private int offset;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public SeriesBatchData getNextBatch() {
-        return null;
-    }
-
-    // This method will only be invoked by SeriesAggregateOperator
-    // It will return the statistics of the series in given time range
-    // When calculate the statistics, the operator should use the most optimized way to do that. In other words, using
-    // raw data is the final way to do that.
-    public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
deleted file mode 100644
index d4bbced..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * DeviceMergeOperator is responsible for constructing a device-based view of a set of series. And
- * output the result with specific order. The order could be 'order by device' or 'order by
- * timestamp'
- *
- * <p>The types of involved devices should be same. If the device contains n series, the
- * device-based view will contain n+2 columns, which are timestamp column, device name column and n
- * value columns of involved series.
- *
- * <p>Children type: [TimeJoinOperator]
- */
-public class DeviceMergeOperator extends InternalOperator<Tablet> {
-  // The result output order that this operator
-  private TraversalOrder mergeOrder;
-
-  // Owned devices
-  private List<String> ownedDeviceNameList;
-
-  // The map from deviceName to corresponding query result operator responsible for that device.
-  private Map<String, TimeJoinOperator> upstreamMap;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  // If the Tablet from TimeJoinOperator has n columns, the output of DeviceMergeOperator will
-  // contain n+1 columns where
-  // the additional column is `deviceName`
-  // And, the `alignedByDevice` in the TabletMetadata will be `true`
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
deleted file mode 100644
index 595a27e..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * FillOperator is used to fill the empty field in one row.
- *
- * <p>Children type: [All the operators whose result set is Tablet]
- */
-public class FillOperator extends InternalOperator<Tablet> {
-
-  // The policy to discard the result from upstream operator
-  private FillPolicy fillPolicy;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
deleted file mode 100644
index f41d768..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
-
-/**
- * (We use FilterExecOperator to distinguish itself from the FilterOperator used in single-node
- * IoTDB) The FilterExecOperator is responsible to filter the RowRecord from Tablet.
- *
- * <p>Children type: [All the operators whose result set is Tablet]
- */
-public class FilterInternalOperator extends InternalOperator<Tablet> {
-
-  // The filter
-  private FilterOperator rowFilter;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
deleted file mode 100644
index 9cab1c0..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * This operator is responsible for the final aggregation merge operation. It will arrange the data
- * by time range firstly. And inside each time range, the data from same measurement and different
- * devices will be rolled up by corresponding level into different buckets. If the bucketInfo is
- * empty, the data from `same measurement and different devices` won't be rolled up. If the
- * groupByTimeParameter is null, the data won't be split by time range.
- *
- * <p>Children type: [SeriesAggregateOperator]
- */
-public class GroupByLevelOperator extends InternalOperator<Tablet> {
-
-  // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
-  private LevelBucketInfo bucketInfo;
-
-  // The parameter of `group by time`
-  // The GroupByLevelOperator also need GroupByTimeParameter
-  private GroupByTimeParameter groupByTimeParameter;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
deleted file mode 100644
index a5b8b5b..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator;
-
-// 从 buffer 拉数据
-// 推送到下游的逻辑
-public abstract class InternalOperator<T> extends ExecutableOperator<T> {}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
deleted file mode 100644
index 1330eee..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * LimitOperator is used to select top n result. It uses the default order of upstream operators
- *
- * <p>Children type: [All the operators whose result set is Tablet]
- */
-public class LimitOperator extends InternalOperator<Tablet> {
-
-  // The limit count
-  private int limit;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
deleted file mode 100644
index aa82d8c..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of
- * upstream operators
- *
- * <p>Children type: [All the operators whose result set is Tablet]
- */
-public class OffsetOperator extends InternalOperator<Tablet> {
-
-  // The limit count
-  private int offset;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
deleted file mode 100644
index 889c50b..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
-
-/**
- * SeriesAggregateOperator is responsible to do the aggregation calculation for one series. This
- * operator will split data in one series into many groups by time range and do the aggregation
- * calculation for each group. If there is no split parameter, it will return one result which is
- * the aggregation result of all data in current series.
- *
- * <p>Children type: [SeriesScanOperator]
- */
-public class SeriesAggregateOperator extends InternalOperator<SeriesBatchAggInfo> {
-
-  // The parameter of `group by time`
-  // Its value will be null if there is no `group by time` clause,
-  private GroupByTimeParameter groupByTimeParameter;
-
-  // TODO: need consider how to represent the aggregation function and corresponding implementation
-  // We use a String to indicate the parameter temporarily
-  private String aggregationFunc;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public SeriesBatchAggInfo getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
deleted file mode 100644
index d2aee44..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-
-/**
- * In general, the parameter in sortOperator should be pushed down to the upstream operators. In our
- * optimized logical query plan, the sortOperator should not appear.
- */
-public class SortOperator extends InternalOperator<Tablet> {
-
-  private TraversalOrder sortOrder;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
deleted file mode 100644
index bd7cf37..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
-
-/**
- * TimeJoinOperator is responsible for join two or more series. The join algorithm is like outer
- * join by timestamp column. The output result of TimeJoinOperator is sorted by timestamp
- *
- * <p>Children type: [SeriesScanOperator]
- */
-public class TimeJoinOperator extends InternalOperator<Tablet> {
-
-  // This parameter indicates the order when executing multiway merge sort.
-  private TraversalOrder mergeOrder;
-
-  // The policy to decide whether a row should be discarded
-  // The without policy is able to be push down to the TimeJoinOperator because we can know whether
-  // a row contains
-  // null or not in this operator the situation won't be changed by the downstream operators.
-  private WithoutPolicy withoutPolicy;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
deleted file mode 100644
index f678a2c..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
-
-/**
- * WithoutOperator is used to discard specific result from upstream operators.
- *
- * <p>Children type: [All the operators whose result set is Tablet]
- */
-public class WithoutOperator extends InternalOperator<Tablet> {
-
-  // The policy to discard the result from upstream operator
-  private WithoutPolicy discardPolicy;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java
deleted file mode 100644
index ff82128..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.query.distribution.operator.source;
-
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
-
-public class CsvSourceOperator extends SourceOperator<SeriesBatchData> {
-
-  @Override
-  public void close() throws Exception {}
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public SeriesBatchData getNextBatch() {
-    return null;
-  }
-
-  @Override
-  public void open() throws Exception {}
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/PlanNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
similarity index 53%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/PlanNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
index ae30645..527b697 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/PlanNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan;
 
 import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
 
@@ -9,10 +9,4 @@ import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
  */
 public abstract class PlanNode<T> extends TreeNode<PlanNode<?>> {
 
-    // Judge whether current operator has more result
-    public abstract boolean hasNext();
-
-    // Get next result batch of this operator
-    // Return null if there is no more result to return
-    public abstract T getNextBatch();
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java
similarity index 65%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java
index ff831d9..fbadebe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java
@@ -1,7 +1,8 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 import java.util.List;
 import java.util.Map;
@@ -15,7 +16,7 @@ import java.util.Map;
  *
  * Children type: [TimeJoinOperator]
  */
-public class DeviceMergeNode extends PlanNode<TsBlock> {
+public class DeviceMergeNode extends ProcessNode<TsBlock> {
     // The result output order that this operator
     private TraversalOrder mergeOrder;
 
@@ -24,17 +25,4 @@ public class DeviceMergeNode extends PlanNode<TsBlock> {
 
     // The map from deviceName to corresponding query result operator responsible for that device.
     private Map<String, TimeJoinNode> upstreamMap;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    // If the Tablet from TimeJoinOperator has n columns, the output of DeviceMergeOperator will contain n+1 columns where
-    // the additional column is `deviceName`
-    // And, the `alignedByDevice` in the TabletMetadata will be `true`
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
similarity index 58%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
index a8f0076..8b08640 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
@@ -1,25 +1,16 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * FillOperator is used to fill the empty field in one row.
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class FillNode extends PlanNode<TsBlock> {
+public class FillNode extends ProcessNode<TsBlock> {
 
     // The policy to discard the result from upstream operator
     private FillPolicy fillPolicy;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java
similarity index 62%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java
index 0bf0ad4..a579d4e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java
@@ -1,6 +1,7 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
 
 /**
@@ -9,18 +10,8 @@ import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class FilterExecNode extends PlanNode<TsBlock> {
+public class FilterNode extends ProcessNode<TsBlock> {
 
     // The filter
     private FilterOperator rowFilter;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java
similarity index 79%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java
index 38448d5..2007900 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java
@@ -1,8 +1,9 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
 import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * This operator is responsible for the final aggregation merge operation.
@@ -13,7 +14,7 @@ import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
  *
  * Children type: [SeriesAggregateOperator]
  */
-public class GroupByLevelNode extends PlanNode<TsBlock> {
+public class GroupByLevelNode extends ProcessNode<TsBlock> {
 
     // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
     private LevelBucketInfo bucketInfo;
@@ -21,14 +22,4 @@ public class GroupByLevelNode extends PlanNode<TsBlock> {
     // The parameter of `group by time`
     // The GroupByLevelOperator also need GroupByTimeParameter
     private GroupByTimeParameter groupByTimeParameter;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
similarity index 52%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
index 875f8c4..e675086 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
@@ -1,24 +1,15 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * LimitOperator is used to select top n result. It uses the default order of upstream operators
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class LimitNode extends PlanNode<TsBlock> {
+public class LimitNode extends ProcessNode<TsBlock> {
 
     // The limit count
     private int limit;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
similarity index 54%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
index 6de44c5..78ac9eb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
@@ -1,24 +1,15 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of upstream operators
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class OffsetNode extends PlanNode<TsBlock> {
+public class OffsetNode extends ProcessNode<TsBlock> {
 
     // The limit count
     private int offset;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java
new file mode 100644
index 0000000..e7bffae
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java
@@ -0,0 +1,6 @@
+package org.apache.iotdb.cluster.query.distribution.plan.process;
+
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+
+public class ProcessNode<T> extends PlanNode<T> {
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java
similarity index 55%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java
index 62fb223..46ef991 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java
@@ -1,7 +1,11 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 
 /**
  * SeriesAggregateOperator is responsible to do the aggregation calculation for one series.
@@ -11,7 +15,7 @@ import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
  *
  * Children type: [SeriesScanOperator]
  */
-public class SeriesAggregateNode extends PlanNode<SeriesBatchAggInfo> {
+public class SeriesAggregateNode extends ProcessNode<TsBlock> {
 
     // The parameter of `group by time`
     // Its value will be null if there is no `group by time` clause,
@@ -21,13 +25,12 @@ public class SeriesAggregateNode extends PlanNode<SeriesBatchAggInfo> {
     // We use a String to indicate the parameter temporarily
     private String aggregationFunc;
 
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public SeriesBatchAggInfo getNextBatch() {
+    // This method will only be invoked by SeriesAggregateOperator.
+    // It returns the statistics of the series in the given time range.
+    // When calculating the statistics, the operator should use the most
+    // optimized way available. In other words, falling back to raw data
+    // is the last resort.
+    public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
similarity index 57%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
index 8bcc520..a718247 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
@@ -1,23 +1,15 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * In general, the parameter in sortOperator should be pushed down to the upstream operators.
  * In our optimized logical query plan, the sortOperator should not appear.
  */
-public class SortNode extends PlanNode<TsBlock> {
+public class SortNode extends ProcessNode<TsBlock> {
 
     private TraversalOrder sortOrder;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
+    
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java
similarity index 76%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java
index b293a13..08a7617 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java
@@ -1,8 +1,9 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * TimeJoinOperator is responsible for join two or more series.
@@ -11,7 +12,7 @@ import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
  *
  * Children type: [SeriesScanOperator]
  */
-public class TimeJoinNode extends PlanNode<TsBlock> {
+public class TimeJoinNode extends ProcessNode<TsBlock> {
 
     // This parameter indicates the order when executing multiway merge sort.
     private TraversalOrder mergeOrder;
@@ -20,14 +21,4 @@ public class TimeJoinNode extends PlanNode<TsBlock> {
     // The without policy is able to be push down to the TimeJoinOperator because we can know whether a row contains
     // null or not in this operator the situation won't be changed by the downstream operators.
     private WithoutPolicy withoutPolicy;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java
similarity index 60%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java
index c58a367..b3f1d9e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java
@@ -1,25 +1,16 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * WithoutOperator is used to discard specific result from upstream operators.
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class WithoutNode extends PlanNode<TsBlock> {
+public class WithoutNode extends ProcessNode<TsBlock> {
 
     // The policy to discard the result from upstream operator
     private WithoutPolicy discardPolicy;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java
similarity index 70%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java
index c9de077..3e9dc4c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java
@@ -17,25 +17,13 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.distribution.operator.sink;
+package org.apache.iotdb.cluster.query.distribution.plan.sink;
 
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
 
-public class ThriftSinkOperator extends SinkOperator<SeriesBatchData> {
-
+public class CsvSinkNode extends SinkNode<SeriesBatchData> {
   @Override
-  public void close() throws Exception {}
+  public void close() throws Exception {
 
-  @Override
-  public boolean hasNext() {
-    return false;
   }
-
-  @Override
-  public SeriesBatchData getNextBatch() {
-    return null;
-  }
-
-  @Override
-  public void open() throws Exception {}
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java
similarity index 70%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java
index 304cafe..a94a0ff 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java
@@ -17,12 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.distribution.operator.source;
+package org.apache.iotdb.cluster.query.distribution.plan.sink;
 
-import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
-// 区分不同的数据源,可拓展。
-public abstract class SourceOperator<T> extends ExecutableOperator<T> implements AutoCloseable {
+// Builds the connection to the client.
+public abstract class SinkNode<T> extends PlanNode<T> implements AutoCloseable {
 
-  public abstract void open() throws Exception;
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java
similarity index 74%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java
index d8e6588..beed5a0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java
@@ -17,25 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.distribution.operator.sink;
+package org.apache.iotdb.cluster.query.distribution.plan.sink;
 
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
 
-public class RestSinkOperator extends SinkOperator<SeriesBatchData> {
+public class ThriftSinkNode extends SinkNode<SeriesBatchData> {
 
   @Override
   public void close() throws Exception {}
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public SeriesBatchData getNextBatch() {
-    return null;
-  }
-
-  @Override
-  public void open() throws Exception {}
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java
similarity index 78%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java
index 17bba65..85f402a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java
@@ -17,25 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.distribution.operator.sink;
+package org.apache.iotdb.cluster.query.distribution.plan.source;
 
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
 
-public class CsvSinkOperator extends SinkOperator<SeriesBatchData> {
+public class CsvSourceNode extends SourceNode<SeriesBatchData> {
 
   @Override
   public void close() throws Exception {}
 
   @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public SeriesBatchData getNextBatch() {
-    return null;
-  }
-
-  @Override
   public void open() throws Exception {}
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
similarity index 69%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
index fd67f9e..7ff8978 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
@@ -1,7 +1,7 @@
-package org.apache.iotdb.cluster.query.distribution.operator.source;
+package org.apache.iotdb.cluster.query.distribution.plan.source;
 
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
@@ -18,7 +18,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
  *
  * <p>Children type: []
  */
-public class SeriesScanOperator extends SourceOperator<SeriesBatchData> {
+public class SeriesScanNode extends SourceNode<TsBlock> {
 
   // The path of the target series which will be scanned.
   private Path seriesPath;
@@ -38,25 +38,6 @@ public class SeriesScanOperator extends SourceOperator<SeriesBatchData> {
   private int offset;
 
   @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public SeriesBatchData getNextBatch() {
-    return null;
-  }
-
-  // This method will only be invoked by SeriesAggregateOperator
-  // It will return the statistics of the series in given time range
-  // When calculate the statistics, the operator should use the most optimized way to do that. In
-  // other words, using
-  // raw data is the final way to do that.
-  public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
-    return null;
-  }
-
-  @Override
   public void close() throws Exception {}
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java
similarity index 75%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java
index 29c906d..ed0e39a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java
@@ -17,12 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.distribution.operator.sink;
+package org.apache.iotdb.cluster.query.distribution.plan.source;
 
-import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
-// 构建与客户端的联系。
-public abstract class SinkOperator<T> extends ExecutableOperator<T> implements AutoCloseable {
+public abstract class SourceNode<T> extends PlanNode<T> implements AutoCloseable{
 
   public abstract void open() throws Exception;
 }