You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/24 02:47:48 UTC
[iotdb] branch master updated: [IoTDB-2260]Complete the basic version of DistributionPlanner (#5327)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bc3179b [IoTDB-2260]Complete the basic version of DistributionPlanner (#5327)
bc3179b is described below
commit bc3179b99f9296bb9c036cf7e2c56632db4b85bc
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Thu Mar 24 10:47:01 2022 +0800
[IoTDB-2260]Complete the basic version of DistributionPlanner (#5327)
* complete SubPlan builder
* add interface
* format code
* add fragment parallel planner
* improve the InsertTabletNode and InsertTabletStatement
* complete basic distributed plan
* spotless
* spotless
* change name
Co-authored-by: qiaojialin <64...@qq.com>
Co-authored-by: JackieTien97 <ja...@gmail.com>
---
.../iotdb/commons/partition/DataPartitionInfo.java | 67 +++++++++++
.../commons/partition/DataPartitionQueryParam.java | 30 +++--
.../iotdb/commons/partition/DataRegionId.java | 22 +++-
.../commons/partition/DataRegionReplicaSet.java | 43 ++++---
.../iotdb/commons/partition/DeviceGroupId.java | 33 +++---
.../iotdb/commons/partition/PartitionInfo.java | 31 ++---
.../commons/partition/SchemaPartitionInfo.java | 19 ++-
.../iotdb/commons/partition/SchemaRegionId.java | 14 ++-
.../commons/partition/SchemaRegionReplicaSet.java | 29 +++--
.../iotdb/commons/partition/TimePartitionId.java | 17 ++-
.../iotdb/db/mpp/common/FragmentInstanceId.java | 4 +
.../iotdb/db/mpp/common/MPPQueryContext.java | 16 ++-
.../{FragmentId.java => PlanFragmentId.java} | 16 ++-
.../SessionInfo.java} | 11 +-
.../apache/iotdb/db/mpp/execution/Coordinator.java | 24 +++-
.../ExecutionResult.java} | 15 ++-
.../iotdb/db/mpp/execution/FragmentInfo.java | 6 +-
.../iotdb/db/mpp/execution/QueryExecution.java | 41 ++++---
.../mpp/execution/scheduler/ClusterScheduler.java | 4 +-
.../db/mpp/execution/scheduler/IScheduler.java | 4 +-
.../execution/scheduler/StandaloneScheduler.java | 4 +-
.../apache/iotdb/db/mpp/sql/analyze/Analysis.java | 63 ++++++----
.../{StatementAnalyzer.java => Analyzer.java} | 47 +++++++-
.../ClusterPartitionFetcher.java} | 29 ++---
.../ClusterSchemaFetcher.java} | 19 +--
.../IPartitionFetcher.java} | 39 ++-----
.../ISchemaFetcher.java} | 13 ++-
.../analyze/QueryType.java} | 7 +-
.../sql/analyze/StandalonePartitionFetcher.java | 66 +++++++++++
.../StandaloneSchemaFetcher.java} | 25 ++--
.../db/mpp/sql/planner/DistributionPlanner.java | 99 +++++++++++++---
.../iotdb/db/mpp/sql/planner/LogicalPlanner.java | 19 ++-
.../mpp/sql/planner/plan/DistributedQueryPlan.java | 34 +++++-
.../db/mpp/sql/planner/plan/FragmentInstance.java | 65 +++++++++++
...QueryPlan.java => IFragmentParallelPlaner.java} | 25 ++--
.../db/mpp/sql/planner/plan/PlanFragment.java | 61 +++++++++-
.../plan/SimpleFragmentParallelPlanner.java | 124 ++++++++++++++++++++
.../iotdb/db/mpp/sql/planner/plan/SubPlan.java | 68 +++++++++++
.../db/mpp/sql/planner/plan/node/PlanNodeUtil.java | 26 +++++
.../db/mpp/sql/planner/plan/node/PlanVisitor.java | 4 +
.../planner/plan/node/process/ExchangeNode.java | 85 +++++++++++---
.../planner/plan/node/sink/FragmentSinkNode.java | 61 +++++++++-
.../planner/plan/node/source/CsvSourceNode.java | 11 ++
.../plan/node/source/SeriesAggregateScanNode.java | 16 ++-
.../planner/plan/node/source/SeriesScanNode.java | 30 ++---
.../sql/planner/plan/node/source/SourceNode.java | 5 +
.../sql/planner/plan/node/write/InsertNode.java | 16 +--
.../planner/plan/node/write/InsertTabletNode.java | 21 +++-
.../statement/crud/InsertBaseStatement.java} | 37 +++---
.../crud/InsertTabletStatement.java} | 13 ++-
.../iotdb/db/mpp/sql/tree/StatementVisitor.java | 5 +
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 1 +
.../iotdb/db/query/control/SessionManager.java | 8 ++
.../iotdb/db/service/basic/ServiceProvider.java | 6 +-
.../db/service/thrift/impl/TSServiceImpl.java | 56 ++++++++-
...tatementAnalyzerTest.java => AnalyzerTest.java} | 4 +-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 128 +++++++++++++++------
.../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java | 4 +-
58 files changed, 1412 insertions(+), 378 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java
new file mode 100644
index 0000000..9643471
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.partition;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class DataPartitionInfo {
+
+ // Map<StorageGroup, Map<DeviceGroupID, Map<TimePartitionId, List<DataRegionPlaceInfo>>>>
+ private Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>>
+ dataPartitionMap;
+
+ public Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>>
+ getDataPartitionMap() {
+ return dataPartitionMap;
+ }
+
+ public void setDataPartitionMap(
+ Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>>
+ dataPartitionMap) {
+ this.dataPartitionMap = dataPartitionMap;
+ }
+
+ public List<DataRegionReplicaSet> getDataRegionReplicaSet(
+ String deviceName, List<TimePartitionId> timePartitionIdList) {
+ String storageGroup = getStorageGroupByDevice(deviceName);
+ DeviceGroupId deviceGroupId = calculateDeviceGroupId(deviceName);
+ // TODO: (xingtanzjr) the timePartitionIdList is ignored
+ return dataPartitionMap.get(storageGroup).get(deviceGroupId).values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
+
+ private DeviceGroupId calculateDeviceGroupId(String deviceName) {
+ // TODO: (xingtanzjr) implement the real algorithm for calculation of DeviceGroupId
+ return new DeviceGroupId(deviceName.length());
+ }
+
+ private String getStorageGroupByDevice(String deviceName) {
+ for (String storageGroup : dataPartitionMap.keySet()) {
+ if (deviceName.startsWith(storageGroup)) {
+ return storageGroup;
+ }
+ }
+ // TODO: (xingtanzjr) how to handle this exception in IoTDB
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
similarity index 61%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
index 8cf0341..60cdf7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
@@ -16,18 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
-
-import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+package org.apache.iotdb.commons.partition;
import java.util.List;
-public class DistributedQueryPlan {
- private MPPQueryContext context;
- private PlanNode rootNode;
- private PlanFragment rootFragment;
+public class DataPartitionQueryParam {
+
+ private String deviceId;
+ private List<TimePartitionId> timePartitionIdList;
+
+ public String getDeviceId() {
+ return deviceId;
+ }
+
+ public void setDeviceId(String deviceId) {
+ this.deviceId = deviceId;
+ }
+
+ public List<TimePartitionId> getTimePartitionIdList() {
+ return timePartitionIdList;
+ }
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ public void setTimePartitionIdList(List<TimePartitionId> timePartitionIdList) {
+ this.timePartitionIdList = timePartitionIdList;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionId.java
similarity index 65%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionId.java
index 38a56f3..dffdd90 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionId.java
@@ -16,12 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.commons.partition;
-public class PlanFragmentId {
- private String id;
+public class DataRegionId {
+ private int dataRegionId;
- public PlanFragmentId(String id) {
- this.id = id;
+ public DataRegionId(int dataRegionId) {
+ this.dataRegionId = dataRegionId;
+ }
+
+ public int getDataRegionId() {
+ return dataRegionId;
+ }
+
+ public void setDataRegionId(int dataRegionId) {
+ this.dataRegionId = dataRegionId;
+ }
+
+ public String toString() {
+ return String.format("DataRegion-%d", dataRegionId);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
index 6151916..6ab4fbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
@@ -16,41 +16,38 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
+package org.apache.iotdb.commons.partition;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
import java.util.List;
-public class FragmentSinkNode extends SinkNode {
- public FragmentSinkNode(PlanNodeId id) {
- super(id);
- }
+public class DataRegionReplicaSet {
+ private DataRegionId Id;
+ private List<EndPoint> endPointList;
- @Override
- public List<PlanNode> getChildren() {
- return null;
+ public DataRegionReplicaSet(DataRegionId Id, List<EndPoint> endPointList) {
+ this.Id = Id;
+ this.endPointList = endPointList;
}
- @Override
- public PlanNode clone() {
- return null;
+ public List<EndPoint> getEndPointList() {
+ return endPointList;
}
- @Override
- public PlanNode cloneWithChildren(List<PlanNode> children) {
- return null;
+ public void setEndPointList(List<EndPoint> endPointList) {
+ this.endPointList = endPointList;
}
- @Override
- public List<String> getOutputColumnNames() {
- return null;
+ public DataRegionId getId() {
+ return Id;
}
- @Override
- public void send() {}
+ public void setId(DataRegionId id) {
+ this.Id = id;
+ }
- @Override
- public void close() throws Exception {}
+ public String toString() {
+ return String.format("%s:%s", Id, endPointList);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DeviceGroupId.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/DeviceGroupId.java
index 1a597b2..0a7123c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DeviceGroupId.java
@@ -16,24 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+package org.apache.iotdb.commons.partition;
-public class PlanNodeUtil {
- public static void printPlanNode(PlanNode root) {
- printPlanNodeWithLevel(root, 0);
+public class DeviceGroupId {
+ private int deviceGroupId;
+
+ public DeviceGroupId(int deviceGroupId) {
+ this.deviceGroupId = deviceGroupId;
+ }
+
+ public int getDeviceGroupId() {
+ return deviceGroupId;
+ }
+
+ public void setDeviceGroupId(int deviceGroupId) {
+ this.deviceGroupId = deviceGroupId;
}
- private static void printPlanNodeWithLevel(PlanNode root, int level) {
- printTab(level);
- System.out.println(root.toString());
- for (PlanNode child : root.getChildren()) {
- printPlanNodeWithLevel(child, level + 1);
- }
+ public int hashCode() {
+ return new Integer(deviceGroupId).hashCode();
}
- private static void printTab(int count) {
- for (int i = 0; i < count; i++) {
- System.out.print("\t");
- }
+ public boolean equals(Object obj) {
+ return obj instanceof DeviceGroupId
+ && this.deviceGroupId == ((DeviceGroupId) obj).deviceGroupId;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/PartitionInfo.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/PartitionInfo.java
index 16145fd..90a22a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/PartitionInfo.java
@@ -16,25 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.scheduler;
+package org.apache.iotdb.commons.partition;
-import org.apache.iotdb.db.mpp.common.FragmentId;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.execution.FragmentInfo;
+public class PartitionInfo {
-import io.airlift.units.Duration;
+ private DataPartitionInfo dataPartitionInfo;
+ private SchemaPartitionInfo schemaPartitionInfo;
-public interface IScheduler {
+ public DataPartitionInfo getDataPartitionInfo() {
+ return dataPartitionInfo;
+ }
- void start();
+ public void setDataPartitionInfo(DataPartitionInfo dataPartitionInfo) {
+ this.dataPartitionInfo = dataPartitionInfo;
+ }
- void abort();
+ public SchemaPartitionInfo getSchemaPartitionInfo() {
+ return schemaPartitionInfo;
+ }
- Duration getTotalCpuTime();
-
- FragmentInfo getFragmentInfo();
-
- void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause);
-
- void cancelFragment(FragmentId fragmentId);
+ public void setSchemaPartitionInfo(SchemaPartitionInfo schemaPartitionInfo) {
+ this.schemaPartitionInfo = schemaPartitionInfo;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionInfo.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionInfo.java
index 38a56f3..119115c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionInfo.java
@@ -16,12 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.commons.partition;
-public class PlanFragmentId {
- private String id;
+import java.util.Map;
- public PlanFragmentId(String id) {
- this.id = id;
+public class SchemaPartitionInfo {
+
+ // Map<StorageGroup, Map<DeviceGroupID, SchemaRegionPlaceInfo>>
+ private Map<String, Map<DeviceGroupId, SchemaRegionReplicaSet>> schemaPartitionInfo;
+
+ public Map<String, Map<DeviceGroupId, SchemaRegionReplicaSet>> getSchemaPartitionInfo() {
+ return schemaPartitionInfo;
+ }
+
+ public void setSchemaPartitionInfo(
+ Map<String, Map<DeviceGroupId, SchemaRegionReplicaSet>> schemaPartitionInfo) {
+ this.schemaPartitionInfo = schemaPartitionInfo;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionId.java
similarity index 75%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionId.java
index 38a56f3..6a0363c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionId.java
@@ -16,12 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.commons.partition;
-public class PlanFragmentId {
- private String id;
+public class SchemaRegionId {
+ private int schemaRegionId;
- public PlanFragmentId(String id) {
- this.id = id;
+ public int getSchemaRegionId() {
+ return schemaRegionId;
+ }
+
+ public void setSchemaRegionId(int schemaRegionId) {
+ this.schemaRegionId = schemaRegionId;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionReplicaSet.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionReplicaSet.java
index 8cf0341..7f6e863 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionReplicaSet.java
@@ -16,18 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.commons.partition;
-import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
import java.util.List;
-public class DistributedQueryPlan {
- private MPPQueryContext context;
- private PlanNode rootNode;
- private PlanFragment rootFragment;
+public class SchemaRegionReplicaSet {
+ private SchemaRegionId schemaRegionId;
+ private List<EndPoint> endPointList;
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ public SchemaRegionId getSchemaRegionId() {
+ return schemaRegionId;
+ }
+
+ public void setSchemaRegionId(SchemaRegionId schemaRegionId) {
+ this.schemaRegionId = schemaRegionId;
+ }
+
+ public List<EndPoint> getEndPointList() {
+ return endPointList;
+ }
+
+ public void setEndPointList(List<EndPoint> endPointList) {
+ this.endPointList = endPointList;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegionTimeSlice.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/TimePartitionId.java
similarity index 72%
rename from server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegionTimeSlice.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/TimePartitionId.java
index 65dcd39..0acbb43 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegionTimeSlice.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/TimePartitionId.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,9 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.commons.partition;
-// TODO: (xingtanzjr) This class should be substituted with the class defined in Consensus level
-public class DataRegionTimeSlice {
- long startTimestamp;
+public class TimePartitionId {
+ private long startTime;
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index aa85300..8e9de62 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -48,4 +48,8 @@ public class FragmentInstanceId {
public String getInstanceId() {
return instanceId;
}
+
+ public String toString() {
+ return fullId;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 4a29263..2a348d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -23,7 +23,19 @@ package org.apache.iotdb.db.mpp.common;
* info and so on
*/
public class MPPQueryContext {
- private String statement;
+ private String sql;
private QueryId queryId;
- private QuerySession session;
+ private SessionInfo session;
+
+ public MPPQueryContext() {}
+
+ public MPPQueryContext(String sql, QueryId queryId, SessionInfo session) {
+ this.sql = sql;
+ this.queryId = queryId;
+ this.session = session;
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
similarity index 77%
rename from server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
index ef0df6b..410f271 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
@@ -23,26 +23,26 @@ import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
-public class FragmentId {
+public class PlanFragmentId {
private final QueryId queryId;
private final int id;
- public static FragmentId valueOf(String stageId) {
+ public static PlanFragmentId valueOf(String stageId) {
List<String> ids = QueryId.parseDottedId(stageId, 2, "stageId");
return valueOf(ids);
}
- public static FragmentId valueOf(List<String> ids) {
+ public static PlanFragmentId valueOf(List<String> ids) {
checkArgument(ids.size() == 2, "Expected two ids but got: %s", ids);
- return new FragmentId(new QueryId(ids.get(0)), Integer.parseInt(ids.get(1)));
+ return new PlanFragmentId(new QueryId(ids.get(0)), Integer.parseInt(ids.get(1)));
}
- public FragmentId(String queryId, int id) {
+ public PlanFragmentId(String queryId, int id) {
this(new QueryId(queryId), id);
}
- public FragmentId(QueryId queryId, int id) {
+ public PlanFragmentId(QueryId queryId, int id) {
this.queryId = requireNonNull(queryId, "queryId is null");
this.id = id;
}
@@ -54,4 +54,8 @@ public class FragmentId {
public int getId() {
return id;
}
+
+ public String toString() {
+ return String.format("%s.%d", queryId, id);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java
similarity index 83%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java
index 38a56f3..ad64267 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java
@@ -16,12 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.common;
-public class PlanFragmentId {
- private String id;
+import java.time.ZoneId;
- public PlanFragmentId(String id) {
- this.id = id;
- }
+public class SessionInfo {
+ private String userName;
+ private ZoneId zoneId;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index f6c85e8..65d92b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -18,7 +18,11 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
import java.util.concurrent.ConcurrentHashMap;
@@ -31,14 +35,30 @@ public class Coordinator {
private ConcurrentHashMap<QueryId, QueryExecution> queryExecutionMap;
- private QueryExecution createQueryExecution() {
- return null;
+ public static Coordinator getInstance() {
+ return new Coordinator();
+ }
+
+ private QueryExecution createQueryExecution(Statement statement, MPPQueryContext queryContext) {
+ return new QueryExecution(statement, queryContext);
}
private QueryExecution getQueryExecutionById() {
return null;
}
+ public ExecutionResult execute(
+ Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
+
+ QueryExecution execution =
+ createQueryExecution(statement, new MPPQueryContext(sql, queryId, session));
+ queryExecutionMap.put(queryId, execution);
+
+ execution.start();
+
+ return execution.getResult();
+ }
+
// private TQueryResponse executeQuery(TQueryRequest request) {
//
// }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
similarity index 70%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
index 38a56f3..4dae202 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
@@ -16,12 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.execution;
-public class PlanFragmentId {
- private String id;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
- public PlanFragmentId(String id) {
- this.id = id;
+public class ExecutionResult {
+ public QueryId queryId;
+ public TSStatus status;
+
+ public ExecutionResult(QueryId queryId, TSStatus status) {
+ this.queryId = queryId;
+ this.status = status;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
index 7bd9300..9e3412e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
@@ -18,21 +18,21 @@
*/
package org.apache.iotdb.db.mpp.execution;
-import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import java.util.List;
public class FragmentInfo {
- private final FragmentId stageId;
+ private final PlanFragmentId stageId;
private final FragmentState state;
private final PlanFragment plan;
private final List<FragmentInfo> childrenFragments;
public FragmentInfo(
- FragmentId stageId,
+ PlanFragmentId stageId,
FragmentState state,
PlanFragment plan,
List<FragmentInfo> childrenFragments) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 7e14bf3..0b15ef1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -22,14 +22,19 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
import org.apache.iotdb.db.mpp.sql.planner.DistributionPlanner;
import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.*;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.rpc.TSStatusCode;
import java.nio.ByteBuffer;
import java.util.List;
+import static org.apache.iotdb.rpc.RpcUtils.getStatus;
+
/**
* QueryExecution stores all the status of a query which is being prepared or running inside the MPP
* frame. It takes three main responsibilities: 1. Prepare a query. Transform a query from statement
@@ -43,50 +48,45 @@ public class QueryExecution {
private List<PlanOptimizer> planOptimizers;
- private Analysis analysis;
+ private final Analysis analysis;
private LogicalQueryPlan logicalPlan;
private DistributedQueryPlan distributedPlan;
- private List<PlanFragment> fragments;
private List<FragmentInstance> fragmentInstances;
- public QueryExecution(MPPQueryContext context) {
+ public QueryExecution(Statement statement, MPPQueryContext context) {
this.context = context;
+ this.analysis = analyze(statement, context);
}
- public void plan() {
- analyze();
+ public void start() {
doLogicalPlan();
doDistributedPlan();
- planFragmentInstances();
- }
-
- public void schedule() {
- this.scheduler = new ClusterScheduler(this.stateMachine, this.fragmentInstances);
- this.scheduler.start();
+ schedule();
}
// Analyze the statement in QueryContext. Generate the analysis this query need
- public void analyze() {
+ private static Analysis analyze(Statement statement, MPPQueryContext context) {
// initialize the variable `analysis`
+ return new Analyzer(context).analyze(statement);
+ }
+ private void schedule() {
+ this.scheduler = new ClusterScheduler(this.stateMachine, this.fragmentInstances);
+ this.scheduler.start();
}
// Use LogicalPlanner to do the logical query plan and logical optimization
- public void doLogicalPlan() {
+ private void doLogicalPlan() {
LogicalPlanner planner = new LogicalPlanner(this.context, this.planOptimizers);
this.logicalPlan = planner.plan(this.analysis);
}
// Generate the distributed plan and split it into fragments
- public void doDistributedPlan() {
+ private void doDistributedPlan() {
DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan);
this.distributedPlan = planner.planFragments();
}
- // Convert fragment to detailed instance
- // And for parallel-able fragment, clone it into several instances with different params.
- public void planFragmentInstances() {}
-
/**
* This method will be called by the request thread from client connection. This method will block
* until one of these conditions occurs: 1. There is a batch of result 2. There is no more result
@@ -97,4 +97,9 @@ public class QueryExecution {
public ByteBuffer getBatchResult() {
return null;
}
+
+ public ExecutionResult getResult() {
+
+ return new ExecutionResult(context.getQueryId(), getStatus(TSStatusCode.SUCCESS_STATUS));
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 4af8b31..0a18d07 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -18,8 +18,8 @@
*/
package org.apache.iotdb.db.mpp.execution.scheduler;
-import org.apache.iotdb.db.mpp.common.FragmentId;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.FragmentInfo;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
@@ -67,7 +67,7 @@ public class ClusterScheduler implements IScheduler {
public void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
@Override
- public void cancelFragment(FragmentId fragmentId) {}
+ public void cancelFragment(PlanFragmentId planFragmentId) {}
// Send the instances to other nodes
private void sendFragmentInstances() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
index 16145fd..1ecd4e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
@@ -18,8 +18,8 @@
*/
package org.apache.iotdb.db.mpp.execution.scheduler;
-import org.apache.iotdb.db.mpp.common.FragmentId;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.FragmentInfo;
import io.airlift.units.Duration;
@@ -36,5 +36,5 @@ public interface IScheduler {
void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause);
- void cancelFragment(FragmentId fragmentId);
+ void cancelFragment(PlanFragmentId planFragmentId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
index 10de6b5..d876561 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.metadata.SchemaEngine;
-import org.apache.iotdb.db.mpp.common.FragmentId;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.FragmentInfo;
import io.airlift.units.Duration;
@@ -52,5 +52,5 @@ public class StandaloneScheduler implements IScheduler {
public void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
@Override
- public void cancelFragment(FragmentId fragmentId) {}
+ public void cancelFragment(PlanFragmentId planFragmentId) {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
index dda47dc..a5e1d17 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
@@ -19,12 +19,13 @@
package org.apache.iotdb.db.mpp.sql.analyze;
-import org.apache.iotdb.db.metadata.SchemaRegion;
+import org.apache.iotdb.commons.partition.DataPartitionInfo;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.commons.partition.SchemaPartitionInfo;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.DataRegion;
-import org.apache.iotdb.db.mpp.common.DataRegionTimeSlice;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.*;
@@ -39,28 +40,18 @@ public class Analysis {
// Statement
private Statement statement;
- // DataPartitionInfo
- private Map<String, Map<DataRegionTimeSlice, List<DataRegion>>> dataPartitionInfo;
-
- // SchemaPartitionInfo
- private Map<String, List<SchemaRegion>> schemaPartitionInfo;
-
- public Set<DataRegion> getPartitionInfo(PartialPath seriesPath, Filter timefilter) {
- if (timefilter == null) {
- // TODO: (xingtanzjr) we need to have a method to get the deviceGroup by device
- String deviceGroup = seriesPath.getDevice();
- Set<DataRegion> result = new HashSet<>();
- this.dataPartitionInfo.get(deviceGroup).values().forEach(result::addAll);
- return result;
- } else {
- // TODO: (xingtanzjr) complete this branch
- return null;
- }
- }
+ // indicate whether this statement is write or read
+ private QueryType queryType;
- public void setDataPartitionInfo(
- Map<String, Map<DataRegionTimeSlice, List<DataRegion>>> dataPartitionInfo) {
- this.dataPartitionInfo = dataPartitionInfo;
+ private DataPartitionInfo dataPartitionInfo;
+
+ private SchemaPartitionInfo schemaPartitionInfo;
+
+ private Map<String, MeasurementSchema> schemaMap;
+
+ public List<DataRegionReplicaSet> getPartitionInfo(PartialPath seriesPath, Filter timefilter) {
+ // TODO: (xingtanzjr) implement the calculation of timePartitionIdList
+ return dataPartitionInfo.getDataRegionReplicaSet(seriesPath.getDevice(), null);
}
public Statement getStatement() {
@@ -70,4 +61,28 @@ public class Analysis {
public void setStatement(Statement statement) {
this.statement = statement;
}
+
+ public DataPartitionInfo getDataPartitionInfo() {
+ return dataPartitionInfo;
+ }
+
+ public void setDataPartitionInfo(DataPartitionInfo dataPartitionInfo) {
+ this.dataPartitionInfo = dataPartitionInfo;
+ }
+
+ public SchemaPartitionInfo getSchemaPartitionInfo() {
+ return schemaPartitionInfo;
+ }
+
+ public void setSchemaPartitionInfo(SchemaPartitionInfo schemaPartitionInfo) {
+ this.schemaPartitionInfo = schemaPartitionInfo;
+ }
+
+ public Map<String, MeasurementSchema> getSchemaMap() {
+ return schemaMap;
+ }
+
+ public void setSchemaMap(Map<String, MeasurementSchema> schemaMap) {
+ this.schemaMap = schemaMap;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StatementAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
similarity index 68%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StatementAnalyzer.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index 6bd7113..c1f1f28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StatementAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.sql.analyze;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.PartitionInfo;
import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
@@ -31,18 +33,26 @@ import org.apache.iotdb.db.mpp.sql.rewriter.RemoveNotOptimizer;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.component.WhereCondition;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.tree.StatementVisitor;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.Arrays;
+import java.util.Map;
/** Analyze the statement and generate Analysis. */
-public class StatementAnalyzer {
+public class Analyzer {
- private final Analysis analysis;
private final MPPQueryContext context;
- public StatementAnalyzer(Analysis analysis, MPPQueryContext context) {
- this.analysis = analysis;
+ // TODO need to use factory to decide standalone or cluster
+ private final IPartitionFetcher partitionFetcher = StandalonePartitionFetcher.getInstance();
+ // TODO need to use factory to decide standalone or cluster
+ private final ISchemaFetcher schemaFetcher = StandaloneSchemaFetcher.getInstance();
+
+ public Analyzer(MPPQueryContext context) {
this.context = context;
}
@@ -55,12 +65,14 @@ public class StatementAnalyzer {
@Override
public Analysis visitStatement(Statement statement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
analysis.setStatement(statement);
return analysis;
}
@Override
public Analysis visitQuery(QueryStatement queryStatement, MPPQueryContext context) {
+ Analysis analysis = new Analysis();
try {
// check for semantic errors
queryStatement.selfCheck();
@@ -90,6 +102,7 @@ public class StatementAnalyzer {
@Override
public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext context) {
// TODO: do analyze for insert statement
+ Analysis analysis = new Analysis();
analysis.setStatement(insertStatement);
return analysis;
}
@@ -109,8 +122,34 @@ public class StatementAnalyzer {
}
}
}
+ Analysis analysis = new Analysis();
analysis.setStatement(createTimeSeriesStatement);
return analysis;
}
+
+ @Override
+ public Analysis visitInsertTablet(
+ InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
+ // TODO(INSERT) device + time range -> PartitionInfo
+ DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+ dataPartitionQueryParam.setDeviceId(insertTabletStatement.getDevicePath().getFullPath());
+ // TODO(INSERT) calculate the time partition id list
+ // dataPartitionQueryParam.setTimePartitionIdList();
+ PartitionInfo partitionInfo = partitionFetcher.fetchPartitionInfo(dataPartitionQueryParam);
+
+ // TODO(INSERT) get each time series schema according to SchemaPartitionInfo in PartitionInfo
+ Map<String, MeasurementSchema> schemaMap =
+ schemaFetcher.fetchSchema(
+ insertTabletStatement.getDevicePath(),
+ Arrays.asList(insertTabletStatement.getMeasurements()));
+
+ Analysis analysis = new Analysis();
+ analysis.setSchemaMap(schemaMap);
+ // TODO(INSERT) do type check here
+ analysis.setStatement(insertTabletStatement);
+ analysis.setDataPartitionInfo(partitionInfo.getDataPartitionInfo());
+ analysis.setSchemaPartitionInfo(partitionInfo.getSchemaPartitionInfo());
+ return analysis;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
index 6151916..cdecf45 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
@@ -16,41 +16,44 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.analyze;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.commons.partition.DataPartitionInfo;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.PartitionInfo;
+import org.apache.iotdb.commons.partition.SchemaPartitionInfo;
import java.util.List;
-public class FragmentSinkNode extends SinkNode {
- public FragmentSinkNode(PlanNodeId id) {
- super(id);
- }
+public class ClusterPartitionFetcher implements IPartitionFetcher {
@Override
- public List<PlanNode> getChildren() {
+ public DataPartitionInfo fetchDataPartitionInfo(DataPartitionQueryParam parameter) {
return null;
}
@Override
- public PlanNode clone() {
+ public DataPartitionInfo fetchDataPartitionInfos(List<DataPartitionQueryParam> parameterList) {
return null;
}
@Override
- public PlanNode cloneWithChildren(List<PlanNode> children) {
+ public SchemaPartitionInfo fetchSchemaPartitionInfo(String deviceId) {
return null;
}
@Override
- public List<String> getOutputColumnNames() {
+ public SchemaPartitionInfo fetchSchemaPartitionInfos(List<String> deviceId) {
return null;
}
@Override
- public void send() {}
+ public PartitionInfo fetchPartitionInfo(DataPartitionQueryParam parameter) {
+ return null;
+ }
@Override
- public void close() throws Exception {}
+ public PartitionInfo fetchPartitionInfos(List<DataPartitionQueryParam> parameterList) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
similarity index 66%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index 8cf0341..c7092d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -16,18 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.analyze;
-import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.List;
+import java.util.Map;
-public class DistributedQueryPlan {
- private MPPQueryContext context;
- private PlanNode rootNode;
- private PlanFragment rootFragment;
+public class ClusterSchemaFetcher implements ISchemaFetcher {
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ @Override
+ public Map<String, MeasurementSchema> fetchSchema(
+ PartialPath deviceId, List<String> measurementIdList) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
index 6151916..d29c601 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
@@ -16,41 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.analyze;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.commons.partition.DataPartitionInfo;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.PartitionInfo;
+import org.apache.iotdb.commons.partition.SchemaPartitionInfo;
import java.util.List;
-public class FragmentSinkNode extends SinkNode {
- public FragmentSinkNode(PlanNodeId id) {
- super(id);
- }
+public interface IPartitionFetcher {
- @Override
- public List<PlanNode> getChildren() {
- return null;
- }
+ DataPartitionInfo fetchDataPartitionInfo(DataPartitionQueryParam parameter);
- @Override
- public PlanNode clone() {
- return null;
- }
+ DataPartitionInfo fetchDataPartitionInfos(List<DataPartitionQueryParam> parameterList);
- @Override
- public PlanNode cloneWithChildren(List<PlanNode> children) {
- return null;
- }
+ SchemaPartitionInfo fetchSchemaPartitionInfo(String deviceId);
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
+ SchemaPartitionInfo fetchSchemaPartitionInfos(List<String> deviceId);
- @Override
- public void send() {}
+ PartitionInfo fetchPartitionInfo(DataPartitionQueryParam parameter);
- @Override
- public void close() throws Exception {}
+ PartitionInfo fetchPartitionInfos(List<DataPartitionQueryParam> parameterList);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/metadata/IMetadataFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java
similarity index 72%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/metadata/IMetadataFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java
index bf326c5..c2ac230 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/metadata/IMetadataFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java
@@ -17,9 +17,18 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.metadata;
+package org.apache.iotdb.db.mpp.sql.analyze;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.List;
+import java.util.Map;
/**
* This interface is used to fetch the metadata information required in execution plan generating.
*/
-public interface IMetadataFetcher {}
+public interface ISchemaFetcher {
+
+ Map<String, MeasurementSchema> fetchSchema(PartialPath deviceId, List<String> measurementIdList);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/QuerySession.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/QueryType.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/db/mpp/common/QuerySession.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/QueryType.java
index 3f1d165..8682628 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/QuerySession.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/QueryType.java
@@ -16,6 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.sql.analyze;
-public class QuerySession {}
+public enum QueryType {
+ WRITE,
+ READ;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
new file mode 100644
index 0000000..ca274a5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.analyze;
+
+import org.apache.iotdb.commons.partition.DataPartitionInfo;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.PartitionInfo;
+import org.apache.iotdb.commons.partition.SchemaPartitionInfo;
+
+import java.util.List;
+
+public class StandalonePartitionFetcher implements IPartitionFetcher {
+
+ private StandalonePartitionFetcher() {}
+
+ // TODO need to use safe singleton pattern
+ public static StandalonePartitionFetcher getInstance() {
+ return new StandalonePartitionFetcher();
+ }
+
+ @Override
+ public DataPartitionInfo fetchDataPartitionInfo(DataPartitionQueryParam parameter) {
+ return null;
+ }
+
+ @Override
+ public DataPartitionInfo fetchDataPartitionInfos(List<DataPartitionQueryParam> parameterList) {
+ return null;
+ }
+
+ @Override
+ public SchemaPartitionInfo fetchSchemaPartitionInfo(String deviceId) {
+ return null;
+ }
+
+ @Override
+ public SchemaPartitionInfo fetchSchemaPartitionInfos(List<String> deviceId) {
+ return null;
+ }
+
+ @Override
+ public PartitionInfo fetchPartitionInfo(DataPartitionQueryParam parameter) {
+ return null;
+ }
+
+ @Override
+ public PartitionInfo fetchPartitionInfos(List<DataPartitionQueryParam> parameterList) {
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
index 8cf0341..21d3e39 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
@@ -16,18 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.analyze;
-import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.List;
+import java.util.Map;
-public class DistributedQueryPlan {
- private MPPQueryContext context;
- private PlanNode rootNode;
- private PlanFragment rootFragment;
+public class StandaloneSchemaFetcher implements ISchemaFetcher {
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+ private StandaloneSchemaFetcher() {}
+
+ public static StandaloneSchemaFetcher getInstance() {
+ return new StandaloneSchemaFetcher();
+ }
+
+ @Override
+ public Map<String, MeasurementSchema> fetchSchema(
+ PartialPath deviceId, List<String> measurementIdList) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 5a4123e..5b2a969 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -18,13 +18,14 @@
*/
package org.apache.iotdb.db.mpp.sql.planner;
-import org.apache.iotdb.db.mpp.common.DataRegion;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
-import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
-import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.*;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.*;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
@@ -37,6 +38,8 @@ public class DistributionPlanner {
private Analysis analysis;
private LogicalQueryPlan logicalPlan;
+ private int planFragmentIndex = 0;
+
public DistributionPlanner(Analysis analysis, LogicalQueryPlan logicalPlan) {
this.analysis = analysis;
this.logicalPlan = logicalPlan;
@@ -52,8 +55,29 @@ public class DistributionPlanner {
return adder.visit(root, new NodeGroupContext());
}
+ public SubPlan splitFragment(PlanNode root) {
+ FragmentBuilder fragmentBuilder = new FragmentBuilder();
+ return fragmentBuilder.splitToSubPlan(root);
+ }
+
public DistributedQueryPlan planFragments() {
- return null;
+ PlanNode rootAfterRewrite = rewriteSource();
+ PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
+ SubPlan subPlan = splitFragment(rootWithExchange);
+ List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
+ return new DistributedQueryPlan(
+ logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances);
+ }
+
+ // Convert fragment to detailed instance
+ // And for parallel-able fragment, clone it into several instances with different params.
+ public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
+ IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan);
+ return parallelPlaner.parallelPlan();
+ }
+
+ private PlanFragmentId getNextFragmentId() {
+ return new PlanFragmentId(this.logicalPlan.getContext().getQueryId(), this.planFragmentIndex++);
}
private class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanContext> {
@@ -74,13 +98,13 @@ public class DistributionPlanner {
// If the child is SeriesScanNode, we need to check whether this node should be separated
// into several splits.
SeriesScanNode handle = (SeriesScanNode) child;
- Set<DataRegion> dataDistribution =
+ List<DataRegionReplicaSet> dataDistribution =
analysis.getPartitionInfo(handle.getSeriesPath(), handle.getTimeFilter());
// If the size of dataDistribution is m, this SeriesScanNode should be separated into m
// SeriesScanNode.
- for (DataRegion dataRegion : dataDistribution) {
+ for (DataRegionReplicaSet dataRegion : dataDistribution) {
SeriesScanNode split = (SeriesScanNode) handle.clone();
- split.setDataRegion(dataRegion);
+ split.setDataRegionReplicaSet(dataRegion);
sources.add(split);
}
} else if (child instanceof SeriesAggregateScanNode) {
@@ -97,8 +121,8 @@ public class DistributionPlanner {
}
// Step 2: For the source nodes, group them by the DataRegion.
- Map<DataRegion, List<SeriesScanNode>> sourceGroup =
- sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getDataRegion));
+ Map<DataRegionReplicaSet, List<SeriesScanNode>> sourceGroup =
+ sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getDataRegionReplicaSet));
// Step 3: For the source nodes which belong to the same data region, add a TimeJoinNode for them
// and make the
// new TimeJoinNode the child of the current TimeJoinNode
@@ -148,13 +172,15 @@ public class DistributionPlanner {
public PlanNode visitSeriesScan(SeriesScanNode node, NodeGroupContext context) {
context.putNodeDistribution(
- node.getId(), new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegion()));
+ node.getId(),
+ new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegionReplicaSet()));
return node.clone();
}
public PlanNode visitSeriesAggregate(SeriesAggregateScanNode node, NodeGroupContext context) {
context.putNodeDistribution(
- node.getId(), new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegion()));
+ node.getId(),
+ new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegionReplicaSet()));
return node.clone();
}
@@ -167,7 +193,7 @@ public class DistributionPlanner {
visitedChildren.add(visit(child, context));
});
- DataRegion dataRegion = calculateDataRegionByChildren(visitedChildren, context);
+ DataRegionReplicaSet dataRegion = calculateDataRegionByChildren(visitedChildren, context);
NodeDistributionType distributionType =
nodeDistributionIsSame(visitedChildren, context)
? NodeDistributionType.SAME_WITH_ALL_CHILDREN
@@ -187,7 +213,7 @@ public class DistributionPlanner {
child -> {
if (!dataRegion.equals(context.getNodeDistribution(child.getId()).dataRegion)) {
ExchangeNode exchangeNode = new ExchangeNode(PlanNodeIdAllocator.generateId());
- exchangeNode.setSourceNode(child);
+ exchangeNode.setChild(child);
newNode.addChild(exchangeNode);
} else {
newNode.addChild(child);
@@ -196,12 +222,11 @@ public class DistributionPlanner {
return newNode;
}
- private DataRegion calculateDataRegionByChildren(
+ private DataRegionReplicaSet calculateDataRegionByChildren(
List<PlanNode> children, NodeGroupContext context) {
// We always make the dataRegion of TimeJoinNode to be the same as its first child.
// TODO: (xingtanzjr) We need to implement more suitable policies here
- DataRegion childDataRegion = context.getNodeDistribution(children.get(0).getId()).dataRegion;
- return new DataRegion(childDataRegion.getDataRegionId(), childDataRegion.getEndpoint());
+ return context.getNodeDistribution(children.get(0).getId()).dataRegion;
}
private boolean nodeDistributionIsSame(List<PlanNode> children, NodeGroupContext context) {
@@ -246,11 +271,49 @@ public class DistributionPlanner {
private class NodeDistribution {
private NodeDistributionType type;
- private DataRegion dataRegion;
+ private DataRegionReplicaSet dataRegion;
- private NodeDistribution(NodeDistributionType type, DataRegion dataRegion) {
+ private NodeDistribution(NodeDistributionType type, DataRegionReplicaSet dataRegion) {
this.type = type;
this.dataRegion = dataRegion;
}
}
+
+   /**
+    * Splits a PlanNode tree into PlanFragments. Every ExchangeNode marks a cut: the subtree below it
+    * is moved under a new FragmentSinkNode, which becomes the root of a child SubPlan.
+    */
+   private class FragmentBuilder {
+     /**
+      * Wraps {@code root} into a SubPlan and splits the tree below it.
+      *
+      * @param root root of the PlanNode tree to split
+      * @return the SubPlan created for {@code root}; SubPlans cut off below it are attached as its
+      *     children
+      */
+     public SubPlan splitToSubPlan(PlanNode root) {
+       SubPlan top = createSubPlan(root);
+       splitToSubPlan(root, top);
+       return top;
+     }
+
+     private void splitToSubPlan(PlanNode root, SubPlan subPlan) {
+       if (!(root instanceof ExchangeNode)) {
+         // An ordinary node stays in the current fragment; keep looking for cuts below it.
+         root.getChildren().forEach(child -> splitToSubPlan(child, subPlan));
+         return;
+       }
+
+       ExchangeNode exchange = (ExchangeNode) root;
+       // The new PlanFragment is rooted at a FragmentSinkNode which takes over the exchange's child.
+       FragmentSinkNode sink = new FragmentSinkNode(PlanNodeIdAllocator.generateId());
+       sink.setDownStreamNode(exchange);
+       sink.setChild(exchange.getChild());
+       // Remember the sink on the exchange so both sides of the cut stay connected.
+       exchange.setRemoteSourceNode(sink);
+       // Detach the subtree: the exchange becomes a leaf of the current PlanFragment.
+       exchange.cleanChildren();
+
+       // Continue splitting inside the new fragment and hang it under the current SubPlan.
+       SubPlan below = createSubPlan(sink);
+       splitToSubPlan(sink, below);
+       subPlan.addChild(below);
+     }
+
+     private SubPlan createSubPlan(PlanNode root) {
+       return new SubPlan(new PlanFragment(getNextFragmentId(), root));
+     }
+   }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index a997705..dcacb70 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.sql.tree.StatementVisitor;
@@ -43,7 +45,7 @@ public class LogicalPlanner {
}
public LogicalQueryPlan plan(Analysis analysis) {
- PlanNode rootNode = new LogicalPlanVisitor().process(analysis.getStatement());
+ PlanNode rootNode = new LogicalPlanVisitor(analysis).process(analysis.getStatement());
// optimize the query logical plan
if (analysis.getStatement() instanceof QueryStatement) {
@@ -62,6 +64,12 @@ public class LogicalPlanner {
private static final class LogicalPlanVisitor
extends StatementVisitor<PlanNode, MPPQueryContext> {
+ private final Analysis analysis;
+
+ public LogicalPlanVisitor(Analysis analysis) {
+ this.analysis = analysis;
+ }
+
@Override
public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext context) {
// TODO: Generate logical planNode tree for query statement
@@ -82,5 +90,14 @@ public class LogicalPlanner {
createTimeSeriesStatement.getAttributes(),
createTimeSeriesStatement.getAlias());
}
+
+     /**
+      * Produces the plan node for an insert-tablet statement. At the moment this is only an empty
+      * InsertTabletNode carrying a freshly allocated id.
+      */
+     @Override
+     public PlanNode visitInsertTablet(
+         InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
+       // TODO(INSERT) change the InsertTabletStatement to InsertTabletNode
+       return new InsertTabletNode(PlanNodeIdAllocator.generateId());
+     }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
index 8cf0341..2df5323 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
@@ -19,15 +19,39 @@
package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import java.util.List;
public class DistributedQueryPlan {
private MPPQueryContext context;
- private PlanNode rootNode;
- private PlanFragment rootFragment;
-
- // TODO: consider whether this field is necessary when do the implementation
+ private SubPlan rootSubPlan;
private List<PlanFragment> fragments;
+ private List<FragmentInstance> instances;
+
+ public DistributedQueryPlan(
+ MPPQueryContext context,
+ SubPlan rootSubPlan,
+ List<PlanFragment> fragments,
+ List<FragmentInstance> instances) {
+ this.context = context;
+ this.rootSubPlan = rootSubPlan;
+ this.fragments = fragments;
+ this.instances = instances;
+ }
+
+ public List<PlanFragment> getFragments() {
+ return fragments;
+ }
+
+ public SubPlan getRootSubPlan() {
+ return rootSubPlan;
+ }
+
+ public MPPQueryContext getContext() {
+ return context;
+ }
+
+ public List<FragmentInstance> getInstances() {
+ return instances;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 7674f4f..5749c36 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -18,14 +18,79 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
public class FragmentInstance {
private FragmentInstanceId id;
// The reference of PlanFragment which this instance is generated from
private PlanFragment fragment;
+ // The DataRegion where the FragmentInstance should run
+ private DataRegionReplicaSet dataRegion;
+ private EndPoint hostEndpoint;
// We can add some more params for a specific FragmentInstance
// So that we can make different FragmentInstance owns different data range.
+
+ public FragmentInstance(PlanFragment fragment, int index) {
+ this.fragment = fragment;
+ this.id = generateId(fragment.getId(), index);
+ }
+
+ public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
+ return new FragmentInstanceId(String.format("%s.%d", id, index));
+ }
+
+ public DataRegionReplicaSet getDataRegionId() {
+ return dataRegion;
+ }
+
+ public void setDataRegionId(DataRegionReplicaSet dataRegion) {
+ this.dataRegion = dataRegion;
+ }
+
+ public EndPoint getHostEndpoint() {
+ return hostEndpoint;
+ }
+
+ public void setHostEndpoint(EndPoint hostEndpoint) {
+ this.hostEndpoint = hostEndpoint;
+ }
+
+ public PlanFragment getFragment() {
+ return fragment;
+ }
+
+ public FragmentInstanceId getId() {
+ return id;
+ }
+
+   /**
+    * Describes where this instance sends its output.
+    *
+    * @return {@code "(endpoint, instanceId, node)"} read from the fragment's root when that root is
+    *     a FragmentSinkNode, otherwise {@code "<No downstream>"}
+    */
+   public String getDownstreamInfo() {
+     PlanNode fragmentRoot = getFragment().getRoot();
+     if (!(fragmentRoot instanceof FragmentSinkNode)) {
+       return "<No downstream>";
+     }
+     FragmentSinkNode sinkNode = (FragmentSinkNode) fragmentRoot;
+     return String.format(
+         "(%s, %s, %s)",
+         sinkNode.getDownStreamEndpoint(),
+         sinkNode.getDownStreamInstanceId(),
+         sinkNode.getDownStreamNode());
+   }
+
+   /**
+    * Renders the instance id, its host and DataRegion, and the plan node tree of its fragment.
+    *
+    * <p>The host endpoint and the DataRegion are assigned through setters after construction, so
+    * either may still be unset when this is called. An unset value is printed as "Not assigned"
+    * rather than failing with a NullPointerException.
+    */
+   @Override
+   public String toString() {
+     EndPoint host = getHostEndpoint();
+     DataRegionReplicaSet region = getDataRegionId();
+     String hostText = host == null ? "Not assigned" : host.getIp();
+     String regionText = region == null ? "Not assigned" : String.valueOf(region.getId());
+
+     StringBuilder ret = new StringBuilder();
+     ret.append(String.format("FragmentInstance-%s:[Host: %s/%s]\n", getId(), hostText, regionText));
+     ret.append("---- Plan Node Tree ----\n");
+     ret.append(PlanNodeUtil.nodeToString(getFragment().getRoot()));
+     ret.append("\n");
+     return ret.toString();
+   }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
similarity index 61%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
index 8cf0341..36be61f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,18 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
-import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
import java.util.List;
-public class DistributedQueryPlan {
- private MPPQueryContext context;
- private PlanNode rootNode;
- private PlanFragment rootFragment;
-
- // TODO: consider whether this field is necessary when do the implementation
- private List<PlanFragment> fragments;
+/**
+ * The interface is used to transform one PlanFragment into one or more FragmentInstances which
+ * could run in parallel
+ */
+public interface IFragmentParallelPlaner {
+ /**
+ * The relation between each PlanFragment is necessary because sometimes we need to change the
+ * source/sink for each FragmentInstance according to its upstream/downstream
+ *
+ * @return All the FragmentInstances which can run in parallel
+ */
+ List<FragmentInstance> parallelPlan();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index 0aa3ac5..173a8de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -18,11 +18,70 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
-// TODO: consider whether it is necessary to make PlanFragment as a TreeNode
/** PlanFragment contains a sub-query of distributed query. */
public class PlanFragment {
private PlanFragmentId id;
private PlanNode root;
+
+ public PlanFragment(PlanFragmentId id, PlanNode root) {
+ this.id = id;
+ this.root = root;
+ }
+
+ public PlanFragmentId getId() {
+ return id;
+ }
+
+ public PlanNode getRoot() {
+ return root;
+ }
+
+ public String toString() {
+ return String.format("PlanFragment-%s", getId());
+ }
+
+ // Every Fragment should only run in DataRegion.
+ // But it can select any one of the Endpoint of the target DataRegion
+ // In current version, one PlanFragment should contain at least one SourceNode,
+ // and the DataRegions of all SourceNodes should be same in one PlanFragment.
+ // So we can use the DataRegion of one SourceNode as the PlanFragment's DataRegion.
+ public DataRegionReplicaSet getTargetDataRegion() {
+ return getNodeDataRegion(root);
+ }
+
+   /**
+    * Depth-first search for a DataRegion below {@code root}.
+    *
+    * @return the DataRegionReplicaSet of {@code root} itself when it is a SourceNode; otherwise the
+    *     first non-null result found among the children, or null when there is none
+    */
+   private DataRegionReplicaSet getNodeDataRegion(PlanNode root) {
+     if (root instanceof SourceNode) {
+       SourceNode source = (SourceNode) root;
+       return source.getDataRegionReplicaSet();
+     }
+     // Children are examined lazily and in order; the search stops at the first non-null answer.
+     return root.getChildren().stream()
+         .map(this::getNodeDataRegion)
+         .filter(region -> region != null)
+         .findFirst()
+         .orElse(null);
+   }
+
+   /**
+    * Looks up a node of this fragment by id.
+    *
+    * @param nodeId id of the wanted PlanNode
+    * @return the first node (pre-order, starting at the fragment root) whose id equals {@code
+    *     nodeId}, or null when no node matches
+    */
+   public PlanNode getPlanNodeById(PlanNodeId nodeId) {
+     return getPlanNodeById(root, nodeId);
+   }
+
+   private PlanNode getPlanNodeById(PlanNode root, PlanNodeId nodeId) {
+     PlanNode match = null;
+     if (root.getId().equals(nodeId)) {
+       match = root;
+     } else {
+       for (PlanNode child : root.getChildren()) {
+         match = getPlanNodeById(child, nodeId);
+         if (match != null) {
+           break;
+         }
+       }
+     }
+     return match;
+   }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
new file mode 100644
index 0000000..3603310
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan;
+
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A simple implementation of IFragmentParallelPlaner. This planner will transform one PlanFragment
+ * into only one FragmentInstance.
+ */
+public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
+
+ private SubPlan subPlan;
+
+ // Record all the FragmentInstances belonged to same PlanFragment
+ Map<PlanFragmentId, FragmentInstance> instanceMap;
+ // Record which PlanFragment the PlanNode belongs
+ Map<PlanNodeId, PlanFragmentId> planNodeMap;
+ List<FragmentInstance> fragmentInstanceList;
+
+ public SimpleFragmentParallelPlanner(SubPlan subPlan) {
+ this.subPlan = subPlan;
+ this.instanceMap = new HashMap<>();
+ this.planNodeMap = new HashMap<>();
+ this.fragmentInstanceList = new ArrayList<>();
+ }
+
+ @Override
+ public List<FragmentInstance> parallelPlan() {
+ prepare();
+ calculateNodeTopologyBetweenInstance();
+ return fragmentInstanceList;
+ }
+
+ private void prepare() {
+ List<PlanFragment> fragments = subPlan.getPlanFragmentList();
+ for (PlanFragment fragment : fragments) {
+ recordPlanNodeRelation(fragment.getRoot(), fragment.getId());
+ produceFragmentInstance(fragment);
+ }
+ }
+
+ private void produceFragmentInstance(PlanFragment fragment) {
+ // If one PlanFragment will produce several FragmentInstance, the instanceIdx will be increased
+ // one by one
+ int instanceIdx = 0;
+ PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
+ FragmentInstance fragmentInstance = new FragmentInstance(fragment, instanceIdx);
+
+ // Get the target DataRegion for origin PlanFragment, then its instance will be distributed one
+ // of them.
+ DataRegionReplicaSet dataRegion = fragment.getTargetDataRegion();
+
+ // Set DataRegion and target host for the instance
+ // We need to store all the replica host in case of the scenario that the instance need to be
+ // redirected
+ // to another host when scheduling
+ fragmentInstance.setDataRegionId(dataRegion);
+
+ // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
+ // instance
+ fragmentInstance.setHostEndpoint(dataRegion.getEndPointList().get(0));
+ instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
+ fragmentInstanceList.add(fragmentInstance);
+ }
+
+ private void calculateNodeTopologyBetweenInstance() {
+ for (FragmentInstance instance : fragmentInstanceList) {
+ PlanNode rootNode = instance.getFragment().getRoot();
+ if (rootNode instanceof FragmentSinkNode) {
+ // Set target Endpoint for FragmentSinkNode
+ FragmentSinkNode sinkNode = (FragmentSinkNode) rootNode;
+ PlanNodeId downStreamNodeId = sinkNode.getDownStreamNode().getId();
+ FragmentInstance downStreamInstance = findDownStreamInstance(downStreamNodeId);
+ sinkNode.setDownStream(
+ downStreamInstance.getHostEndpoint(), downStreamInstance.getId(), downStreamNodeId);
+
+ // Set upstream info for corresponding ExchangeNode in downstream FragmentInstance
+ PlanNode downStreamExchangeNode =
+ downStreamInstance.getFragment().getPlanNodeById(downStreamNodeId);
+ ((ExchangeNode) downStreamExchangeNode)
+ .setUpstream(instance.getHostEndpoint(), instance.getId(), sinkNode.getId());
+ }
+ }
+ }
+
+ private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) {
+ return instanceMap.get(planNodeMap.get(exchangeNodeId));
+ }
+
+ private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) {
+ planNodeMap.put(root.getId(), planFragmentId);
+ for (PlanNode child : root.getChildren()) {
+ recordPlanNodeRelation(child, planFragmentId);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java
new file mode 100644
index 0000000..1ce2f1c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner.plan;
+
+import java.util.ArrayList;
+import java.util.List;
+
+ /**
+  * A node of the fragment tree: one PlanFragment plus the SubPlans whose output it consumes.
+  */
+ public class SubPlan {
+   private PlanFragment planFragment;
+   private List<SubPlan> children;
+
+   public SubPlan(PlanFragment planFragment) {
+     this.planFragment = planFragment;
+     this.children = new ArrayList<>();
+   }
+
+   public void setChildren(List<SubPlan> children) {
+     this.children = children;
+   }
+
+   public void addChild(SubPlan subPlan) {
+     this.children.add(subPlan);
+   }
+
+   /** Prints this SubPlan on one line, followed by the output of every child SubPlan. */
+   @Override
+   public String toString() {
+     StringBuilder result = new StringBuilder();
+     result.append(
+         String.format(
+             "SubPlan-%s. RootNodeId: %s\n", planFragment.getId(), planFragment.getRoot().getId()));
+     children.forEach(result::append);
+     return result.toString();
+   }
+
+   public PlanFragment getPlanFragment() {
+     return this.planFragment;
+   }
+
+   public List<SubPlan> getChildren() {
+     return this.children;
+   }
+
+   /**
+    * Collects the PlanFragments of the whole SubPlan tree rooted at this SubPlan.
+    *
+    * <p>The traversal is pre-order: this SubPlan's fragment first, then the fragments of each child
+    * subtree in child order. Descending into the children matters because a child SubPlan may have
+    * children of its own; looking only at the direct children would drop those fragments.
+    *
+    * @return a new list holding every PlanFragment reachable from this SubPlan
+    */
+   public List<PlanFragment> getPlanFragmentList() {
+     List<PlanFragment> result = new ArrayList<>();
+     result.add(this.planFragment);
+     for (SubPlan child : this.children) {
+       result.addAll(child.getPlanFragmentList());
+     }
+     return result;
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
index 1a597b2..f05a1be 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
@@ -18,6 +18,9 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+import java.util.List;
+import java.util.stream.Collectors;
+
public class PlanNodeUtil {
public static void printPlanNode(PlanNode root) {
printPlanNodeWithLevel(root, 0);
@@ -36,4 +39,27 @@ public class PlanNodeUtil {
System.out.print("\t");
}
}
+
+   /**
+    * Renders a PlanNode tree as text: one node per line, each line indented with one tab per level
+    * below {@code root}.
+    *
+    * @param root root of the tree to render
+    * @return the rendered tree; every line ends with the platform line separator
+    */
+   public static String nodeToString(PlanNode root) {
+     StringBuilder buffer = new StringBuilder();
+     nodeToString(root, 0, buffer);
+     return buffer.toString();
+   }
+
+   private static void nodeToString(PlanNode root, int level, StringBuilder result) {
+     int remaining = level;
+     while (remaining-- > 0) {
+       result.append("\t");
+     }
+     result.append(root.toString()).append(System.lineSeparator());
+     // Children are printed one level deeper than their parent.
+     root.getChildren().forEach(child -> nodeToString(child, level + 1, result));
+   }
+
+   /**
+    * Copies a PlanNode tree bottom-up: every child subtree is copied first, then {@code root} is
+    * cloned around the copied children via {@code cloneWithChildren}.
+    *
+    * @param root root of the tree to copy
+    * @return whatever {@code root.cloneWithChildren} returns for the copied children
+    */
+   public static PlanNode deepCopy(PlanNode root) {
+     return root.cloneWithChildren(
+         root.getChildren().stream()
+             .map(child -> deepCopy(child))
+             .collect(Collectors.toList()));
+   }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
index 0be251d..b0ff572 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -73,4 +73,8 @@ public abstract class PlanVisitor<R, C> {
public R visitTimeJoin(TimeJoinNode node, C context) {
return visitPlan(node, context);
}
+
+ public R visitExchange(ExchangeNode node, C context) {
+ return visitPlan(node, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 4a36a75..7eab8c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -19,17 +19,26 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
-import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
import com.google.common.collect.ImmutableList;
import java.util.List;
public class ExchangeNode extends PlanNode {
- private PlanNode sourceNode;
- private FragmentId sourceFragmentId;
+ private PlanNode child;
+ private FragmentSinkNode remoteSourceNode;
+
+ // In current version, one ExchangeNode will only have one source.
+ // And the fragment which the sourceNode belongs to will only have one instance.
+ // Thus, by nodeId and endpoint, the ExchangeNode can know where its source from.
+ private EndPoint upstreamEndpoint;
+ private FragmentInstanceId upstreamInstanceId;
+ private PlanNodeId upstreamPlanNodeId;
public ExchangeNode(PlanNodeId id) {
super(id);
@@ -37,43 +46,85 @@ public class ExchangeNode extends PlanNode {
@Override
public List<PlanNode> getChildren() {
- return ImmutableList.of(sourceNode);
+ if (this.child == null) {
+ return ImmutableList.of();
+ }
+ return ImmutableList.of(child);
}
@Override
public PlanNode clone() {
- return new ExchangeNode(getId());
+ ExchangeNode node = new ExchangeNode(getId());
+ if (remoteSourceNode != null) {
+ node.setRemoteSourceNode((FragmentSinkNode) remoteSourceNode.clone());
+ }
+ return node;
}
@Override
public PlanNode cloneWithChildren(List<PlanNode> children) {
- ExchangeNode node = new ExchangeNode(getId());
- node.setSourceNode(children.get(0));
+ ExchangeNode node = (ExchangeNode) clone();
+ if (children != null && children.size() > 0) {
+ node.setChild(children.get(0));
+ }
return node;
}
+ public void setUpstream(EndPoint endPoint, FragmentInstanceId instanceId, PlanNodeId nodeId) {
+ this.upstreamEndpoint = endPoint;
+ this.upstreamInstanceId = instanceId;
+ this.upstreamPlanNodeId = nodeId;
+ }
+
@Override
public List<String> getOutputColumnNames() {
return null;
}
- public void setSourceFragmentId(FragmentId sourceFragmentId) {
- this.sourceFragmentId = sourceFragmentId;
+ public PlanNode getChild() {
+ return child;
}
- public FragmentId getSourceFragmentId() {
- return sourceFragmentId;
+ public void setChild(PlanNode child) {
+ this.child = child;
}
- public PlanNode getSourceNode() {
- return sourceNode;
+   /**
+    * Renders the node id together with the id of the remote source (sink) node and the upstream
+    * address.
+    *
+    * <p>The remote source node is only linked once the plan has been split into fragments, so it
+    * can still be null here; it is then printed as "Not assigned" instead of throwing a
+    * NullPointerException.
+    */
+   @Override
+   public String toString() {
+     return String.format(
+         "ExchangeNode-%s: [SourceNodeId: %s, SourceAddress:%s]",
+         getId(),
+         remoteSourceNode == null ? "Not assigned" : remoteSourceNode.getId(),
+         getSourceAddress());
}
- public void setSourceNode(PlanNode sourceNode) {
- this.sourceNode = sourceNode;
+   /**
+    * Describes the upstream this node reads from.
+    *
+    * @return {@code "ip/instanceId/planNodeId"} of the upstream, or "Not assigned" while no
+    *     upstream endpoint has been set
+    */
+   public String getSourceAddress() {
+     EndPoint upstream = getUpstreamEndpoint();
+     return upstream == null
+         ? "Not assigned"
+         : String.format(
+             "%s/%s/%s", upstream.getIp(), getUpstreamInstanceId(), getUpstreamPlanNodeId());
}
- public String toString() {
- return String.format("ExchangeNode-%s", getId());
+ public FragmentSinkNode getRemoteSourceNode() {
+ return remoteSourceNode;
+ }
+
+ public void setRemoteSourceNode(FragmentSinkNode remoteSourceNode) {
+ this.remoteSourceNode = remoteSourceNode;
+ }
+
+ public void cleanChildren() {
+ this.child = null;
+ }
+
+ public EndPoint getUpstreamEndpoint() {
+ return upstreamEndpoint;
+ }
+
+ public FragmentInstanceId getUpstreamInstanceId() {
+ return upstreamInstanceId;
+ }
+
+ public PlanNodeId getUpstreamPlanNodeId() {
+ return upstreamPlanNodeId;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 6151916..3144a4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -18,19 +18,31 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
+
+import com.google.common.collect.ImmutableList;
import java.util.List;
public class FragmentSinkNode extends SinkNode {
+ private PlanNode child;
+ private ExchangeNode downStreamNode;
+
+ private EndPoint downStreamEndpoint;
+ private FragmentInstanceId downStreamInstanceId;
+ private PlanNodeId downStreamPlanNodeId;
+
public FragmentSinkNode(PlanNodeId id) {
super(id);
}
@Override
public List<PlanNode> getChildren() {
- return null;
+     // ImmutableList.of(element) rejects null, so a sink whose child has not been set yet must
+     // report an empty child list instead; ExchangeNode.getChildren() handles null the same way.
+     if (child == null) {
+       return ImmutableList.of();
+     }
+     return ImmutableList.of(child);
}
@Override
@@ -53,4 +65,51 @@ public class FragmentSinkNode extends SinkNode {
@Override
public void close() throws Exception {}
+
+ public PlanNode getChild() {
+ return child;
+ }
+
+ public void setChild(PlanNode child) {
+ this.child = child;
+ }
+
+ public String toString() {
+ return String.format("FragmentSinkNode-%s:[SendTo: (%s)]", getId(), getDownStreamAddress());
+ }
+
+   /**
+    * Describes the downstream this sink sends to.
+    *
+    * @return {@code "ip/instanceId/planNodeId"} of the downstream, or "Not assigned" while no
+    *     downstream endpoint has been set
+    */
+   public String getDownStreamAddress() {
+     EndPoint target = getDownStreamEndpoint();
+     return target == null
+         ? "Not assigned"
+         : String.format(
+             "%s/%s/%s", target.getIp(), getDownStreamInstanceId(), getDownStreamPlanNodeId());
+   }
+
+ public ExchangeNode getDownStreamNode() {
+ return downStreamNode;
+ }
+
+ public void setDownStreamNode(ExchangeNode downStreamNode) {
+ this.downStreamNode = downStreamNode;
+ }
+
+ public void setDownStream(EndPoint endPoint, FragmentInstanceId instanceId, PlanNodeId nodeId) {
+ this.downStreamEndpoint = endPoint;
+ this.downStreamInstanceId = instanceId;
+ this.downStreamPlanNodeId = nodeId;
+ }
+
+ public EndPoint getDownStreamEndpoint() {
+ return downStreamEndpoint;
+ }
+
+ public FragmentInstanceId getDownStreamInstanceId() {
+ return downStreamInstanceId;
+ }
+
+ public PlanNodeId getDownStreamPlanNodeId() {
+ return downStreamPlanNodeId;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
index 797611e..fabe25a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -55,4 +56,14 @@ public class CsvSourceNode extends SourceNode {
@Override
public void open() throws Exception {}
+
+ @Override
+ public DataRegionReplicaSet getDataRegionReplicaSet() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setDataRegionReplicaSet(DataRegionReplicaSet dataRegionReplicaSet) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 674b6b4..e5fd770 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
-import org.apache.iotdb.db.mpp.common.DataRegion;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -61,7 +61,7 @@ public class SeriesAggregateScanNode extends SourceNode {
private String columnName;
// The id of DataRegion where the node will run
- private DataRegion dataRegion;
+ private DataRegionReplicaSet dataRegionReplicaSet;
public SeriesAggregateScanNode(PlanNodeId id) {
super(id);
@@ -102,13 +102,19 @@ public class SeriesAggregateScanNode extends SourceNode {
public void open() throws Exception {}
@Override
- public void close() throws Exception {}
+ public DataRegionReplicaSet getDataRegionReplicaSet() {
+ return this.dataRegionReplicaSet;
+ }
- public DataRegion getDataRegion() {
- return dataRegion;
+ @Override
+ public void setDataRegionReplicaSet(DataRegionReplicaSet dataRegionReplicaSet) {
+ this.dataRegionReplicaSet = dataRegionReplicaSet;
}
@Override
+ public void close() throws Exception {}
+
+ @Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitSeriesAggregate(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index bfa2faf..d7406ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -18,8 +18,8 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.DataRegion;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
@@ -62,16 +62,17 @@ public class SeriesScanNode extends SourceNode {
private String columnName;
// The id of DataRegion where the node will run
- private DataRegion dataRegion;
+ private DataRegionReplicaSet dataRegionReplicaSet;
public SeriesScanNode(PlanNodeId id, PartialPath seriesPath) {
super(id);
this.seriesPath = seriesPath;
}
- public SeriesScanNode(PlanNodeId id, PartialPath seriesPath, DataRegion dataRegion) {
+ public SeriesScanNode(
+ PlanNodeId id, PartialPath seriesPath, DataRegionReplicaSet dataRegionReplicaSet) {
this(id, seriesPath);
- this.dataRegion = dataRegion;
+ this.dataRegionReplicaSet = dataRegionReplicaSet;
}
public void setTimeFilter(Filter timeFilter) {
@@ -88,6 +89,15 @@ public class SeriesScanNode extends SourceNode {
@Override
public void open() throws Exception {}
+ @Override
+ public DataRegionReplicaSet getDataRegionReplicaSet() {
+ return dataRegionReplicaSet;
+ }
+
+ public void setDataRegionReplicaSet(DataRegionReplicaSet dataRegion) {
+ this.dataRegionReplicaSet = dataRegion;
+ }
+
public void setScanOrder(OrderBy scanOrder) {
this.scanOrder = scanOrder;
}
@@ -107,7 +117,7 @@ public class SeriesScanNode extends SourceNode {
@Override
public PlanNode clone() {
- return new SeriesScanNode(getId(), getSeriesPath(), this.dataRegion);
+ return new SeriesScanNode(getId(), getSeriesPath(), this.dataRegionReplicaSet);
}
@Override
@@ -133,17 +143,9 @@ public class SeriesScanNode extends SourceNode {
return timeFilter;
}
- public void setDataRegion(DataRegion dataRegion) {
- this.dataRegion = dataRegion;
- }
-
- public DataRegion getDataRegion() {
- return dataRegion;
- }
-
public String toString() {
return String.format(
"SeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
- this.getId(), this.getSeriesPath(), this.getDataRegion());
+ this.getId(), this.getSeriesPath(), this.getDataRegionReplicaSet());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
index 551e9d2..67c9e2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -28,4 +29,8 @@ public abstract class SourceNode extends PlanNode implements AutoCloseable {
}
public abstract void open() throws Exception;
+
+ public abstract DataRegionReplicaSet getDataRegionReplicaSet();
+
+ public abstract void setDataRegionReplicaSet(DataRegionReplicaSet dataRegionReplicaSet);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 2b54026..9edaf5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -19,12 +19,12 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
-import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.List;
@@ -37,11 +37,10 @@ public abstract class InsertNode extends PlanNode {
protected PartialPath devicePath;
protected boolean isAligned;
- protected String[] measurements;
- // get from client
+ protected MeasurementSchema[] measurements;
protected TSDataType[] dataTypes;
- // get from SchemaEngine
- protected IMeasurementMNode[] measurementMNodes;
+ // TODO(INSERT) need to change it to a function handle to update last time value
+ // protected IMeasurementMNode[] measurementMNodes;
/**
* device id reference, for reuse device id in both id table and memtable <br>
@@ -49,15 +48,12 @@ public abstract class InsertNode extends PlanNode {
*/
protected IDeviceID deviceID;
- // record the failed measurements, their reasons, and positions in "measurements"
- List<String> failedMeasurements;
- private List<Exception> failedExceptions;
- List<Integer> failedIndices;
-
protected InsertNode(PlanNodeId id) {
super(id);
}
+ // TODO(INSERT) split this insert node into multiple InsertNode according to the data partition
+ // info
public abstract List<InsertNode> splitByPartition(Analysis analysis);
public boolean needSplit() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index 4140719..5ce7bbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -21,12 +21,31 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.utils.BitMap;
import java.util.List;
public class InsertTabletNode extends InsertNode {
- protected InsertTabletNode(PlanNodeId id) {
+ private long[] times; // times should be sorted. It is done in the session API.
+
+ private BitMap[] bitMaps;
+ private Object[] columns;
+
+ private int start;
+ private int end;
+ // When this plan is a sub-plan split from another InsertTabletPlan, this records the original
+ // positions of this plan's values in the parent plan, as a flat list of [start, end) pairs.
+ //
+ // For example, if the plan contains 5 timestamps and range = [1,4,10,12], then:
+ // - the first 3 timestamps in this plan come from range [1,4) of the parent plan, and
+ // - the last 2 timestamps in this plan come from range [10,12) of the parent plan.
+ //
+ // This is usually used to back-propagate exceptions to the parent plan without losing their
+ // proper positions.
+ private List<Integer> range;
+
+ public InsertTabletNode(PlanNodeId id) {
super(id);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
index 7bd9300..8b70ccc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
@@ -16,29 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution;
+package org.apache.iotdb.db.mpp.sql.statement.crud;
-import org.apache.iotdb.db.mpp.common.FragmentId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import java.util.List;
+public abstract class InsertBaseStatement extends Statement {
-public class FragmentInfo {
+ /**
+ * if use id table, this field is id form of device path <br>
+ * if not, this field is device path<br>
+ */
+ protected PartialPath devicePath;
- private final FragmentId stageId;
- private final FragmentState state;
- private final PlanFragment plan;
+ protected boolean isAligned;
+ protected String[] measurements;
+ // get from client
+ protected TSDataType[] dataTypes;
- private final List<FragmentInfo> childrenFragments;
+ public PartialPath getDevicePath() {
+ return devicePath;
+ }
- public FragmentInfo(
- FragmentId stageId,
- FragmentState state,
- PlanFragment plan,
- List<FragmentInfo> childrenFragments) {
- this.stageId = stageId;
- this.state = state;
- this.plan = plan;
- this.childrenFragments = childrenFragments;
+ public String[] getMeasurements() {
+ return measurements;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
index 38a56f3..9a97d66 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
@@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.statement.crud;
-public class PlanFragmentId {
- private String id;
+import org.apache.iotdb.tsfile.utils.BitMap;
- public PlanFragmentId(String id) {
- this.id = id;
- }
+public class InsertTabletStatement extends InsertBaseStatement {
+
+ private long[] times; // times should be sorted. It is done in the session API.
+ private BitMap[] bitMaps;
+ private Object[] columns;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/StatementVisitor.java
index 6ff3aeb..4892b18 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/StatementVisitor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.sql.tree;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.StatementNode;
import org.apache.iotdb.db.mpp.sql.statement.crud.InsertStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
@@ -70,4 +71,8 @@ public abstract class StatementVisitor<R, C> {
public R visitInsert(InsertStatement insertStatement, C context) {
return visitStatement(insertStatement, context);
}
+
+ public R visitInsertTablet(InsertTabletStatement insertTabletStatement, C context) {
+ return visitStatement(insertTabletStatement, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 2183348..7e8430b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -577,6 +577,7 @@ public abstract class PhysicalPlan implements IConsensusRequest {
*
* @throws QueryProcessException when the check fails
*/
+ // TODO(INSERT) move this check into analyze
public void checkIntegrity() throws QueryProcessException {}
public boolean isPrefixMatch() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 17e0e4d..810ed6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.query.dataset.UDTFDataSet;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
@@ -59,6 +60,9 @@ public class SessionManager {
private final Map<Long, IoTDBConstant.ClientVersion> sessionIdToClientVersion =
new ConcurrentHashMap<>();
+ // TODO sessionIdToUsername and sessionIdToZoneId should be replaced with this
+ private final Map<Long, SessionInfo> sessionIdToSessionInfo = new ConcurrentHashMap<>();
+
protected SessionManager() {
// singleton
}
@@ -219,6 +223,10 @@ public class SessionManager {
return SessionManagerHelper.INSTANCE;
}
+ public SessionInfo getSessionInfo(long sessionId) {
+ return sessionIdToSessionInfo.get(sessionId);
+ }
+
private static class SessionManagerHelper {
private static final SessionManager INSTANCE = new SessionManager();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
index 45a42e0..924babf 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
@@ -70,13 +70,13 @@ public abstract class ServiceProvider {
public static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+ public static SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+ // MPP: The following fields will be moved to Coordinator
public static final QueryTimeManager QUERY_TIME_MANAGER = QueryTimeManager.getInstance();
public static final TracingManager TRACING_MANAGER = TracingManager.getInstance();
public static final QueryFrequencyRecorder QUERY_FREQUENCY_RECORDER =
new QueryFrequencyRecorder(CONFIG);
-
- public static SessionManager SESSION_MANAGER = SessionManager.getInstance();
-
private final Planner planner;
protected final IPlanExecutor executor;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index be520f4..d949e66 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -35,6 +35,11 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.template.TemplateQueryType;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.Coordinator;
+import org.apache.iotdb.db.mpp.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -60,6 +65,7 @@ import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.tracing.TracingConstant;
import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
@@ -151,7 +157,6 @@ import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG;
import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION;
import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_TIME_MANAGER;
-import static org.apache.iotdb.db.service.basic.ServiceProvider.SESSION_MANAGER;
import static org.apache.iotdb.db.service.basic.ServiceProvider.SLOW_SQL_LOGGER;
import static org.apache.iotdb.db.service.basic.ServiceProvider.TRACING_MANAGER;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
@@ -162,6 +167,10 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
/** Thrift RPC implementation at server side. */
public class TSServiceImpl implements TSIService.Iface {
+ private static final Coordinator coordinator = Coordinator.getInstance();
+
+ public static SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
protected class QueryTask implements Callable<TSExecuteStatementResp> {
private PhysicalPlan plan;
@@ -1582,6 +1591,51 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
+ public TSStatus insertTabletV2(TSInsertTabletReq req) {
+ long t1 = System.currentTimeMillis();
+ try {
+ // TODO (xingtanzjr) move this method to SESSION_MANAGER
+ if (!serviceProvider.checkLogin(req.getSessionId())) {
+ return getNotLoggedInStatus();
+ }
+
+ // Step 1: TODO(INSERT) transfer from TSInsertTabletReq to Statement
+ InsertTabletStatement statement = new InsertTabletStatement();
+ // InsertTabletPlan insertTabletPlan =
+ // new InsertTabletPlan(new PartialPath(req.getPrefixPath()), req.measurements);
+ // insertTabletPlan.setTimes(QueryDataSetUtils.readTimesFromBuffer(req.timestamps,
+ // req.size));
+ // insertTabletPlan.setColumns(
+ // QueryDataSetUtils.readValuesFromBuffer(
+ // req.values, req.types, req.types.size(), req.size));
+ // insertTabletPlan.setBitMaps(
+ // QueryDataSetUtils.readBitMapsFromBuffer(req.values, req.types.size(), req.size));
+ // insertTabletPlan.setRowCount(req.size);
+ // insertTabletPlan.setDataTypes(req.types);
+ // insertTabletPlan.setAligned(req.isAligned);
+
+ // Step 2: call the coordinator
+ long queryId = SESSION_MANAGER.requestQueryId(false);
+ ExecutionResult result =
+ coordinator.execute(
+ statement,
+ new QueryId(String.valueOf(queryId)),
+ QueryType.WRITE,
+ SESSION_MANAGER.getSessionInfo(req.sessionId),
+ "");
+
+ // TODO(INSERT) do this check in analyze
+ // TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+ // req.getSessionId());
+ return result.status;
+ } catch (Exception e) {
+ return onNPEOrUnexpectedException(
+ e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ } finally {
+ addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+ }
+ }
+
@Override
public TSStatus insertTablets(TSInsertTabletsReq req) {
long t1 = System.currentTimeMillis();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/StatementAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
similarity index 92%
rename from server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/StatementAnalyzerTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
index 39ea9f7..6c034fd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/StatementAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
@@ -30,7 +30,7 @@ import java.time.ZonedDateTime;
import static org.junit.Assert.fail;
-public class StatementAnalyzerTest {
+public class AnalyzerTest {
@Test
public void samePropertyKeyTest() {
@@ -41,7 +41,7 @@ public class StatementAnalyzerTest {
private void assertAnalyzeSemanticException(String sql, String message) {
try {
- StatementAnalyzer analyzer = new StatementAnalyzer(new Analysis(), new MPPQueryContext());
+ Analyzer analyzer = new Analyzer(new MPPQueryContext());
analyzer.analyze(StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()));
fail();
} catch (SemanticException e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 6bc38c9..4c224b8 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -19,29 +19,28 @@
package org.apache.iotdb.db.mpp.sql.plan;
+import org.apache.iotdb.commons.partition.*;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.*;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.DistributionPlanner;
+import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
public class DistributionPlannerTest {
@@ -56,7 +55,7 @@ public class DistributionPlannerTest {
timeJoinNode.addChild(
new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d2.s1")));
+ new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
@@ -66,10 +65,8 @@ public class DistributionPlannerTest {
new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(), root));
PlanNode newRoot = planner.rewriteSource();
- // PlanNodeUtil.printPlanNode(newRoot);
+ // PlanNodeUtil.printPlanNode(newRoot);
assertEquals(newRoot.getChildren().get(0).getChildren().size(), 3);
- assertEquals(newRoot.getChildren().get(0).getChildren().get(0).getChildren().size(), 2);
- assertEquals(newRoot.getChildren().get(0).getChildren().get(1).getChildren().size(), 2);
}
@Test
@@ -83,7 +80,7 @@ public class DistributionPlannerTest {
timeJoinNode.addChild(
new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d2.s1")));
+ new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
@@ -95,32 +92,97 @@ public class DistributionPlannerTest {
PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
// PlanNodeUtil.printPlanNode(rootWithExchange);
assertEquals(rootWithExchange.getChildren().get(0).getChildren().size(), 3);
- assertEquals(
- rootWithExchange.getChildren().get(0).getChildren().get(0).getChildren().size(), 2);
- assertTrue(rootWithExchange.getChildren().get(0).getChildren().get(1) instanceof ExchangeNode);
- assertEquals(
- rootWithExchange.getChildren().get(0).getChildren().get(1).getChildren().size(), 1);
- assertTrue(rootWithExchange.getChildren().get(0).getChildren().get(2) instanceof ExchangeNode);
- assertEquals(
- rootWithExchange.getChildren().get(0).getChildren().get(2).getChildren().size(), 1);
+ }
+
+ @Test
+ public void TestSplitFragment() throws IllegalPathException {
+ TimeJoinNode timeJoinNode =
+ new TimeJoinNode(
+ PlanNodeIdAllocator.generateId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+
+ timeJoinNode.addChild(
+ new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s1")));
+ timeJoinNode.addChild(
+ new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
+ timeJoinNode.addChild(
+ new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
+
+ LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
+
+ Analysis analysis = constructAnalysis();
+
+ MPPQueryContext context = new MPPQueryContext("", new QueryId("query1"), null);
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+ PlanNode rootAfterRewrite = planner.rewriteSource();
+ PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
+ SubPlan subPlan = planner.splitFragment(rootWithExchange);
+ assertEquals(subPlan.getChildren().size(), 2);
+ }
+
+ @Test
+ public void TestParallelPlan() throws IllegalPathException {
+ TimeJoinNode timeJoinNode =
+ new TimeJoinNode(
+ PlanNodeIdAllocator.generateId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+
+ timeJoinNode.addChild(
+ new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s1")));
+ timeJoinNode.addChild(
+ new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
+ timeJoinNode.addChild(
+ new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
+
+ LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
+
+ Analysis analysis = constructAnalysis();
+
+ MPPQueryContext context = new MPPQueryContext("", new QueryId("query1"), null);
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+ DistributedQueryPlan plan = planner.planFragments();
+ plan.getInstances().forEach(System.out::println);
+ assertEquals(plan.getInstances().size(), 3);
}
private Analysis constructAnalysis() {
Analysis analysis = new Analysis();
- Map<String, Map<DataRegionTimeSlice, List<DataRegion>>> dataPartitionInfo = new HashMap<>();
- List<DataRegion> d1DataRegions = new ArrayList<>();
- d1DataRegions.add(new DataRegion(1, "192.0.0.1"));
- d1DataRegions.add(new DataRegion(2, "192.0.0.1"));
- Map<DataRegionTimeSlice, List<DataRegion>> d1DataRegionMap = new HashMap<>();
- d1DataRegionMap.put(new DataRegionTimeSlice(), d1DataRegions);
-
- List<DataRegion> d2DataRegions = new ArrayList<>();
- d2DataRegions.add(new DataRegion(3, "192.0.0.1"));
- Map<DataRegionTimeSlice, List<DataRegion>> d2DataRegionMap = new HashMap<>();
- d2DataRegionMap.put(new DataRegionTimeSlice(), d2DataRegions);
-
- dataPartitionInfo.put("root.sg.d1", d1DataRegionMap);
- dataPartitionInfo.put("root.sg.d2", d2DataRegionMap);
+
+ String device1 = "root.sg.d1";
+ String device2 = "root.sg.d22";
+ String device3 = "root.sg.d333";
+
+ DataPartitionInfo dataPartitionInfo = new DataPartitionInfo();
+ Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>>
+ dataPartitionMap = new HashMap<>();
+ Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>> sgPartitionMap =
+ new HashMap<>();
+ List<DataRegionReplicaSet> d1DataRegions = new ArrayList<>();
+ d1DataRegions.add(
+ new DataRegionReplicaSet(
+ new DataRegionId(1),
+ Arrays.asList(new EndPoint("192.0.0.1", 9000), new EndPoint("192.0.0.2", 9000))));
+ d1DataRegions.add(
+ new DataRegionReplicaSet(
+ new DataRegionId(2),
+ Arrays.asList(new EndPoint("192.0.0.3", 9000), new EndPoint("192.0.0.4", 9000))));
+ Map<TimePartitionId, List<DataRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
+ d1DataRegionMap.put(new TimePartitionId(), d1DataRegions);
+
+ List<DataRegionReplicaSet> d2DataRegions = new ArrayList<>();
+ d2DataRegions.add(
+ new DataRegionReplicaSet(
+ new DataRegionId(3),
+ Arrays.asList(new EndPoint("192.0.0.5", 9000), new EndPoint("192.0.0.6", 9000))));
+ Map<TimePartitionId, List<DataRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
+ d2DataRegionMap.put(new TimePartitionId(), d2DataRegions);
+
+ sgPartitionMap.put(new DeviceGroupId(device1.length()), d1DataRegionMap);
+ sgPartitionMap.put(new DeviceGroupId(device2.length()), d2DataRegionMap);
+
+ dataPartitionMap.put("root.sg", sgPartitionMap);
+
+ dataPartitionInfo.setDataPartitionMap(dataPartitionMap);
analysis.setDataPartitionInfo(dataPartitionInfo);
return analysis;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
index fc02d5b..ec63ca3 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.mpp.sql.plan;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
-import org.apache.iotdb.db.mpp.sql.analyze.StatementAnalyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -81,7 +81,7 @@ public class LogicalPlannerTest {
PlanNode planNode = null;
try {
MPPQueryContext context = new MPPQueryContext();
- StatementAnalyzer analyzer = new StatementAnalyzer(new Analysis(), context);
+ Analyzer analyzer = new Analyzer(context);
Analysis analysis =
analyzer.analyze(
StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()));