You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/18 10:38:14 UTC
[iotdb] branch mpp-ty updated: partial operator
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch mpp-ty
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/mpp-ty by this push:
new b877352 partial operator
b877352 is described below
commit b8773527c817b2442142dc5bfb51138523c5cbff
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Mar 18 18:37:23 2022 +0800
partial operator
---
.../{WithoutPolicy.java => FilterNullPolicy.java} | 2 +-
.../common/{WithoutPolicy.java => InstanceId.java} | 10 +-
.../iotdb/db/mpp/execution/InstanceContext.java | 44 +++++++
.../iotdb/db/mpp/execution/InstanceState.java | 77 +++++++++++++
.../iotdb/db/mpp/execution/QueryExecution.java | 4 +-
.../iotdb/db/mpp/execution/QueryScheduler.java | 2 +-
.../executor/InstanceExecutor.java} | 28 ++---
.../executor/InstanceHandle.java} | 19 ++--
.../iotdb/db/mpp/operator/OperatorContext.java | 2 +-
.../process/AggregateOperator.java} | 37 ++++--
.../process/DeviceMergeOperator.java} | 36 ++++--
.../process/FillOperator.java} | 36 ++++--
.../process/FilterNullOperator.java} | 37 ++++--
.../process/GroupByLevelOperator.java} | 37 ++++--
.../process/LimitOperator.java} | 36 ++++--
.../process/OffsetOperator.java} | 37 ++++--
.../process/ProcessOperator.java} | 11 +-
.../process/SortOperator.java} | 37 ++++--
.../process/TimeJoinOperator.java} | 37 ++++--
.../sink/SinkOperator.java} | 34 +++---
.../source/SeriesAggregateScanOperator.java} | 42 +++++--
.../source/SeriesScanOperator.java} | 43 +++++--
.../source/SourceOperator.java} | 12 +-
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 126 +++++++++++++++++++++
.../planner/optimization}/PlanOptimizer.java | 4 +-
.../planner}/plan/DistributedQueryPlan.java | 6 +-
.../planner}/plan/DistributionPlanner.java | 2 +-
.../{ => sql/planner}/plan/FragmentInstance.java | 2 +-
.../{ => sql/planner}/plan/FragmentInstanceId.java | 2 +-
.../mpp/{ => sql/planner}/plan/LogicalPlanner.java | 4 +-
.../{ => sql/planner}/plan/LogicalQueryPlan.java | 6 +-
.../mpp/{ => sql/planner}/plan/PlanFragment.java | 6 +-
.../mpp/{ => sql/planner}/plan/PlanFragmentId.java | 2 +-
.../mpp/{ => sql/planner}/plan/node/PlanNode.java | 30 +++--
.../{ => sql/planner}/plan/node/PlanNodeId.java | 2 +-
.../planner}/plan/node/PlanNodeIdAllocator.java | 2 +-
.../db/mpp/sql/planner/plan/node/PlanVisitor.java | 76 +++++++++++++
.../planner/plan/node/process/AggregateNode.java} | 47 +++++---
.../plan/node/process/DeviceMergeNode.java | 38 +++++--
.../planner}/plan/node/process/FillNode.java | 26 ++++-
.../sql/planner/plan/node/process/FilterNode.java | 64 +++++++++++
.../planner/plan/node/process/FilterNullNode.java | 70 ++++++++++++
.../plan/node/process/GroupByLevelNode.java | 47 +++++++-
.../planner}/plan/node/process/LimitNode.java | 40 +++++--
.../planner}/plan/node/process/OffsetNode.java | 39 +++++--
.../planner}/plan/node/process/ProcessNode.java | 9 +-
.../planner}/plan/node/process/SortNode.java | 34 +++++-
.../planner}/plan/node/process/TimeJoinNode.java | 50 +++++---
.../planner}/plan/node/sink/CsvSinkNode.java | 17 ++-
.../planner}/plan/node/sink/FragmentSinkNode.java | 17 ++-
.../{ => sql/planner}/plan/node/sink/SinkNode.java | 8 +-
.../planner}/plan/node/sink/ThriftSinkNode.java | 17 ++-
.../planner}/plan/node/source/CsvSourceNode.java | 17 ++-
.../plan/node/source/SeriesAggregateScanNode.java} | 34 +++++-
.../planner}/plan/node/source/SeriesScanNode.java | 26 ++++-
.../planner}/plan/node/source/SourceNode.java | 8 +-
.../tree/Expression.java} | 7 +-
57 files changed, 1253 insertions(+), 292 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
similarity index 96%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
index 7a4107e..7979b27 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.common;
-public enum WithoutPolicy {
+public enum FilterNullPolicy {
CONTAINS_NULL,
ALL_NULL
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/InstanceId.java
similarity index 86%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/InstanceId.java
index 7a4107e..a3dd91f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/InstanceId.java
@@ -18,7 +18,11 @@
*/
package org.apache.iotdb.db.mpp.common;
-public enum WithoutPolicy {
- CONTAINS_NULL,
- ALL_NULL
+public class InstanceId {
+
+ private final String fullId;
+
+ public InstanceId(String fullId) {
+ this.fullId = fullId;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/InstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/InstanceContext.java
new file mode 100644
index 0000000..b6d27f5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/InstanceContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.mpp.common.InstanceId;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class InstanceContext {
+
+ private InstanceId id;
+
+ private final long createNanos = System.nanoTime();
+
+// private final GcMonitor gcMonitor;
+// private final AtomicLong startNanos = new AtomicLong();
+// private final AtomicLong startFullGcCount = new AtomicLong(-1);
+// private final AtomicLong startFullGcTimeNanos = new AtomicLong(-1);
+// private final AtomicLong endNanos = new AtomicLong();
+// private final AtomicLong endFullGcCount = new AtomicLong(-1);
+// private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
+
+
+ public InstanceContext(InstanceId id) {
+ this.id = id;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/InstanceState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/InstanceState.java
new file mode 100644
index 0000000..847a6e7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/InstanceState.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+
+public enum InstanceState {
+ /**
+ * Instance is planned but has not been scheduled yet. An instance will
+ * be in the planned state until, the dependencies of the instance
+ * have begun producing output.
+ */
+ PLANNED(false),
+ /**
+ * Instance is running.
+ */
+ RUNNING(false),
+ /**
+ * Instance has finished executing and output is left to be consumed.
+ * In this state, there will be no new drivers, the existing drivers have finished
+ * and the output buffer of the instance is at-least in a 'no-more-tsBlocks' state.
+ */
+ FLUSHING(false),
+ /**
+ * Instance has finished executing and all output has been consumed.
+ */
+ FINISHED(true),
+ /**
+ * Instance was canceled by a user.
+ */
+ CANCELED(true),
+ /**
+ * Instance was aborted due to a failure in the query. The failure
+ * was not in this instance.
+ */
+ ABORTED(true),
+ /**
+ * Instance execution failed.
+ */
+ FAILED(true);
+
+ public static final Set<InstanceState> TERMINAL_TASK_STATES = Stream.of(InstanceState.values()).filter(InstanceState::isDone).collect(toImmutableSet());
+
+ private final boolean doneState;
+
+ InstanceState(boolean doneState)
+ {
+ this.doneState = doneState;
+ }
+
+ /**
+ * Is this a terminal state.
+ */
+ public boolean isDone()
+ {
+ return doneState;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 2b9d8cf..9852350 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.mpp.common.Analysis;
import org.apache.iotdb.db.mpp.common.QueryContext;
-import org.apache.iotdb.db.mpp.plan.*;
-import org.apache.iotdb.db.mpp.plan.optimzation.PlanOptimizer;
+import org.apache.iotdb.db.mpp.sql.planner.optimization.PlanOptimizer;
+import org.apache.iotdb.db.mpp.sql.planner.plan.*;
import java.nio.ByteBuffer;
import java.util.List;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java
index fc5df30..d215c7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution;
-import org.apache.iotdb.db.mpp.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import java.util.List;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/InstanceExecutor.java
similarity index 55%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/InstanceExecutor.java
index a4cb88c..8e819ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/InstanceExecutor.java
@@ -16,23 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.execution.executor;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
-import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.iotdb.db.mpp.execution.ExecFragmentInstance;
-/** The FilterNode is responsible to filter the RowRecord from TsBlock. */
-public class FilterNode extends ProcessNode {
- // The filter
- private FilterOperator rowFilter;
+public class InstanceExecutor {
- public FilterNode(PlanNodeId id) {
- super(id);
- }
+ /**
+ * TODO native implementation, should be replaced later
+ * @param instance executable fragment instance
+ * @param handle instance handle
+ * @return ListenableFuture indicate the instance's end state
+ */
+ public ListenableFuture<Void> enqueueInstance(ExecFragmentInstance instance, InstanceHandle handle) {
+ return SettableFuture.create();
+ }
- public FilterNode(PlanNodeId id, FilterOperator rowFilter) {
- this(id);
- this.rowFilter = rowFilter;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/InstanceHandle.java
similarity index 72%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/InstanceHandle.java
index 58c3a71..08444e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/InstanceHandle.java
@@ -16,18 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.execution.executor;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.common.InstanceId;
-public class FragmentSinkNode extends SinkNode {
- public FragmentSinkNode(PlanNodeId id) {
- super(id);
- }
- @Override
- public void send() {}
+// TODO should contain more fields, add as you want
+public class InstanceHandle {
- @Override
- public void close() throws Exception {}
+ private final InstanceId taskId;
+
+ public InstanceHandle(InstanceId taskId) {
+ this.taskId = taskId;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java
index c635f74..472fa05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.operator;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
/**
* Contains information about {@link Operator} execution.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
index 9954c74..9604721 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
@@ -16,19 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import java.util.List;
+public class AggregateOperator implements ProcessOperator {
-public class DistributedQueryPlan {
- private QueryContext context;
- private PlanNode<TsBlock> rootNode;
- private PlanFragment rootFragment;
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
index 9954c74..dab4df5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
@@ -16,19 +16,35 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import java.util.List;
+public class DeviceMergeOperator implements ProcessOperator {
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
-public class DistributedQueryPlan {
- private QueryContext context;
- private PlanNode<TsBlock> rootNode;
- private PlanFragment rootFragment;
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
similarity index 56%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
index 9954c74..3b35646 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
@@ -16,19 +16,35 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import java.util.List;
+public class FillOperator implements ProcessOperator {
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
-public class DistributedQueryPlan {
- private QueryContext context;
- private PlanNode<TsBlock> rootNode;
- private PlanFragment rootFragment;
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
index 9954c74..f0a707c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
@@ -16,19 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import java.util.List;
+public class FilterNullOperator implements ProcessOperator {
-public class DistributedQueryPlan {
- private QueryContext context;
- private PlanNode<TsBlock> rootNode;
- private PlanFragment rootFragment;
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
index 9954c74..435e016 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
@@ -16,19 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import java.util.List;
+public class GroupByLevelOperator implements ProcessOperator {
-public class DistributedQueryPlan {
- private QueryContext context;
- private PlanNode<TsBlock> rootNode;
- private PlanFragment rootFragment;
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
similarity index 56%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
index 9954c74..60d68ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -16,19 +16,35 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import java.util.List;
+public class LimitOperator implements ProcessOperator {
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
-public class DistributedQueryPlan {
- private QueryContext context;
- private PlanNode<TsBlock> rootNode;
- private PlanFragment rootFragment;
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
index 9954c74..08f971b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
@@ -16,19 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import java.util.List;
+public class OffsetOperator implements ProcessOperator {
-public class DistributedQueryPlan {
- private QueryContext context;
- private PlanNode<TsBlock> rootNode;
- private PlanFragment rootFragment;
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/ProcessOperator.java
similarity index 78%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/ProcessOperator.java
index 39f8d17..320e505 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/ProcessOperator.java
@@ -16,12 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
-public class PlanFragmentId {
- private String id;
+import org.apache.iotdb.db.mpp.operator.Operator;
+
+// TODO should think about what interfaces should this ProcessOperator have
+public interface ProcessOperator extends Operator {
- public PlanFragmentId(String id) {
- this.id = id;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
similarity index 56%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
index 9954c74..45b5da4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
@@ -16,19 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import java.util.List;
+public class SortOperator implements ProcessOperator {
-public class DistributedQueryPlan {
- private QueryContext context;
- private PlanNode<TsBlock> rootNode;
- private PlanFragment rootFragment;
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index 9954c74..0c95752 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -16,19 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import java.util.List;
+public class TimeJoinOperator implements ProcessOperator {
-public class DistributedQueryPlan {
- private QueryContext context;
- private PlanNode<TsBlock> rootNode;
- private PlanFragment rootFragment;
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return ProcessOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
similarity index 51%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNullNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
index d7c463d..7f75c4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNullNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
@@ -16,23 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.sink;
-import org.apache.iotdb.db.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.Operator;
-/** WithoutNode is used to discard specific rows from upstream node. */
-public class FilterNullNode extends ProcessNode {
+import java.nio.ByteBuffer;
- // The policy to discard the result from upstream operator
- private WithoutPolicy discardPolicy;
+public interface SinkOperator extends Operator {
- public FilterNullNode(PlanNodeId id) {
- super(id);
- }
+ /**
+ * Sends a tsBlock to an unpartitioned buffer. If no-more-tsBlocks has been set, the send tsBlock
+ * call is ignored. This can happen with limit queries.
+ */
+ void send(ByteBuffer tsBlock);
- public FilterNullNode(PlanNodeId id, WithoutPolicy discardPolicy) {
- this(id);
- this.discardPolicy = discardPolicy;
- }
+ /**
+ * Notify SinkHandle that no more tsBlocks will be sent. Any future calls to send a tsBlock are
+ * ignored.
+ */
+ void setNoMoreTsBlocks();
+
+ /**
+ * Abort the sink handle, discarding all tsBlocks which may still in memory buffer, but blocking
+ * readers. It is expected that readers will be unblocked when the failed query is cleaned up.
+ */
+ void abort();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index 9954c74..83c6309 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -16,19 +16,41 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.source;
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import java.util.List;
+public class SeriesAggregateScanOperator implements SourceOperator {
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
-public class DistributedQueryPlan {
- private QueryContext context;
- private PlanNode<TsBlock> rootNode;
- private PlanFragment rootFragment;
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return SourceOperator.super.isBlocked();
+ }
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ SourceOperator.super.close();
+ }
+
+ @Override
+ public PlanNodeId getSourceId() {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index 9954c74..42085ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -16,19 +16,42 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.source;
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import java.util.List;
+public class SeriesScanOperator implements SourceOperator {
-public class DistributedQueryPlan {
- private QueryContext context;
- private PlanNode<TsBlock> rootNode;
- private PlanFragment rootFragment;
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return SourceOperator.super.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ SourceOperator.super.close();
+ }
+
+ @Override
+ public PlanNodeId getSourceId() {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
similarity index 76%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
index 39f8d17..63b9acd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
@@ -16,12 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.source;
-public class PlanFragmentId {
- private String id;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
- public PlanFragmentId(String id) {
- this.id = id;
- }
+public interface SourceOperator extends Operator {
+
+ PlanNodeId getSourceId();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
new file mode 100644
index 0000000..920c52f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner;
+
+import org.apache.iotdb.db.mpp.execution.InstanceContext;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.*;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+
+import java.util.List;
+
+/**
+ * used to plan a fragment instance. Currently, we simply change it from PlanNode to executable Operator tree,
+ * but in the future, we may split one fragment instance into multiple pipeline to run a fragment instance parallel and take full advantage of multi-cores
+ */
+public class LocalExecutionPlanner {
+
+
+ /**
+ * This Visitor is responsible for transferring PlanNode Tree to Operator Tree
+ */
+ private class Visitor extends PlanVisitor<Operator, LocalExecutionPlanContext> {
+
+ @Override
+ public Operator visitPlan(PlanNode node, LocalExecutionPlanContext context) {
+ throw new UnsupportedOperationException("should call the concrete visitXX() method");
+ }
+
+ @Override
+ public Operator visitSeriesScan(SeriesScanNode node, LocalExecutionPlanContext context) {
+ return super.visitSeriesScan(node, context);
+ }
+
+ @Override
+ public Operator visitSeriesAggregate(SeriesAggregateScanNode node, LocalExecutionPlanContext context) {
+ return super.visitSeriesAggregate(node, context);
+ }
+
+ @Override
+ public Operator visitDeviceMerge(DeviceMergeNode node, LocalExecutionPlanContext context) {
+ return super.visitDeviceMerge(node, context);
+ }
+
+ @Override
+ public Operator visitFill(FillNode node, LocalExecutionPlanContext context) {
+ return super.visitFill(node, context);
+ }
+
+ @Override
+ public Operator visitFilter(FilterNode node, LocalExecutionPlanContext context) {
+ PlanNode child = node.getChild();
+
+ FilterOperator filterExpression = node.getPredicate();
+ List<String> outputSymbols = node.getOutputColumnNames();
+ return super.visitFilter(node, context);
+ }
+
+ @Override
+ public Operator visitFilterNull(FilterNullNode node, LocalExecutionPlanContext context) {
+ return super.visitFilterNull(node, context);
+ }
+
+ @Override
+ public Operator visitGroupByLevel(GroupByLevelNode node, LocalExecutionPlanContext context) {
+ return super.visitGroupByLevel(node, context);
+ }
+
+ @Override
+ public Operator visitLimit(LimitNode node, LocalExecutionPlanContext context) {
+ return super.visitLimit(node, context);
+ }
+
+ @Override
+ public Operator visitOffset(OffsetNode node, LocalExecutionPlanContext context) {
+ return super.visitOffset(node, context);
+ }
+
+ @Override
+ public Operator visitRowBasedSeriesAggregate(AggregateNode node, LocalExecutionPlanContext context) {
+ return super.visitRowBasedSeriesAggregate(node, context);
+ }
+
+ @Override
+ public Operator visitSort(SortNode node, LocalExecutionPlanContext context) {
+ return super.visitSort(node, context);
+ }
+
+ @Override
+ public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) {
+ return super.visitTimeJoin(node, context);
+ }
+ }
+
+ private static class LocalExecutionPlanContext {
+ private final InstanceContext taskContext;
+ private int nextOperatorId = 0;
+
+ public LocalExecutionPlanContext(InstanceContext taskContext) {
+ this.taskContext = taskContext;
+ }
+
+ private int getNextOperatorId() {
+ return nextOperatorId++;
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimzation/PlanOptimizer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/optimzation/PlanOptimizer.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
index b99c5db..f98f014 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimzation/PlanOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.optimzation;
+package org.apache.iotdb.db.mpp.sql.planner.optimization;
import org.apache.iotdb.db.mpp.common.QueryContext;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
public interface PlanOptimizer {
PlanNode<TsBlock> optimize(PlanNode<TsBlock> plan, QueryContext context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
index 9954c74..96185bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
@@ -16,17 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.db.mpp.common.QueryContext;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import java.util.List;
public class DistributedQueryPlan {
private QueryContext context;
- private PlanNode<TsBlock> rootNode;
+ private PlanNode rootNode;
private PlanFragment rootFragment;
// TODO: consider whether this field is necessary when do the implementation
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
index 03eb1dc..fc428cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.db.mpp.common.Analysis;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 0b405b3..dfdd764 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
public class FragmentInstance {
private FragmentInstanceId id;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstanceId.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstanceId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstanceId.java
index 18181cd..c47d549 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstanceId.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
public class FragmentInstanceId {
private String id;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalPlanner.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalPlanner.java
index a74644b..0a45b6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalPlanner.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.db.mpp.common.Analysis;
import org.apache.iotdb.db.mpp.common.QueryContext;
-import org.apache.iotdb.db.mpp.plan.optimzation.PlanOptimizer;
+import org.apache.iotdb.db.mpp.sql.planner.optimization.PlanOptimizer;
import java.util.List;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
similarity index 88%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
index 5094df6..f3dce2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.db.mpp.common.QueryContext;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
/**
* LogicalQueryPlan represents a logical query plan. It stores the root node of corresponding query
@@ -28,5 +28,5 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNode;
*/
public class LogicalQueryPlan {
private QueryContext context;
- private PlanNode<TsBlock> rootNode;
+ private PlanNode rootNode;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
similarity index 88%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index fc49264..bf13247 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -16,14 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
// TODO: consider whether it is necessary to make PlanFragment as a TreeNode
/** PlanFragment contains a sub-query of distributed query. */
public class PlanFragment {
private PlanFragmentId id;
- private PlanNode<TsBlock> root;
+ private PlanNode root;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
index 39f8d17..38a56f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
public class PlanFragmentId {
private String id;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
similarity index 56%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 1a3f103..583d4a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -16,19 +16,35 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
-import org.apache.iotdb.db.mpp.common.TreeNode;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
/**
- * @author xingtanzjr The base class of query executable operators, which is used to compose logical
- * query plan. TODO: consider how to restrict the children type for each type of ExecOperator
- * TODO: consider to fix the Template type as TsBlock
+ * The base class of query executable operators, which is used to compose logical query plan.
*/
-public abstract class PlanNode<T> extends TreeNode<PlanNode<T>> {
+// TODO: consider how to restrict the children type for each type of ExecOperator
+public abstract class PlanNode {
+
private PlanNodeId id;
- public PlanNode(PlanNodeId id) {
+ protected PlanNode(PlanNodeId id) {
+ requireNonNull(id, "id is null");
this.id = id;
}
+
+ public PlanNodeId getId() {
+ return id;
+ }
+
+ public abstract List<PlanNode> getChildren();
+
+ public abstract List<String> getOutputColumnNames();
+
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitPlan(this, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
index 576fd64..426c778 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package org.apache.iotdb.db.mpp.plan.node;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
public class PlanNodeId {
private String id;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeIdAllocator.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeIdAllocator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
index 6e70c20..919f303 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeIdAllocator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
/** A centralized PlanNodeId generator */
public class PlanNodeIdAllocator {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
new file mode 100644
index 0000000..ecbc043
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.*;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+
+public abstract class PlanVisitor<R, C> {
+
+ public abstract R visitPlan(PlanNode node, C context);
+
+ public R visitSeriesScan(SeriesScanNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitSeriesAggregate(SeriesAggregateScanNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitDeviceMerge(DeviceMergeNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitFill(FillNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitFilter(FilterNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitFilterNull(FilterNullNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitGroupByLevel(GroupByLevelNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitLimit(LimitNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitOffset(OffsetNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitRowBasedSeriesAggregate(AggregateNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitSort(SortNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitTimeJoin(TimeJoinNode node, C context) {
+ return visitPlan(node, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
similarity index 53%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
index 9d7b943..4588ea4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
@@ -16,20 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
import java.util.List;
+import java.util.Map;
/**
- * This node is used to aggregate required series by raw data. The raw data will be input as a
- * TsBlock. This node will output the series aggregated result represented by TsBlock Thus, the
- * columns in output TsBlock will be different from input TsBlock.
+ * This node is used to aggregate required series from multiple sources. The source data will be input as a TsBlock,
+ * it may be raw data or partial aggregation result.
+ * This node will output the final series aggregated result represented by TsBlock.
*/
-public class RowBasedSeriesAggregateNode extends ProcessNode {
+public class AggregateNode extends ProcessNode {
// The parameter of `group by time`
// Its value will be null if there is no `group by time` clause,
private GroupByTimeParameter groupByTimeParameter;
@@ -38,22 +41,32 @@ public class RowBasedSeriesAggregateNode extends ProcessNode {
// result TsBlock
// (Currently we only support one series in the aggregation function)
// TODO: need consider whether it is suitable the aggregation function using FunctionExpression
- private List<FunctionExpression> aggregateFuncList;
+ private Map<String, FunctionExpression> aggregateFuncMap;
- public RowBasedSeriesAggregateNode(PlanNodeId id) {
+ private final List<PlanNode> children;
+ private final List<String> columnNames;
+
+ public AggregateNode(PlanNodeId id, Map<String, FunctionExpression> aggregateFuncMap, List<PlanNode> children, List<String> columnNames) {
super(id);
+ this.aggregateFuncMap = aggregateFuncMap;
+ this.children = children;
+ this.columnNames = columnNames;
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return children;
}
- public RowBasedSeriesAggregateNode(PlanNodeId id, List<FunctionExpression> aggregateFuncList) {
- this(id);
- this.aggregateFuncList = aggregateFuncList;
+ @Override
+ public List<String> getOutputColumnNames() {
+ return columnNames;
}
- public RowBasedSeriesAggregateNode(
- PlanNodeId id,
- List<FunctionExpression> aggregateFuncList,
- GroupByTimeParameter groupByTimeParameter) {
- this(id, aggregateFuncList);
- this.groupByTimeParameter = groupByTimeParameter;
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitRowBasedSeriesAggregate(this, context);
}
+
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
similarity index 69%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index 9269545..1f61e88 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -16,14 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import java.util.List;
import java.util.Map;
/**
@@ -43,23 +44,42 @@ public class DeviceMergeNode extends ProcessNode {
// The without policy is able to be push down to the DeviceMergeNode because we can know whether a
// row contains
// null or not.
- private WithoutPolicy withoutPolicy;
+ private FilterNullPolicy filterNullPolicy;
// The map from deviceName to corresponding query result node responsible for that device.
// DeviceNode means the node whose output TsBlock contains the data belonged to one device.
- private Map<String, PlanNode<TsBlock>> childDeviceNodeMap;
+ private Map<String, PlanNode> childDeviceNodeMap;
+
+ private List<PlanNode> children;
+
+ private List<String> columnNames;
public DeviceMergeNode(PlanNodeId id) {
super(id);
}
- public DeviceMergeNode(PlanNodeId id, Map<String, PlanNode<TsBlock>> deviceNodeMap) {
+ @Override
+ public List<PlanNode> getChildren() {
+ return children;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return columnNames;
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitDeviceMerge(this, context);
+ }
+
+ public DeviceMergeNode(PlanNodeId id, Map<String, PlanNode> deviceNodeMap) {
this(id);
this.childDeviceNodeMap = deviceNodeMap;
this.children.addAll(deviceNodeMap.values());
}
- public void addChildDeviceNode(String deviceName, PlanNode<TsBlock> childNode) {
+ public void addChildDeviceNode(String deviceName, PlanNode childNode) {
this.childDeviceNodeMap.put(deviceName, childNode);
this.children.add(childNode);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
similarity index 64%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
index 31e57cd..29396c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
@@ -16,14 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+import com.google.common.collect.ImmutableList;
import org.apache.iotdb.db.mpp.common.FillPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import java.util.List;
/** FillNode is used to fill the empty field in one row. */
public class FillNode extends ProcessNode {
+ private PlanNode child;
+
// The policy to discard the result from upstream node
private FillPolicy fillPolicy;
@@ -31,6 +38,21 @@ public class FillNode extends ProcessNode {
super(id);
}
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of(child);
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return child.getOutputColumnNames();
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitFill(this, context);
+ }
+
public FillNode(PlanNodeId id, FillPolicy fillPolicy) {
this(id);
this.fillPolicy = fillPolicy;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
new file mode 100644
index 0000000..a6108fe
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+
+import java.util.List;
+
+/** The FilterNode is responsible to filter the RowRecord from TsBlock. */
+public class FilterNode extends ProcessNode {
+
+ private final PlanNode child;
+ // TODO we need to rename it to something like expression in order to distinguish from Operator class
+ private final FilterOperator predicate;
+
+ public FilterNode(PlanNodeId id, PlanNode child, FilterOperator predicate) {
+ super(id);
+ this.child = child;
+ this.predicate = predicate;
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of(child);
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return child.getOutputColumnNames();
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitFilter(this, context);
+ }
+
+ public FilterOperator getPredicate() {
+ return predicate;
+ }
+
+ public PlanNode getChild() {
+ return child;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
new file mode 100644
index 0000000..976af21
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+
+import java.util.List;
+
+/** WithoutNode is used to discard specific rows from upstream node. */
+public class FilterNullNode extends ProcessNode {
+
+ // The policy to discard the result from upstream operator
+ private FilterNullPolicy discardPolicy;
+
+ private PlanNode child;
+
+ private List<String> filterNullColumnNames;
+
+
+ public FilterNullNode(PlanNodeId id, PlanNode child) {
+ super(id);
+ this.child = child;
+ }
+
+ public FilterNullNode(PlanNodeId id, PlanNode child, List<String> filterNullColumnNames) {
+ super(id);
+ this.child = child;
+ this.filterNullColumnNames = filterNullColumnNames;
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of(child);
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return child.getOutputColumnNames();
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitFilterNull(this, context);
+ }
+
+ public void setFilterNullColumnNames(List<String> filterNullColumnNames) {
+ this.filterNullColumnNames = filterNullColumnNames;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
similarity index 59%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
index 538d6d8..b877933 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
@@ -16,9 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import java.util.List;
/**
* This node is responsible for the final aggregation merge operation. It will process the data from
@@ -32,10 +36,47 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
*/
public class GroupByLevelNode extends ProcessNode {
+ private PlanNode child;
+
private int[] groupByLevels;
- public GroupByLevelNode(PlanNodeId id, int[] groupByLevels) {
+ private List<String> columnNames;
+
+ public GroupByLevelNode(PlanNodeId id, PlanNode child, int[] groupByLevels, List<String> columnNames) {
super(id);
+ this.child = child;
+ this.groupByLevels = groupByLevels;
+ this.columnNames = columnNames;
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return child.getChildren();
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return columnNames;
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitGroupByLevel(this, context);
+ }
+
+ public int[] getGroupByLevels() {
+ return groupByLevels;
+ }
+
+ public void setGroupByLevels(int[] groupByLevels) {
this.groupByLevels = groupByLevels;
}
+
+ public List<String> getColumnNames() {
+ return columnNames;
+ }
+
+ public void setColumnNames(List<String> columnNames) {
+ this.columnNames = columnNames;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
similarity index 53%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index 9596c1a..67b7203 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -16,22 +16,48 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import java.util.List;
/** LimitNode is used to select top n result. It uses the default order of upstream nodes */
public class LimitNode extends ProcessNode {
// The limit count
- private int limit;
+ private final int limit;
+ private final PlanNode child;
- public LimitNode(PlanNodeId id) {
+ public LimitNode(PlanNodeId id, int limit, PlanNode child) {
super(id);
+ this.limit = limit;
+ this.child = child;
}
- public LimitNode(PlanNodeId id, int limit) {
- this(id);
- this.limit = limit;
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of(child);
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return child.getOutputColumnNames();
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitLimit(this, context);
+ }
+
+ public int getLimit() {
+ return limit;
+ }
+
+ public PlanNode getChild() {
+ return child;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
similarity index 56%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/OffsetNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
index 01e0e93..2e3fc78 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
@@ -16,9 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import java.util.List;
/**
* OffsetNode is used to skip top n result from upstream nodes. It uses the default order of
@@ -27,14 +31,35 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
public class OffsetNode extends ProcessNode {
// The limit count
- private int offset;
+ private final PlanNode child;
+ private final int offset;
- public OffsetNode(PlanNodeId id) {
+ public OffsetNode(PlanNodeId id, PlanNode child, int offset) {
super(id);
+ this.child = child;
+ this.offset = offset;
}
- public OffsetNode(PlanNodeId id, int offset) {
- this(id);
- this.offset = offset;
+ @Override
+ public List<PlanNode> getChildren() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitOffset(this, context);
+ }
+
+ public PlanNode getChild() {
+ return child;
+ }
+
+ public int getOffset() {
+ return offset;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ProcessNode.java
similarity index 78%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ProcessNode.java
index 63e07b4..8348a21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ProcessNode.java
@@ -16,13 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+public abstract class ProcessNode extends PlanNode {
-public class ProcessNode extends PlanNode<TsBlock> {
public ProcessNode(PlanNodeId id) {
super(id);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
similarity index 57%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
index 1e83783..49b59a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
@@ -16,10 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+import com.google.common.collect.ImmutableList;
import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import java.util.List;
/**
* In general, the parameter in sortNode should be pushed down to the upstream operators. In our
@@ -27,14 +32,31 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
*/
public class SortNode extends ProcessNode {
+ private final PlanNode child;
+
+ private final List<String> orderBy;
+
private OrderBy sortOrder;
- public SortNode(PlanNodeId id) {
+ public SortNode(PlanNodeId id, PlanNode child, List<String> orderBy, OrderBy sortOrder) {
super(id);
+ this.child = child;
+ this.orderBy = orderBy;
+ this.sortOrder = sortOrder;
}
- public SortNode(PlanNodeId id, OrderBy sortOrder) {
- this(id);
- this.sortOrder = sortOrder;
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of(child);
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return child.getOutputColumnNames();
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitSort(this, context);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
similarity index 56%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index ab48cc4..998594c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -16,15 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
/**
* TimeJoinOperator is responsible for join two or more TsBlock. The join algorithm is like outer
@@ -41,27 +42,46 @@ public class TimeJoinNode extends ProcessNode {
// The without policy is able to be push down to the TimeJoinOperator because we can know whether
// a row contains
// null or not.
- private WithoutPolicy withoutPolicy;
+ private FilterNullPolicy filterNullPolicy;
- public TimeJoinNode(PlanNodeId id) {
+ private List<PlanNode> children;
+
+
+ public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder, FilterNullPolicy filterNullPolicy, List<PlanNode> children) {
super(id);
- this.mergeOrder = OrderBy.TIMESTAMP_ASC;
+ this.mergeOrder = mergeOrder;
+ this.filterNullPolicy = filterNullPolicy;
+ this.children = children;
}
- public TimeJoinNode(PlanNodeId id, PlanNode<TsBlock>... children) {
- super(id);
- this.children.addAll(Arrays.asList(children));
+ @Override
+ public List<PlanNode> getChildren() {
+ return children;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return children.stream().flatMap(child -> child.getOutputColumnNames().stream()).collect(Collectors.toList());
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitTimeJoin(this, context);
}
- public void addChild(PlanNode<TsBlock> child) {
+ public void addChild(PlanNode child) {
this.children.add(child);
}
+ public void setChildren(List<PlanNode> children) {
+ this.children = children;
+ }
+
public void setMergeOrder(OrderBy mergeOrder) {
this.mergeOrder = mergeOrder;
}
- public void setWithoutPolicy(WithoutPolicy withoutPolicy) {
- this.withoutPolicy = withoutPolicy;
+ public void setWithoutPolicy(FilterNullPolicy filterNullPolicy) {
+ this.filterNullPolicy = filterNullPolicy;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java
index 0bac6bf..2fddbdd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java
@@ -16,9 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
public class CsvSinkNode extends SinkNode {
public CsvSinkNode(PlanNodeId id) {
@@ -26,6 +29,16 @@ public class CsvSinkNode extends SinkNode {
}
@Override
+ public List<PlanNode> getChildren() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
public void close() throws Exception {}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 58c3a71..da30223 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -16,9 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
public class FragmentSinkNode extends SinkNode {
public FragmentSinkNode(PlanNodeId id) {
@@ -26,6 +29,16 @@ public class FragmentSinkNode extends SinkNode {
}
@Override
+ public List<PlanNode> getChildren() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
public void send() {}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/SinkNode.java
similarity index 78%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/SinkNode.java
index f59effb..7221fda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/SinkNode.java
@@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-public abstract class SinkNode extends PlanNode<TsBlock> implements AutoCloseable {
+public abstract class SinkNode extends PlanNode implements AutoCloseable {
public SinkNode(PlanNodeId id) {
super(id);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java
similarity index 74%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java
index f5c48df..bb343f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java
@@ -16,9 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
/** not implemented in current IoTDB yet */
public class ThriftSinkNode extends SinkNode {
@@ -28,6 +31,16 @@ public class ThriftSinkNode extends SinkNode {
}
@Override
+ public List<PlanNode> getChildren() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
public void close() throws Exception {}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
similarity index 74%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
index a2a0fde..612e930 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
@@ -16,9 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
/** Not implemented in current version. */
public class CsvSourceNode extends SourceNode {
@@ -28,6 +31,16 @@ public class CsvSourceNode extends SourceNode {
}
@Override
+ public List<PlanNode> getChildren() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
public void close() throws Exception {}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
similarity index 75%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 80ea58f..6bdf1c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -16,13 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
+import com.google.common.collect.ImmutableList;
import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import java.util.List;
+
/**
* SeriesAggregateOperator is responsible to do the aggregation calculation for one series. It will
* read the target series and calculate the aggregation result by the aggregation digest or raw data
@@ -38,7 +43,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
* represent the whole aggregation result of this series. And the timestamp will be 0, which is
* meaningless.
*/
-public class SeriesAggregateNode extends SourceNode {
+public class SeriesAggregateScanNode extends SourceNode {
// The parameter of `group by time`
// Its value will be null if there is no `group by time` clause,
@@ -51,16 +56,28 @@ public class SeriesAggregateNode extends SourceNode {
private Filter filter;
- public SeriesAggregateNode(PlanNodeId id) {
+ private String columnName;
+
+ public SeriesAggregateScanNode(PlanNodeId id) {
super(id);
}
- public SeriesAggregateNode(PlanNodeId id, FunctionExpression aggregateFunc) {
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return ImmutableList.of(columnName);
+ }
+
+ public SeriesAggregateScanNode(PlanNodeId id, FunctionExpression aggregateFunc) {
this(id);
this.aggregateFunc = aggregateFunc;
}
- public SeriesAggregateNode(
+ public SeriesAggregateScanNode(
PlanNodeId id, FunctionExpression aggregateFunc, GroupByTimeParameter groupByTimeParameter) {
this(id, aggregateFunc);
this.groupByTimeParameter = groupByTimeParameter;
@@ -72,6 +89,11 @@ public class SeriesAggregateNode extends SourceNode {
@Override
public void close() throws Exception {}
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitSeriesAggregate(this, context);
+ }
+
// This method is used when do the PredicatePushDown.
// The filter is not put in the constructor because the filter is only clear in the predicate
// push-down stage
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
similarity index 79%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index ccfae5c..aa02fbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -16,13 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
+import com.google.common.collect.ImmutableList;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import java.util.List;
+
/**
* SeriesScanOperator is responsible for read data a specific series. When reading data, the
* SeriesScanOperator can read the raw data batch by batch. And also, it can leverage the filter and
@@ -52,6 +57,8 @@ public class SeriesScanNode extends SourceNode {
// offset for result set. The default value is 0
private int offset;
+ private String columnName;
+
public SeriesScanNode(PlanNodeId id, PartialPath seriesPath) {
super(id);
this.seriesPath = seriesPath;
@@ -82,4 +89,19 @@ public class SeriesScanNode extends SourceNode {
public void setOffset(int offset) {
this.offset = offset;
}
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return ImmutableList.of(columnName);
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitSeriesScan(this, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
similarity index 78%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
index c83da97..ee25e5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
@@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-public abstract class SourceNode extends PlanNode<TsBlock> implements AutoCloseable {
+public abstract class SourceNode extends PlanNode implements AutoCloseable {
public SourceNode(PlanNodeId id) {
super(id);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/Expression.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/Expression.java
index 7a4107e..45decc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/Expression.java
@@ -16,9 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.sql.tree;
+
+public class Expression {
-public enum WithoutPolicy {
- CONTAINS_NULL,
- ALL_NULL
}