You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/20 06:33:19 UTC
[iotdb] branch master updated: [IOTDB-2767] Add Operators and PlanVisitor definitions for mpp (#5286)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c83ccfa [IOTDB-2767] Add Operators and PlanVisitor definitions for mpp (#5286)
c83ccfa is described below
commit c83ccfa2b33c9d51572796ce23520e87911c4811
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Sun Mar 20 14:32:35 2022 +0800
[IOTDB-2767] Add Operators and PlanVisitor definitions for mpp (#5286)
---
.../{WithoutPolicy.java => FilterNullPolicy.java} | 2 +-
.../mpp/common/{TreeNode.java => FragmentId.java} | 40 +++++--
.../{QueryId.java => FragmentInstanceId.java} | 11 +-
.../org/apache/iotdb/db/mpp/common/QueryId.java | 90 +++++++++++++-
.../org/apache/iotdb/db/mpp/common/TsBlock.java | 25 ++++
.../TreeNode.java => execution/FragmentInfo.java} | 33 +++---
.../db/mpp/execution/FragmentInstanceContext.java | 67 +++++++++++
.../db/mpp/execution/FragmentInstanceState.java | 68 +++++++++++
.../iotdb/db/mpp/execution/FragmentState.java | 71 +++++++++++
.../iotdb/db/mpp/execution/QueryExecution.java | 10 +-
.../ClusterScheduler.java} | 34 +++++-
.../scheduler/IScheduler.java} | 30 ++---
.../execution/scheduler/StandaloneScheduler.java | 56 +++++++++
.../iotdb/db/mpp/operator/OperatorContext.java | 6 +-
.../process/AggregateOperator.java} | 35 +++++-
.../process/DeviceMergeOperator.java} | 34 +++++-
.../process/FillOperator.java} | 34 +++++-
.../process/FilterNullOperator.java} | 35 +++++-
.../process/GroupByLevelOperator.java} | 35 +++++-
.../db/mpp/operator/process/LimitOperator.java | 75 ++++++++++++
.../process/OffsetOperator.java} | 35 +++++-
.../process/ProcessOperator.java} | 15 +--
.../process/SortOperator.java} | 35 +++++-
.../process/TimeJoinOperator.java} | 35 +++++-
.../sink/FragmentSinkOperator.java} | 51 +++++---
.../sink/SinkOperator.java} | 29 +++--
.../source/SeriesAggregateScanOperator.java} | 45 ++++---
.../source/SeriesScanOperator.java} | 46 ++++---
.../source/SourceOperator.java} | 14 +--
.../iotdb/db/mpp/plan/FragmentInstanceId.java | 30 -----
.../db/mpp/plan/node/process/FilterNullNode.java | 38 ------
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 132 +++++++++++++++++++++
.../planner/optimization}/PlanOptimizer.java | 7 +-
.../planner}/plan/DistributedQueryPlan.java | 7 +-
.../planner}/plan/DistributionPlanner.java | 2 +-
.../{ => sql/planner}/plan/FragmentInstance.java | 4 +-
.../mpp/{ => sql/planner}/plan/LogicalPlanner.java | 4 +-
.../{ => sql/planner}/plan/LogicalQueryPlan.java | 7 +-
.../mpp/{ => sql/planner}/plan/PlanFragment.java | 7 +-
.../mpp/{ => sql/planner}/plan/PlanFragmentId.java | 2 +-
.../mpp/{ => sql/planner}/plan/node/PlanNode.java | 31 +++--
.../{ => sql/planner}/plan/node/PlanNodeId.java | 2 +-
.../planner}/plan/node/PlanNodeIdAllocator.java | 2 +-
.../db/mpp/sql/planner/plan/node/PlanVisitor.java | 76 ++++++++++++
.../planner/plan/node/process/AggregateNode.java} | 49 +++++---
.../plan/node/process/DeviceMergeNode.java | 38 ++++--
.../planner}/plan/node/process/FillNode.java | 27 ++++-
.../sql/planner/plan/node/process/FilterNode.java | 66 +++++++++++
.../planner/plan/node/process/FilterNullNode.java | 69 +++++++++++
.../plan/node/process/GroupByLevelNode.java | 48 +++++++-
.../planner}/plan/node/process/LimitNode.java | 41 +++++--
.../planner}/plan/node/process/OffsetNode.java | 39 ++++--
.../planner}/plan/node/process/ProcessNode.java | 10 +-
.../planner}/plan/node/process/SortNode.java | 35 +++++-
.../planner}/plan/node/process/TimeJoinNode.java | 55 ++++++---
.../planner}/plan/node/sink/CsvSinkNode.java | 17 ++-
.../planner}/plan/node/sink/FragmentSinkNode.java | 17 ++-
.../{ => sql/planner}/plan/node/sink/SinkNode.java | 9 +-
.../planner}/plan/node/sink/ThriftSinkNode.java | 17 ++-
.../planner}/plan/node/source/CsvSourceNode.java | 17 ++-
.../plan/node/source/SeriesAggregateScanNode.java} | 35 +++++-
.../planner}/plan/node/source/SeriesScanNode.java | 27 ++++-
.../planner}/plan/node/source/SourceNode.java | 9 +-
.../tree/Expression.java} | 7 +-
64 files changed, 1682 insertions(+), 397 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
similarity index 96%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
index 7a4107e..7979b27 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.common;
-public enum WithoutPolicy {
+public enum FilterNullPolicy {
CONTAINS_NULL,
ALL_NULL
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TreeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/TreeNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java
index 4a11358..ef0df6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TreeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java
@@ -20,22 +20,38 @@ package org.apache.iotdb.db.mpp.common;
import java.util.List;
-/**
- * @author A simple class to describe the tree style structure of query executable operators
- * @param <T>
- */
-public class TreeNode<T extends TreeNode<T>> {
- protected List<T> children;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class FragmentId {
+
+ private final QueryId queryId;
+ private final int id;
+
+ public static FragmentId valueOf(String stageId) {
+ List<String> ids = QueryId.parseDottedId(stageId, 2, "stageId");
+ return valueOf(ids);
+ }
+
+ public static FragmentId valueOf(List<String> ids) {
+ checkArgument(ids.size() == 2, "Expected two ids but got: %s", ids);
+ return new FragmentId(new QueryId(ids.get(0)), Integer.parseInt(ids.get(1)));
+ }
+
+ public FragmentId(String queryId, int id) {
+ this(new QueryId(queryId), id);
+ }
- public T getChild(int i) {
- return hasChild(i) ? children.get(i) : null;
+ public FragmentId(QueryId queryId, int id) {
+ this.queryId = requireNonNull(queryId, "queryId is null");
+ this.id = id;
}
- public boolean hasChild(int i) {
- return children.size() > i;
+ public QueryId getQueryId() {
+ return queryId;
}
- public void addChild(T n) {
- children.add(n);
+ public int getId() {
+ return id;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
similarity index 86%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index dd8d436..1ab92ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -18,14 +18,11 @@
*/
package org.apache.iotdb.db.mpp.common;
-public class QueryId {
- private String Id;
+public class FragmentInstanceId {
- public String getId() {
- return Id;
- }
+ private final String fullId;
- public void setId(String id) {
- Id = id;
+ public FragmentInstanceId(String fullId) {
+ this.fullId = fullId;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
index dd8d436..88289b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
@@ -18,14 +18,96 @@
*/
package org.apache.iotdb.db.mpp.common;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
public class QueryId {
- private String Id;
+
+ private final String id;
+
+ public static QueryId valueOf(String queryId) {
+ // ID is verified in the constructor
+ return new QueryId(queryId);
+ }
+
+ public QueryId(String id) {
+ this.id = validateId(id);
+ }
public String getId() {
- return Id;
+ return id;
+ }
+
+ @Override
+ public String toString() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ QueryId other = (QueryId) obj;
+ return Objects.equals(this.id, other.id);
+ }
+
+ public static List<String> parseDottedId(String id, int expectedParts, String name) {
+ requireNonNull(id, "id is null");
+ checkArgument(expectedParts > 0, "expectedParts must be at least 1");
+ requireNonNull(name, "name is null");
+
+ List<String> ids = Arrays.asList(id.split("\\."));
+ checkArgument(ids.size() == expectedParts, "Invalid %s %s", name, id);
+
+ for (String part : ids) {
+ checkArgument(!part.isEmpty(), "Invalid id %s", id);
+ checkArgument(isValidId(part), "Invalid id %s", id);
+ }
+ return ids;
+ }
+
+ private static void checkArgument(boolean condition, String message, Object... messageArgs) {
+ if (!condition) {
+ throw new IllegalArgumentException(format(message, messageArgs));
+ }
+ }
+
+ //
+ // Id helper methods
+ //
+
+ // Check if the string matches [_a-z0-9]+ , but without the overhead of regex
+ private static boolean isValidId(String id) {
+ if (id.length() == 0) {
+ return false;
+ }
+
+ for (int i = 0; i < id.length(); i++) {
+ char c = id.charAt(i);
+ if (!(c == '_' || c >= 'a' && c <= 'z' || c >= '0' && c <= '9')) {
+ return false;
+ }
+ }
+ return true;
}
- public void setId(String id) {
- Id = id;
+ public static String validateId(String id) {
+ requireNonNull(id, "id is null");
+ checkArgument(!id.isEmpty(), "id is empty");
+ checkArgument(isValidId(id), "Invalid id %s", id);
+ return id;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
index aa40205..7ea49ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
@@ -20,6 +20,8 @@ package org.apache.iotdb.db.mpp.common;
import org.apache.iotdb.tsfile.read.common.RowRecord;
+import static java.lang.String.format;
+
/**
* Intermediate result for most of ExecOperators. The Tablet contains data from one or more columns
* and constructs them as a row based view The columns can be series, aggregation result for one
@@ -33,6 +35,8 @@ public class TsBlock {
// Describe the column info
private TsBlockMetadata metadata;
+ private int count;
+
public boolean hasNext() {
return false;
}
@@ -45,4 +49,25 @@ public class TsBlock {
public TsBlockMetadata getMetadata() {
return metadata;
}
+
+ public int getCount() {
+ return count;
+ }
+
+ /**
+ * TODO has not been implemented yet
+ *
+ * @param positionOffset start offset
+ * @param length slice length
+ * @return view of current TsBlock start from positionOffset to positionOffset + length
+ */
+ public TsBlock getRegion(int positionOffset, int length) {
+ if (positionOffset < 0 || length < 0 || positionOffset + length > count) {
+ throw new IndexOutOfBoundsException(
+ format(
+ "Invalid position %s and length %s in page with %s positions",
+ positionOffset, length, count));
+ }
+ return this;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TreeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
similarity index 57%
rename from server/src/main/java/org/apache/iotdb/db/mpp/common/TreeNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
index 4a11358..7bd9300 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TreeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
@@ -16,26 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import java.util.List;
-/**
- * @author A simple class to describe the tree style structure of query executable operators
- * @param <T>
- */
-public class TreeNode<T extends TreeNode<T>> {
- protected List<T> children;
+public class FragmentInfo {
- public T getChild(int i) {
- return hasChild(i) ? children.get(i) : null;
- }
+ private final FragmentId stageId;
+ private final FragmentState state;
+ private final PlanFragment plan;
- public boolean hasChild(int i) {
- return children.size() > i;
- }
+ private final List<FragmentInfo> childrenFragments;
- public void addChild(T n) {
- children.add(n);
+ public FragmentInfo(
+ FragmentId stageId,
+ FragmentState state,
+ PlanFragment plan,
+ List<FragmentInfo> childrenFragments) {
+ this.stageId = stageId;
+ this.state = state;
+ this.plan = plan;
+ this.childrenFragments = childrenFragments;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
new file mode 100644
index 0000000..eb00953
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class FragmentInstanceContext {
+
+ private FragmentInstanceId id;
+
+ // TODO if we split one fragment instance into multiple pipelines to run, we need to replace it
+ // with CopyOnWriteArrayList or some other thread safe data structure
+ private final List<OperatorContext> operatorContexts = new ArrayList<>();
+
+ private final long createNanos = System.nanoTime();
+
+ // private final GcMonitor gcMonitor;
+ // private final AtomicLong startNanos = new AtomicLong();
+ // private final AtomicLong startFullGcCount = new AtomicLong(-1);
+ // private final AtomicLong startFullGcTimeNanos = new AtomicLong(-1);
+ // private final AtomicLong endNanos = new AtomicLong();
+ // private final AtomicLong endFullGcCount = new AtomicLong(-1);
+ // private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
+
+ public FragmentInstanceContext(FragmentInstanceId id) {
+ this.id = id;
+ }
+
+ public OperatorContext addOperatorContext(
+ int operatorId, PlanNodeId planNodeId, String operatorType) {
+ checkArgument(operatorId >= 0, "operatorId is negative");
+
+ for (OperatorContext operatorContext : operatorContexts) {
+ checkArgument(
+ operatorId != operatorContext.getOperatorId(),
+ "A context already exists for operatorId %s",
+ operatorId);
+ }
+
+ OperatorContext operatorContext = new OperatorContext(operatorId, planNodeId, operatorType);
+ operatorContexts.add(operatorContext);
+ return operatorContext;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
new file mode 100644
index 0000000..a98210b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+
+public enum FragmentInstanceState {
+ /**
+ * Instance is planned but has not been scheduled yet. An instance will be in the planned state
+ * until, the dependencies of the instance have begun producing output.
+ */
+ PLANNED(false),
+ /** Instance is running. */
+ RUNNING(false),
+ /**
+ * Instance has finished executing and output is left to be consumed. In this state, there will be
+ * no new drivers, the existing drivers have finished and the output buffer of the instance is
+ * at-least in a 'no-more-tsBlocks' state.
+ */
+ FLUSHING(false),
+ /** Instance has finished executing and all output has been consumed. */
+ FINISHED(true),
+ /** Instance was canceled by a user. */
+ CANCELED(true),
+ /** Instance was aborted due to a failure in the query. The failure was not in this instance. */
+ ABORTED(true),
+ /** Instance execution failed. */
+ FAILED(true);
+
+ public static final Set<FragmentInstanceState> TERMINAL_INSTANCE_STATES =
+ Stream.of(FragmentInstanceState.values())
+ .filter(FragmentInstanceState::isDone)
+ .collect(toImmutableSet());
+
+ /**
+ * If doneState is true, indicating that it won't transfer to another state anymore, i.e. a
+ * terminal state.
+ */
+ private final boolean doneState;
+
+ FragmentInstanceState(boolean doneState) {
+ this.doneState = doneState;
+ }
+
+ /** Is this a terminal state. */
+ public boolean isDone() {
+ return doneState;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentState.java
new file mode 100644
index 0000000..8b596c5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentState.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+
+public enum FragmentState {
+
+ /**
+ * Fragment is planned but has not been scheduled yet. A Fragment will be in the planned state
+ * until, the dependencies of the Fragment have begun producing output.
+ */
+ PLANNED(false, false),
+ /** Fragment instances are being scheduled on nodes. */
+ SCHEDULING(false, false),
+ /** Fragment is running. */
+ RUNNING(false, false),
+ /**
+ * Fragment has finished executing existing tasks but more instances could be scheduled in the
+ * future.
+ */
+ PENDING(false, false),
+ /** Fragment has finished executing and all output has been consumed. */
+ FINISHED(true, false),
+ /** Fragment was aborted due to a failure in the query. The failure was not in this Fragment. */
+ ABORTED(true, true),
+ /** Fragment execution failed. */
+ FAILED(true, true);
+
+ public static final Set<FragmentState> TERMINAL_FRAGMENT_STATES =
+ Stream.of(FragmentState.values()).filter(FragmentState::isDone).collect(toImmutableSet());
+
+ private final boolean doneState;
+ private final boolean failureState;
+
+ FragmentState(boolean doneState, boolean failureState) {
+ checkArgument(!failureState || doneState, "%s is a non-done failure state", name());
+ this.doneState = doneState;
+ this.failureState = failureState;
+ }
+
+ /** Is this a terminal state. */
+ public boolean isDone() {
+ return doneState;
+ }
+
+ /** Is this a non-success terminal state. */
+ public boolean isFailure() {
+ return failureState;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 2b9d8cf..62a2c65 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -20,8 +20,10 @@ package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.mpp.common.Analysis;
import org.apache.iotdb.db.mpp.common.QueryContext;
-import org.apache.iotdb.db.mpp.plan.*;
-import org.apache.iotdb.db.mpp.plan.optimzation.PlanOptimizer;
+import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
+import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
+import org.apache.iotdb.db.mpp.sql.planner.optimization.PlanOptimizer;
+import org.apache.iotdb.db.mpp.sql.planner.plan.*;
import java.nio.ByteBuffer;
import java.util.List;
@@ -34,7 +36,7 @@ import java.util.List;
*/
public class QueryExecution {
private QueryContext context;
- private QueryScheduler scheduler;
+ private IScheduler scheduler;
private QueryStateMachine stateMachine;
private List<PlanOptimizer> planOptimizers;
@@ -57,7 +59,7 @@ public class QueryExecution {
}
public void schedule() {
- this.scheduler = new QueryScheduler(this.stateMachine, this.fragmentInstances);
+ this.scheduler = new ClusterScheduler(this.stateMachine, this.fragmentInstances);
this.scheduler.start();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
similarity index 65%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index fc5df30..4af8b31 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -16,9 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution;
+package org.apache.iotdb.db.mpp.execution.scheduler;
-import org.apache.iotdb.db.mpp.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.FragmentInfo;
+import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+
+import io.airlift.units.Duration;
import java.util.List;
@@ -29,20 +35,40 @@ import java.util.List;
* <p>Later, we can add more control logic for a QueryExecution such as retry, kill and so on by
* this scheduler.
*/
-public class QueryScheduler {
+public class ClusterScheduler implements IScheduler {
// The stateMachine of the QueryExecution owned by this QueryScheduler
private QueryStateMachine stateMachine;
// The fragment instances which should be sent to corresponding Nodes.
private List<FragmentInstance> instances;
- public QueryScheduler(QueryStateMachine stateMachine, List<FragmentInstance> instances) {
+ public ClusterScheduler(QueryStateMachine stateMachine, List<FragmentInstance> instances) {
this.stateMachine = stateMachine;
this.instances = instances;
}
+ @Override
public void start() {}
+ @Override
+ public void abort() {}
+
+ @Override
+ public Duration getTotalCpuTime() {
+ return null;
+ }
+
+ @Override
+ public FragmentInfo getFragmentInfo() {
+ return null;
+ }
+
+ @Override
+ public void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
+
+ @Override
+ public void cancelFragment(FragmentId fragmentId) {}
+
// Send the instances to other nodes
private void sendFragmentInstances() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
similarity index 61%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
index a4cb88c..16145fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
@@ -16,23 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.execution.scheduler;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
-import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.FragmentInfo;
-/** The FilterNode is responsible to filter the RowRecord from TsBlock. */
-public class FilterNode extends ProcessNode {
+import io.airlift.units.Duration;
- // The filter
- private FilterOperator rowFilter;
+public interface IScheduler {
- public FilterNode(PlanNodeId id) {
- super(id);
- }
+ void start();
- public FilterNode(PlanNodeId id, FilterOperator rowFilter) {
- this(id);
- this.rowFilter = rowFilter;
- }
+ void abort();
+
+ Duration getTotalCpuTime();
+
+ FragmentInfo getFragmentInfo();
+
+ void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause);
+
+ void cancelFragment(FragmentId fragmentId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
new file mode 100644
index 0000000..10de6b5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.scheduler;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.metadata.SchemaEngine;
+import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.FragmentInfo;
+
+import io.airlift.units.Duration;
+
+public class StandaloneScheduler implements IScheduler {
+
+ private static final StorageEngine STORAGE_ENGINE = StorageEngine.getInstance();
+
+ private static final SchemaEngine SCHEMA_ENGINE = SchemaEngine.getInstance();
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void abort() {}
+
+ @Override
+ public Duration getTotalCpuTime() {
+ return null;
+ }
+
+ @Override
+ public FragmentInfo getFragmentInfo() {
+ return null;
+ }
+
+ @Override
+ public void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
+
+ @Override
+ public void cancelFragment(FragmentId fragmentId) {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java
index c635f74..feedc13 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.operator;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
/**
* Contains information about {@link Operator} execution.
@@ -36,4 +36,8 @@ public class OperatorContext {
this.planNodeId = planNodeId;
this.operatorType = operatorType;
}
+
+ public int getOperatorId() {
+ return operatorId;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
index 63e07b4..6366b29 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
@@ -16,14 +16,37 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-public class ProcessNode extends PlanNode<TsBlock> {
- public ProcessNode(PlanNodeId id) {
- super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class AggregateOperator implements ProcessOperator {
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
index 63e07b4..408296a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
@@ -16,14 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-public class ProcessNode extends PlanNode<TsBlock> {
- public ProcessNode(PlanNodeId id) {
- super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class DeviceMergeOperator implements ProcessOperator {
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
index 63e07b4..66dc3ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
@@ -16,14 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-public class ProcessNode extends PlanNode<TsBlock> {
- public ProcessNode(PlanNodeId id) {
- super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class FillOperator implements ProcessOperator {
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
index 63e07b4..8d15250 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
@@ -16,14 +16,37 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-public class ProcessNode extends PlanNode<TsBlock> {
- public ProcessNode(PlanNodeId id) {
- super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class FilterNullOperator implements ProcessOperator {
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
index 63e07b4..10e9daa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
@@ -16,14 +16,37 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-public class ProcessNode extends PlanNode<TsBlock> {
- public ProcessNode(PlanNodeId id) {
- super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class GroupByLevelOperator implements ProcessOperator {
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
new file mode 100644
index 0000000..1ebca2c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator.process;
+
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class LimitOperator implements ProcessOperator {
+
+ private final OperatorContext operatorContext;
+ private long remainingLimit;
+ private final Operator child;
+
+ public LimitOperator(OperatorContext operatorContext, long limit, Operator child) {
+ this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
+ checkArgument(limit >= 0, "limit must be at least zero");
+ this.remainingLimit = limit;
+ this.child = requireNonNull(child, "child operator is null");
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return child.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ TsBlock block = child.next();
+ TsBlock res = block;
+ if (block.getCount() <= remainingLimit) {
+ remainingLimit -= block.getCount();
+ } else {
+ res = block.getRegion(0, (int) remainingLimit);
+ remainingLimit = 0;
+ }
+ return res;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return child.hasNext();
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
index 63e07b4..25b4bc9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
@@ -16,14 +16,37 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-public class ProcessNode extends PlanNode<TsBlock> {
- public ProcessNode(PlanNodeId id) {
- super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class OffsetOperator implements ProcessOperator {
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/ProcessOperator.java
similarity index 78%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/ProcessOperator.java
index dd8d436..aeb9535 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/ProcessOperator.java
@@ -16,16 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.operator.process;
-public class QueryId {
- private String Id;
+import org.apache.iotdb.db.mpp.operator.Operator;
- public String getId() {
- return Id;
- }
-
- public void setId(String id) {
- Id = id;
- }
-}
+// TODO should think about what interfaces should this ProcessOperator have
+public interface ProcessOperator extends Operator {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
index 63e07b4..0199d52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
@@ -16,14 +16,37 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-public class ProcessNode extends PlanNode<TsBlock> {
- public ProcessNode(PlanNodeId id) {
- super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class SortOperator implements ProcessOperator {
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index 63e07b4..11cee59 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -16,14 +16,37 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-public class ProcessNode extends PlanNode<TsBlock> {
- public ProcessNode(PlanNodeId id) {
- super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class TimeJoinOperator implements ProcessOperator {
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
index aa40205..aca892f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
@@ -16,33 +16,46 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.operator.sink;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-/**
- * Intermediate result for most of ExecOperators. The Tablet contains data from one or more columns
- * and constructs them as a row based view The columns can be series, aggregation result for one
- * series or scalar value (such as deviceName). The Tablet also contains the metadata to describe
- * the columns.
- *
- * <p>TODO: consider the detailed data store model in memory. (using column based or row based ?)
- */
-public class TsBlock {
+import com.google.common.util.concurrent.ListenableFuture;
- // Describe the column info
- private TsBlockMetadata metadata;
+public class FragmentSinkOperator implements SinkOperator {
- public boolean hasNext() {
- return false;
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return SinkOperator.super.isBlocked();
}
- // Get next row in current tablet
- public RowRecord getNext() {
+ @Override
+ public TsBlock next() {
return null;
}
- public TsBlockMetadata getMetadata() {
- return metadata;
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ SinkOperator.super.close();
}
+
+ @Override
+ public void send(TsBlock tsBlock) {}
+
+ @Override
+ public void setNoMoreTsBlocks() {}
+
+ @Override
+ public void abort() {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
index 9954c74..c3f16ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
@@ -16,19 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.sink;
-import org.apache.iotdb.db.mpp.common.QueryContext;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.Operator;
-import java.util.List;
+public interface SinkOperator extends Operator {
-public class DistributedQueryPlan {
- private QueryContext context;
- private PlanNode<TsBlock> rootNode;
- private PlanFragment rootFragment;
+ /**
+ * Sends a tsBlock to an unpartitioned buffer. If no-more-tsBlocks has been set, the send tsBlock
+ * call is ignored. This can happen with limit queries.
+ */
+ void send(TsBlock tsBlock);
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ /**
+ * Notify SinkHandle that no more tsBlocks will be sent. Any future calls to send a tsBlock are
+ * ignored.
+ */
+ void setNoMoreTsBlocks();
+
+ /**
+ * Abort the sink handle, discarding all tsBlocks which may still in memory buffer, but blocking
+ * readers. It is expected that readers will be unblocked when the failed query is cleaned up.
+ */
+ void abort();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index aa40205..fcd4605 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -16,33 +16,42 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.operator.source;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-/**
- * Intermediate result for most of ExecOperators. The Tablet contains data from one or more columns
- * and constructs them as a row based view The columns can be series, aggregation result for one
- * series or scalar value (such as deviceName). The Tablet also contains the metadata to describe
- * the columns.
- *
- * <p>TODO: consider the detailed data store model in memory. (using column based or row based ?)
- */
-public class TsBlock {
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class SeriesAggregateScanOperator implements SourceOperator {
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
- // Describe the column info
- private TsBlockMetadata metadata;
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return SourceOperator.super.isBlocked();
+ }
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
public boolean hasNext() {
return false;
}
- // Get next row in current tablet
- public RowRecord getNext() {
- return null;
+ @Override
+ public void close() throws Exception {
+ SourceOperator.super.close();
}
- public TsBlockMetadata getMetadata() {
- return metadata;
+ @Override
+ public PlanNodeId getSourceId() {
+ return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index aa40205..c0bc9ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -16,33 +16,43 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.operator.source;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-/**
- * Intermediate result for most of ExecOperators. The Tablet contains data from one or more columns
- * and constructs them as a row based view The columns can be series, aggregation result for one
- * series or scalar value (such as deviceName). The Tablet also contains the metadata to describe
- * the columns.
- *
- * <p>TODO: consider the detailed data store model in memory. (using column based or row based ?)
- */
-public class TsBlock {
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class SeriesScanOperator implements SourceOperator {
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
- // Describe the column info
- private TsBlockMetadata metadata;
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return SourceOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+ @Override
public boolean hasNext() {
return false;
}
- // Get next row in current tablet
- public RowRecord getNext() {
- return null;
+ @Override
+ public void close() throws Exception {
+ SourceOperator.super.close();
}
- public TsBlockMetadata getMetadata() {
- return metadata;
+ @Override
+ public PlanNodeId getSourceId() {
+ return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
similarity index 77%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
index dd8d436..8454fd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
@@ -16,16 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.operator.source;
-public class QueryId {
- private String Id;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
- public String getId() {
- return Id;
- }
+public interface SourceOperator extends Operator {
- public void setId(String id) {
- Id = id;
- }
+ PlanNodeId getSourceId();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstanceId.java
deleted file mode 100644
index 18181cd..0000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstanceId.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.plan;
-
-public class FragmentInstanceId {
- private String id;
-
- public FragmentInstanceId(String id) {
- this.id = id;
- }
-
- // A SinkOperator is needed here. So that we can know where the result of this instance can be
- // sent
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNullNode.java
deleted file mode 100644
index d7c463d..0000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNullNode.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.plan.node.process;
-
-import org.apache.iotdb.db.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
-
-/** WithoutNode is used to discard specific rows from upstream node. */
-public class FilterNullNode extends ProcessNode {
-
- // The policy to discard the result from upstream operator
- private WithoutPolicy discardPolicy;
-
- public FilterNullNode(PlanNodeId id) {
- super(id);
- }
-
- public FilterNullNode(PlanNodeId id, WithoutPolicy discardPolicy) {
- this(id);
- this.discardPolicy = discardPolicy;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
new file mode 100644
index 0000000..278a80c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner;
+
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.*;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+
+import java.util.List;
+
+/**
+ * used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
+ * Operator tree, but in the future, we may split one fragment instance into multiple pipeline to
+ * run a fragment instance parallel and take full advantage of multi-cores
+ */
+public class LocalExecutionPlanner {
+
+ /** This Visitor is responsible for transferring PlanNode Tree to Operator Tree */
+ private class Visitor extends PlanVisitor<Operator, LocalExecutionPlanContext> {
+
+ @Override
+ public Operator visitPlan(PlanNode node, LocalExecutionPlanContext context) {
+ throw new UnsupportedOperationException("should call the concrete visitXX() method");
+ }
+
+ @Override
+ public Operator visitSeriesScan(SeriesScanNode node, LocalExecutionPlanContext context) {
+ return super.visitSeriesScan(node, context);
+ }
+
+ @Override
+ public Operator visitSeriesAggregate(
+ SeriesAggregateScanNode node, LocalExecutionPlanContext context) {
+ return super.visitSeriesAggregate(node, context);
+ }
+
+ @Override
+ public Operator visitDeviceMerge(DeviceMergeNode node, LocalExecutionPlanContext context) {
+ return super.visitDeviceMerge(node, context);
+ }
+
+ @Override
+ public Operator visitFill(FillNode node, LocalExecutionPlanContext context) {
+ return super.visitFill(node, context);
+ }
+
+ @Override
+ public Operator visitFilter(FilterNode node, LocalExecutionPlanContext context) {
+ PlanNode child = node.getChild();
+
+ FilterOperator filterExpression = node.getPredicate();
+ List<String> outputSymbols = node.getOutputColumnNames();
+ return super.visitFilter(node, context);
+ }
+
+ @Override
+ public Operator visitFilterNull(FilterNullNode node, LocalExecutionPlanContext context) {
+ return super.visitFilterNull(node, context);
+ }
+
+ @Override
+ public Operator visitGroupByLevel(GroupByLevelNode node, LocalExecutionPlanContext context) {
+ return super.visitGroupByLevel(node, context);
+ }
+
+ @Override
+ public Operator visitLimit(LimitNode node, LocalExecutionPlanContext context) {
+ Operator child = node.getChild().accept(this, context);
+ return new LimitOperator(
+ context.taskContext.addOperatorContext(
+ context.getNextOperatorId(), node.getId(), LimitOperator.class.getSimpleName()),
+ node.getLimit(),
+ child);
+ }
+
+ @Override
+ public Operator visitOffset(OffsetNode node, LocalExecutionPlanContext context) {
+ return super.visitOffset(node, context);
+ }
+
+ @Override
+ public Operator visitRowBasedSeriesAggregate(
+ AggregateNode node, LocalExecutionPlanContext context) {
+ return super.visitRowBasedSeriesAggregate(node, context);
+ }
+
+ @Override
+ public Operator visitSort(SortNode node, LocalExecutionPlanContext context) {
+ return super.visitSort(node, context);
+ }
+
+ @Override
+ public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) {
+ return super.visitTimeJoin(node, context);
+ }
+ }
+
+ private static class LocalExecutionPlanContext {
+ private final FragmentInstanceContext taskContext;
+ private int nextOperatorId = 0;
+
+ public LocalExecutionPlanContext(FragmentInstanceContext taskContext) {
+ this.taskContext = taskContext;
+ }
+
+ private int getNextOperatorId() {
+ return nextOperatorId++;
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimzation/PlanOptimizer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
similarity index 80%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/optimzation/PlanOptimizer.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
index b99c5db..b091b91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimzation/PlanOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
@@ -16,12 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.optimzation;
+package org.apache.iotdb.db.mpp.sql.planner.optimization;
import org.apache.iotdb.db.mpp.common.QueryContext;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
public interface PlanOptimizer {
- PlanNode<TsBlock> optimize(PlanNode<TsBlock> plan, QueryContext context);
+ PlanNode optimize(PlanNode plan, QueryContext context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
index 9954c74..50835dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
@@ -16,17 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.db.mpp.common.QueryContext;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import java.util.List;
public class DistributedQueryPlan {
private QueryContext context;
- private PlanNode<TsBlock> rootNode;
+ private PlanNode rootNode;
private PlanFragment rootFragment;
// TODO: consider whether this field is necessary when do the implementation
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
index 03eb1dc..fc428cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.db.mpp.common.Analysis;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 0b405b3..7674f4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -16,7 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
+
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
public class FragmentInstance {
private FragmentInstanceId id;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalPlanner.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalPlanner.java
index a74644b..0a45b6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalPlanner.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.db.mpp.common.Analysis;
import org.apache.iotdb.db.mpp.common.QueryContext;
-import org.apache.iotdb.db.mpp.plan.optimzation.PlanOptimizer;
+import org.apache.iotdb.db.mpp.sql.planner.optimization.PlanOptimizer;
import java.util.List;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
similarity index 85%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
index 5094df6..666fbf3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
@@ -16,11 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.db.mpp.common.QueryContext;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
/**
* LogicalQueryPlan represents a logical query plan. It stores the root node of corresponding query
@@ -28,5 +27,5 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNode;
*/
public class LogicalQueryPlan {
private QueryContext context;
- private PlanNode<TsBlock> rootNode;
+ private PlanNode rootNode;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
similarity index 85%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index fc49264..0aa3ac5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -16,14 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
// TODO: consider whether it is necessary to make PlanFragment as a TreeNode
/** PlanFragment contains a sub-query of distributed query. */
public class PlanFragment {
private PlanFragmentId id;
- private PlanNode<TsBlock> root;
+ private PlanNode root;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
index 39f8d17..38a56f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
public class PlanFragmentId {
private String id;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
similarity index 55%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 1a3f103..3a3975f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -16,19 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
-import org.apache.iotdb.db.mpp.common.TreeNode;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/** The base class of query executable operators, which is used to compose logical query plan. */
+// TODO: consider how to restrict the children type for each type of ExecOperator
+public abstract class PlanNode {
-/**
- * @author xingtanzjr The base class of query executable operators, which is used to compose logical
- * query plan. TODO: consider how to restrict the children type for each type of ExecOperator
- * TODO: consider to fix the Template type as TsBlock
- */
-public abstract class PlanNode<T> extends TreeNode<PlanNode<T>> {
private PlanNodeId id;
- public PlanNode(PlanNodeId id) {
+ protected PlanNode(PlanNodeId id) {
+ requireNonNull(id, "id is null");
this.id = id;
}
+
+ public PlanNodeId getId() {
+ return id;
+ }
+
+ public abstract List<PlanNode> getChildren();
+
+ public abstract List<String> getOutputColumnNames();
+
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitPlan(this, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
index e5a69e9..f829dfb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
public class PlanNodeId {
private String id;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeIdAllocator.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeIdAllocator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
index 6e70c20..919f303 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeIdAllocator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
/** A centralized PlanNodeId generator */
public class PlanNodeIdAllocator {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
new file mode 100644
index 0000000..0be251d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.*;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+
+public abstract class PlanVisitor<R, C> {
+
+ public abstract R visitPlan(PlanNode node, C context);
+
+ public R visitSeriesScan(SeriesScanNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitSeriesAggregate(SeriesAggregateScanNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitDeviceMerge(DeviceMergeNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitFill(FillNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitFilter(FilterNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitFilterNull(FilterNullNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitGroupByLevel(GroupByLevelNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitLimit(LimitNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitOffset(OffsetNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitRowBasedSeriesAggregate(AggregateNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitSort(SortNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitTimeJoin(TimeJoinNode node, C context) {
+ return visitPlan(node, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
similarity index 53%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
index 9d7b943..9382fbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
@@ -16,20 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
import java.util.List;
+import java.util.Map;
/**
- * This node is used to aggregate required series by raw data. The raw data will be input as a
- * TsBlock. This node will output the series aggregated result represented by TsBlock Thus, the
- * columns in output TsBlock will be different from input TsBlock.
+ * This node is used to aggregate required series from multiple sources. The source data will be
+ * input as a TsBlock, it may be raw data or partial aggregation result. This node will output the
+ * final series aggregated result represented by TsBlock.
*/
-public class RowBasedSeriesAggregateNode extends ProcessNode {
+public class AggregateNode extends ProcessNode {
// The parameter of `group by time`
// Its value will be null if there is no `group by time` clause,
private GroupByTimeParameter groupByTimeParameter;
@@ -38,22 +41,34 @@ public class RowBasedSeriesAggregateNode extends ProcessNode {
// result TsBlock
// (Currently we only support one series in the aggregation function)
// TODO: need consider whether it is suitable the aggregation function using FunctionExpression
- private List<FunctionExpression> aggregateFuncList;
+ private Map<String, FunctionExpression> aggregateFuncMap;
- public RowBasedSeriesAggregateNode(PlanNodeId id) {
+ private final List<PlanNode> children;
+ private final List<String> columnNames;
+
+ public AggregateNode(
+ PlanNodeId id,
+ Map<String, FunctionExpression> aggregateFuncMap,
+ List<PlanNode> children,
+ List<String> columnNames) {
super(id);
+ this.aggregateFuncMap = aggregateFuncMap;
+ this.children = children;
+ this.columnNames = columnNames;
}
- public RowBasedSeriesAggregateNode(PlanNodeId id, List<FunctionExpression> aggregateFuncList) {
- this(id);
- this.aggregateFuncList = aggregateFuncList;
+ @Override
+ public List<PlanNode> getChildren() {
+ return children;
}
- public RowBasedSeriesAggregateNode(
- PlanNodeId id,
- List<FunctionExpression> aggregateFuncList,
- GroupByTimeParameter groupByTimeParameter) {
- this(id, aggregateFuncList);
- this.groupByTimeParameter = groupByTimeParameter;
+ @Override
+ public List<String> getOutputColumnNames() {
+ return columnNames;
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitRowBasedSeriesAggregate(this, context);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
similarity index 69%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index 9269545..54a0cf8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -16,14 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import java.util.List;
import java.util.Map;
/**
@@ -43,23 +44,42 @@ public class DeviceMergeNode extends ProcessNode {
// The without policy is able to be push down to the DeviceMergeNode because we can know whether a
// row contains
// null or not.
- private WithoutPolicy withoutPolicy;
+ private FilterNullPolicy filterNullPolicy;
// The map from deviceName to corresponding query result node responsible for that device.
// DeviceNode means the node whose output TsBlock contains the data belonged to one device.
- private Map<String, PlanNode<TsBlock>> childDeviceNodeMap;
+ private Map<String, PlanNode> childDeviceNodeMap;
+
+ private List<PlanNode> children;
+
+ private List<String> columnNames;
public DeviceMergeNode(PlanNodeId id) {
super(id);
}
- public DeviceMergeNode(PlanNodeId id, Map<String, PlanNode<TsBlock>> deviceNodeMap) {
+ @Override
+ public List<PlanNode> getChildren() {
+ return children;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return columnNames;
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitDeviceMerge(this, context);
+ }
+
+ public DeviceMergeNode(PlanNodeId id, Map<String, PlanNode> deviceNodeMap) {
this(id);
this.childDeviceNodeMap = deviceNodeMap;
this.children.addAll(deviceNodeMap.values());
}
- public void addChildDeviceNode(String deviceName, PlanNode<TsBlock> childNode) {
+ public void addChildDeviceNode(String deviceName, PlanNode childNode) {
this.childDeviceNodeMap.put(deviceName, childNode);
this.children.add(childNode);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
similarity index 64%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
index 31e57cd..af88895 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
@@ -16,14 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.db.mpp.common.FillPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
/** FillNode is used to fill the empty field in one row. */
public class FillNode extends ProcessNode {
+ private PlanNode child;
+
// The policy to discard the result from upstream node
private FillPolicy fillPolicy;
@@ -31,6 +39,21 @@ public class FillNode extends ProcessNode {
super(id);
}
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of(child);
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return child.getOutputColumnNames();
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitFill(this, context);
+ }
+
public FillNode(PlanNodeId id, FillPolicy fillPolicy) {
this(id);
this.fillPolicy = fillPolicy;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
new file mode 100644
index 0000000..4504890
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/** The FilterNode is responsible to filter the RowRecord from TsBlock. */
+public class FilterNode extends ProcessNode {
+
+ private final PlanNode child;
+ // TODO we need to rename it to something like expression in order to distinguish from Operator
+ // class
+ private final FilterOperator predicate;
+
+ public FilterNode(PlanNodeId id, PlanNode child, FilterOperator predicate) {
+ super(id);
+ this.child = child;
+ this.predicate = predicate;
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of(child);
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return child.getOutputColumnNames();
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitFilter(this, context);
+ }
+
+ public FilterOperator getPredicate() {
+ return predicate;
+ }
+
+ public PlanNode getChild() {
+ return child;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
new file mode 100644
index 0000000..fbf8cc9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/** WithoutNode is used to discard specific rows from upstream node. */
+public class FilterNullNode extends ProcessNode {
+
+ // The policy to discard the result from upstream operator
+ private FilterNullPolicy discardPolicy;
+
+ private PlanNode child;
+
+ private List<String> filterNullColumnNames;
+
+ public FilterNullNode(PlanNodeId id, PlanNode child) {
+ super(id);
+ this.child = child;
+ }
+
+ public FilterNullNode(PlanNodeId id, PlanNode child, List<String> filterNullColumnNames) {
+ super(id);
+ this.child = child;
+ this.filterNullColumnNames = filterNullColumnNames;
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of(child);
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return child.getOutputColumnNames();
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitFilterNull(this, context);
+ }
+
+ public void setFilterNullColumnNames(List<String> filterNullColumnNames) {
+ this.filterNullColumnNames = filterNullColumnNames;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
similarity index 59%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
index 538d6d8..7dcca76 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
@@ -16,9 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import java.util.List;
/**
* This node is responsible for the final aggregation merge operation. It will process the data from
@@ -32,10 +36,48 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
*/
public class GroupByLevelNode extends ProcessNode {
+ private PlanNode child;
+
private int[] groupByLevels;
- public GroupByLevelNode(PlanNodeId id, int[] groupByLevels) {
+ private List<String> columnNames;
+
+ public GroupByLevelNode(
+ PlanNodeId id, PlanNode child, int[] groupByLevels, List<String> columnNames) {
super(id);
+ this.child = child;
+ this.groupByLevels = groupByLevels;
+ this.columnNames = columnNames;
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return child.getChildren();
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return columnNames;
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitGroupByLevel(this, context);
+ }
+
+ public int[] getGroupByLevels() {
+ return groupByLevels;
+ }
+
+ public void setGroupByLevels(int[] groupByLevels) {
this.groupByLevels = groupByLevels;
}
+
+ public List<String> getColumnNames() {
+ return columnNames;
+ }
+
+ public void setColumnNames(List<String> columnNames) {
+ this.columnNames = columnNames;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
similarity index 53%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index 9596c1a..8fdc9b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -16,22 +16,49 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
/** LimitNode is used to select top n result. It uses the default order of upstream nodes */
public class LimitNode extends ProcessNode {
// The limit count
- private int limit;
+ private final int limit;
+ private final PlanNode child;
- public LimitNode(PlanNodeId id) {
+ public LimitNode(PlanNodeId id, int limit, PlanNode child) {
super(id);
+ this.limit = limit;
+ this.child = child;
}
- public LimitNode(PlanNodeId id, int limit) {
- this(id);
- this.limit = limit;
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of(child);
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return child.getOutputColumnNames();
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitLimit(this, context);
+ }
+
+ public int getLimit() {
+ return limit;
+ }
+
+ public PlanNode getChild() {
+ return child;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
similarity index 56%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/OffsetNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
index 01e0e93..2e3fc78 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
@@ -16,9 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import java.util.List;
/**
* OffsetNode is used to skip top n result from upstream nodes. It uses the default order of
@@ -27,14 +31,35 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
public class OffsetNode extends ProcessNode {
// The limit count
- private int offset;
+ private final PlanNode child;
+ private final int offset;
- public OffsetNode(PlanNodeId id) {
+ public OffsetNode(PlanNodeId id, PlanNode child, int offset) {
super(id);
+ this.child = child;
+ this.offset = offset;
}
- public OffsetNode(PlanNodeId id, int offset) {
- this(id);
- this.offset = offset;
+ @Override
+ public List<PlanNode> getChildren() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitOffset(this, context);
+ }
+
+ public PlanNode getChild() {
+ return child;
+ }
+
+ public int getOffset() {
+ return offset;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ProcessNode.java
similarity index 77%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ProcessNode.java
index 63e07b4..9c1fec5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ProcessNode.java
@@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+public abstract class ProcessNode extends PlanNode {
-public class ProcessNode extends PlanNode<TsBlock> {
public ProcessNode(PlanNodeId id) {
super(id);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
similarity index 57%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
index 1e83783..19464a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
@@ -16,10 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
/**
* In general, the parameter in sortNode should be pushed down to the upstream operators. In our
@@ -27,14 +33,31 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
*/
public class SortNode extends ProcessNode {
+ private final PlanNode child;
+
+ private final List<String> orderBy;
+
private OrderBy sortOrder;
- public SortNode(PlanNodeId id) {
+ public SortNode(PlanNodeId id, PlanNode child, List<String> orderBy, OrderBy sortOrder) {
super(id);
+ this.child = child;
+ this.orderBy = orderBy;
+ this.sortOrder = sortOrder;
}
- public SortNode(PlanNodeId id, OrderBy sortOrder) {
- this(id);
- this.sortOrder = sortOrder;
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of(child);
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return child.getOutputColumnNames();
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitSort(this, context);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
similarity index 55%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index ab48cc4..74fac14 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -16,15 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
/**
* TimeJoinOperator is responsible for join two or more TsBlock. The join algorithm is like outer
@@ -41,27 +42,51 @@ public class TimeJoinNode extends ProcessNode {
// The without policy is able to be push down to the TimeJoinOperator because we can know whether
// a row contains
// null or not.
- private WithoutPolicy withoutPolicy;
+ private FilterNullPolicy filterNullPolicy;
- public TimeJoinNode(PlanNodeId id) {
+ private List<PlanNode> children;
+
+ public TimeJoinNode(
+ PlanNodeId id,
+ OrderBy mergeOrder,
+ FilterNullPolicy filterNullPolicy,
+ List<PlanNode> children) {
super(id);
- this.mergeOrder = OrderBy.TIMESTAMP_ASC;
+ this.mergeOrder = mergeOrder;
+ this.filterNullPolicy = filterNullPolicy;
+ this.children = children;
}
- public TimeJoinNode(PlanNodeId id, PlanNode<TsBlock>... children) {
- super(id);
- this.children.addAll(Arrays.asList(children));
+ @Override
+ public List<PlanNode> getChildren() {
+ return children;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return children.stream()
+ .flatMap(child -> child.getOutputColumnNames().stream())
+ .collect(Collectors.toList());
}
- public void addChild(PlanNode<TsBlock> child) {
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitTimeJoin(this, context);
+ }
+
+ public void addChild(PlanNode child) {
this.children.add(child);
}
+ public void setChildren(List<PlanNode> children) {
+ this.children = children;
+ }
+
public void setMergeOrder(OrderBy mergeOrder) {
this.mergeOrder = mergeOrder;
}
- public void setWithoutPolicy(WithoutPolicy withoutPolicy) {
- this.withoutPolicy = withoutPolicy;
+ public void setWithoutPolicy(FilterNullPolicy filterNullPolicy) {
+ this.filterNullPolicy = filterNullPolicy;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java
index 0bac6bf..2fddbdd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java
@@ -16,9 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
public class CsvSinkNode extends SinkNode {
public CsvSinkNode(PlanNodeId id) {
@@ -26,6 +29,16 @@ public class CsvSinkNode extends SinkNode {
}
@Override
+ public List<PlanNode> getChildren() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
public void close() throws Exception {}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 58c3a71..da30223 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -16,9 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
public class FragmentSinkNode extends SinkNode {
public FragmentSinkNode(PlanNodeId id) {
@@ -26,6 +29,16 @@ public class FragmentSinkNode extends SinkNode {
}
@Override
+ public List<PlanNode> getChildren() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
public void send() {}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/SinkNode.java
similarity index 76%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/SinkNode.java
index f59effb..e01a2c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/SinkNode.java
@@ -16,13 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-public abstract class SinkNode extends PlanNode<TsBlock> implements AutoCloseable {
+public abstract class SinkNode extends PlanNode implements AutoCloseable {
public SinkNode(PlanNodeId id) {
super(id);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java
similarity index 74%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java
index f5c48df..bb343f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java
@@ -16,9 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
/** not implemented in current IoTDB yet */
public class ThriftSinkNode extends SinkNode {
@@ -28,6 +31,16 @@ public class ThriftSinkNode extends SinkNode {
}
@Override
+ public List<PlanNode> getChildren() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
public void close() throws Exception {}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
similarity index 74%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
index a2a0fde..612e930 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
@@ -16,9 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
/** Not implemented in current version. */
public class CsvSourceNode extends SourceNode {
@@ -28,6 +31,16 @@ public class CsvSourceNode extends SourceNode {
}
@Override
+ public List<PlanNode> getChildren() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
public void close() throws Exception {}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
similarity index 75%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 80ea58f..ee01ac1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -16,13 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
/**
* SeriesAggregateOperator is responsible to do the aggregation calculation for one series. It will
* read the target series and calculate the aggregation result by the aggregation digest or raw data
@@ -38,7 +44,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
* represent the whole aggregation result of this series. And the timestamp will be 0, which is
* meaningless.
*/
-public class SeriesAggregateNode extends SourceNode {
+public class SeriesAggregateScanNode extends SourceNode {
// The parameter of `group by time`
// Its value will be null if there is no `group by time` clause,
@@ -51,16 +57,28 @@ public class SeriesAggregateNode extends SourceNode {
private Filter filter;
- public SeriesAggregateNode(PlanNodeId id) {
+ private String columnName;
+
+ public SeriesAggregateScanNode(PlanNodeId id) {
super(id);
}
- public SeriesAggregateNode(PlanNodeId id, FunctionExpression aggregateFunc) {
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return ImmutableList.of(columnName);
+ }
+
+ public SeriesAggregateScanNode(PlanNodeId id, FunctionExpression aggregateFunc) {
this(id);
this.aggregateFunc = aggregateFunc;
}
- public SeriesAggregateNode(
+ public SeriesAggregateScanNode(
PlanNodeId id, FunctionExpression aggregateFunc, GroupByTimeParameter groupByTimeParameter) {
this(id, aggregateFunc);
this.groupByTimeParameter = groupByTimeParameter;
@@ -72,6 +90,11 @@ public class SeriesAggregateNode extends SourceNode {
@Override
public void close() throws Exception {}
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitSeriesAggregate(this, context);
+ }
+
// This method is used when do the PredicatePushDown.
// The filter is not put in the constructor because the filter is only clear in the predicate
// push-down stage
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
similarity index 79%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index ccfae5c..8858074 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -16,13 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
/**
* SeriesScanOperator is responsible for read data a specific series. When reading data, the
* SeriesScanOperator can read the raw data batch by batch. And also, it can leverage the filter and
@@ -52,6 +58,8 @@ public class SeriesScanNode extends SourceNode {
// offset for result set. The default value is 0
private int offset;
+ private String columnName;
+
public SeriesScanNode(PlanNodeId id, PartialPath seriesPath) {
super(id);
this.seriesPath = seriesPath;
@@ -82,4 +90,19 @@ public class SeriesScanNode extends SourceNode {
public void setOffset(int offset) {
this.offset = offset;
}
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return ImmutableList.of(columnName);
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitSeriesScan(this, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
similarity index 76%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
index c83da97..551e9d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
@@ -16,13 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-public abstract class SourceNode extends PlanNode<TsBlock> implements AutoCloseable {
+public abstract class SourceNode extends PlanNode implements AutoCloseable {
public SourceNode(PlanNodeId id) {
super(id);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/Expression.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/Expression.java
index 7a4107e..a75d326 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/Expression.java
@@ -16,9 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.sql.tree;
-public enum WithoutPolicy {
- CONTAINS_NULL,
- ALL_NULL
-}
+public class Expression {}