You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/28 15:11:27 UTC
[iotdb] 03/03: add fake interface
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/polish_node
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f1548d18c235b7d0a9c24dec0b50075c92ea7919
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Mar 28 23:02:58 2022 +0800
add fake interface
---
.../commons/partition/DataRegionReplicaSet.java | 8 ++
.../iotdb/db/mpp/common/schematree/SchemaTree.java | 40 +++++++-
.../apache/iotdb/db/mpp/sql/analyze/Analyzer.java | 4 +-
.../mpp/sql/analyze/FakePartitionFetcherImpl.java | 107 +++++++++++++++++++++
.../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java | 30 ++++++
.../db/mpp/sql/planner/DistributionPlanner.java | 8 +-
.../planner/plan/node/process/ExchangeNode.java | 4 +-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 23 ++++-
.../iotdb/db/mpp/sql/plan/QueryPlannerTest.java | 46 +++++++++
9 files changed, 256 insertions(+), 14 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
index 6ab4fbf..7346363 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
@@ -50,4 +50,12 @@ public class DataRegionReplicaSet {
public String toString() {
return String.format("%s:%s", Id, endPointList);
}
+
+ public int hashCode() { // NOTE(review): overrides Object.hashCode() but carries no @Override
+ return toString().hashCode(); // hashes the "Id:endPointList" text produced by toString()
+ }
+
+ public boolean equals(Object obj) { // NOTE(review): overrides Object.equals() without @Override
+ return obj instanceof DataRegionReplicaSet && obj.toString().equals(toString()); // true when obj is a DataRegionReplicaSet whose toString() text matches this one's
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
index 221a115..6412a6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.common.schematree;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -27,7 +28,9 @@ import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
public class SchemaTree {
@@ -36,12 +39,47 @@ public class SchemaTree {
/**
* Return all measurement paths for given path pattern and filter the result by slimit and offset.
*
- * @param pathPattern can be a pattern or a full path of timeseries.
+ * @param pathPattern can be a pattern or a full path of timeseries.
* @param isPrefixMatch if true, the path pattern is used to match prefix path
* @return Left: all measurement paths; Right: remaining series offset
*/
public Pair<List<MeasurementPath>, Integer> searchMeasurementPaths(
PartialPath pathPattern, int slimit, int soffset, boolean isPrefixMatch) {
+ // TODO: (xingtanzjr) we mock some results here to test the whole procedure
+ try {
+ String[] paths = new String[]{ // fixed mock series: the only paths this method can return
+ "root.sg.d1.s1",
+ "root.sg.d1.s2",
+ "root.sg.d22.s1",
+ "root.sg.d22.s2",
+ "root.sg.d333.s1",
+ "root.sg.d333.s2",
+ };
+
+ List<MeasurementPath> result = new ArrayList<>();
+ String target = pathPattern.getFullPath();
+ StringBuilder noStar = new StringBuilder(); // target with every '*' (and a '.' right after one) removed
+ boolean lastCharIsStar = false;
+ for(int i = 0 ; i < target.length(); i ++) {
+ char c = target.charAt(i);
+ if (c == '*' || (lastCharIsStar && c == '.')) { // skip wildcards and the separator that follows them
+ lastCharIsStar = c == '*';
+ continue;
+ }
+ lastCharIsStar = false;
+ noStar.append(String.valueOf(c));
+ }
+
+ for (String path : paths) {
+ if (path.contains(noStar)) { // plain substring test, so "root.sg.*.s1" -> "root.sg.s1" matches nothing
+ result.add(new MeasurementPath(path));
+ }
+ }
+ return new Pair<>(result, 0); // slimit, soffset and isPrefixMatch are not consulted by the mock
+
+ } catch (IllegalPathException e) {
+ e.printStackTrace(); // NOTE(review): error is only printed; execution continues to the empty result below
+ }
return new Pair<>(new ArrayList<>(), 0);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index 942d26d..2b905f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -60,9 +60,9 @@ public class Analyzer {
private final MPPQueryContext context;
// TODO need to use factory to decide standalone or cluster
- private final IPartitionFetcher partitionFetcher = StandalonePartitionFetcher.getInstance();
+ private final IPartitionFetcher partitionFetcher = new FakePartitionFetcherImpl();
// TODO need to use factory to decide standalone or cluster
- private final ISchemaFetcher schemaFetcher = StandaloneSchemaFetcher.getInstance();
+ private final ISchemaFetcher schemaFetcher = new FakeSchemaFetcherImpl();
public Analyzer(MPPQueryContext context) {
this.context = context;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
new file mode 100644
index 0000000..ecee1a3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.analyze;
+
+import org.apache.iotdb.commons.partition.*;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
+
+import java.util.*;
+
+public class FakePartitionFetcherImpl implements IPartitionFetcher { // IPartitionFetcher that returns hard-coded data
+ @Override
+ public DataPartitionInfo fetchDataPartitionInfo(DataPartitionQueryParam parameter) {
+ return null; // not implemented in this fake; callers receive null
+ }
+
+ @Override
+ public DataPartitionInfo fetchDataPartitionInfos(List<DataPartitionQueryParam> parameterList) { // parameterList is ignored; the same layout is built on every call
+ String device1 = "root.sg.d1";
+ String device2 = "root.sg.d22";
+ String device3 = "root.sg.d333";
+
+ DataPartitionInfo dataPartitionInfo = new DataPartitionInfo();
+ Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>>
+ dataPartitionMap = new HashMap<>();
+ Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>> sgPartitionMap =
+ new HashMap<>();
+
+ List<DataRegionReplicaSet> d1DataRegions = new ArrayList<>(); // d1 -> regions 1 and 2
+ d1DataRegions.add(
+ new DataRegionReplicaSet(
+ new DataRegionId(1),
+ Arrays.asList(new EndPoint("192.0.1.1", 9000), new EndPoint("192.0.1.2", 9000))));
+ d1DataRegions.add(
+ new DataRegionReplicaSet(
+ new DataRegionId(2),
+ Arrays.asList(new EndPoint("192.0.2.1", 9000), new EndPoint("192.0.2.2", 9000))));
+ Map<TimePartitionId, List<DataRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
+ d1DataRegionMap.put(new TimePartitionId(), d1DataRegions);
+
+ List<DataRegionReplicaSet> d2DataRegions = new ArrayList<>(); // d22 -> region 3 only
+ d2DataRegions.add(
+ new DataRegionReplicaSet(
+ new DataRegionId(3),
+ Arrays.asList(new EndPoint("192.0.3.1", 9000), new EndPoint("192.0.3.2", 9000))));
+ Map<TimePartitionId, List<DataRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
+ d2DataRegionMap.put(new TimePartitionId(), d2DataRegions);
+
+ List<DataRegionReplicaSet> d3DataRegions = new ArrayList<>(); // d333 -> regions 1 and 4 (region 1 repeats d1's id and endpoints)
+ d3DataRegions.add(
+ new DataRegionReplicaSet(
+ new DataRegionId(1),
+ Arrays.asList(new EndPoint("192.0.1.1", 9000), new EndPoint("192.0.1.2", 9000))));
+ d3DataRegions.add(
+ new DataRegionReplicaSet(
+ new DataRegionId(4),
+ Arrays.asList(new EndPoint("192.0.4.1", 9000), new EndPoint("192.0.4.2", 9000))));
+ Map<TimePartitionId, List<DataRegionReplicaSet>> d3DataRegionMap = new HashMap<>();
+ d3DataRegionMap.put(new TimePartitionId(), d3DataRegions);
+
+ sgPartitionMap.put(new DeviceGroupId(device1.length()), d1DataRegionMap); // group id is the device-name length: 10
+ sgPartitionMap.put(new DeviceGroupId(device2.length()), d2DataRegionMap); // 11
+ sgPartitionMap.put(new DeviceGroupId(device3.length()), d3DataRegionMap); // 12
+
+ dataPartitionMap.put("root.sg", sgPartitionMap); // single top-level key "root.sg"
+
+ dataPartitionInfo.setDataPartitionMap(dataPartitionMap);
+
+ return dataPartitionInfo;
+ }
+
+ @Override
+ public SchemaPartitionInfo fetchSchemaPartitionInfo(String deviceId) {
+ return null; // not implemented in this fake; callers receive null
+ }
+
+ @Override
+ public SchemaPartitionInfo fetchSchemaPartitionInfos(List<String> deviceId) {
+ return null; // not implemented in this fake; callers receive null
+ }
+
+ @Override
+ public PartitionInfo fetchPartitionInfo(DataPartitionQueryParam parameter) {
+ return null; // not implemented in this fake; callers receive null
+ }
+
+ @Override
+ public PartitionInfo fetchPartitionInfos(List<DataPartitionQueryParam> parameterList) {
+ return null; // not implemented in this fake; callers receive null
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
new file mode 100644
index 0000000..44dd06e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.analyze;
+
+import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+
+public class FakeSchemaFetcherImpl implements ISchemaFetcher{ // stand-in ISchemaFetcher that performs no lookup
+ @Override
+ public SchemaTree fetchSchema(PathPatternTree patternTree) {
+ return new SchemaTree(); // patternTree is ignored; every call yields a new SchemaTree
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 79cab0f..cd26a35 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -28,11 +28,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.SimpleFragmentParallelPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.SimplePlanNodeRewriter;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.*;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
@@ -75,7 +71,9 @@ public class DistributionPlanner {
public DistributedQueryPlan planFragments() {
PlanNode rootAfterRewrite = rewriteSource();
+ System.out.println(PlanNodeUtil.nodeToString(rootAfterRewrite));
PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
+ System.out.println(PlanNodeUtil.nodeToString(rootWithExchange));
SubPlan subPlan = splitFragment(rootWithExchange);
List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
return new DistributedQueryPlan(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 3e016be..950e487 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -104,8 +104,8 @@ public class ExchangeNode extends PlanNode {
public String toString() {
return String.format(
- "ExchangeNode-%s: [SourceNodeId: %s, SourceAddress:%s]",
- getId(), remoteSourceNode.getId(), getSourceAddress());
+ "ExchangeNode-%s: [SourceAddress:%s]",
+ getId(), getSourceAddress());
}
public String getSourceAddress() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 40f78fb..aba4a00 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -138,7 +138,7 @@ public class DistributionPlannerTest {
timeJoinNode.addChild(
new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
+ new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d333.s1")));
LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
@@ -164,15 +164,16 @@ public class DistributionPlannerTest {
dataPartitionMap = new HashMap<>();
Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>> sgPartitionMap =
new HashMap<>();
+
List<DataRegionReplicaSet> d1DataRegions = new ArrayList<>();
d1DataRegions.add(
new DataRegionReplicaSet(
new DataRegionId(1),
- Arrays.asList(new EndPoint("192.0.0.1", 9000), new EndPoint("192.0.0.2", 9000))));
+ Arrays.asList(new EndPoint("192.0.1.1", 9000), new EndPoint("192.0.1.2", 9000))));
d1DataRegions.add(
new DataRegionReplicaSet(
new DataRegionId(2),
- Arrays.asList(new EndPoint("192.0.0.3", 9000), new EndPoint("192.0.0.4", 9000))));
+ Arrays.asList(new EndPoint("192.0.2.1", 9000), new EndPoint("192.0.2.2", 9000))));
Map<TimePartitionId, List<DataRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
d1DataRegionMap.put(new TimePartitionId(), d1DataRegions);
@@ -180,12 +181,26 @@ public class DistributionPlannerTest {
d2DataRegions.add(
new DataRegionReplicaSet(
new DataRegionId(3),
- Arrays.asList(new EndPoint("192.0.0.5", 9000), new EndPoint("192.0.0.6", 9000))));
+ Arrays.asList(new EndPoint("192.0.3.1", 9000), new EndPoint("192.0.3.2", 9000))));
Map<TimePartitionId, List<DataRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
d2DataRegionMap.put(new TimePartitionId(), d2DataRegions);
+ List<DataRegionReplicaSet> d3DataRegions = new ArrayList<>();
+ d3DataRegions.add(
+ new DataRegionReplicaSet(
+ new DataRegionId(1),
+ Arrays.asList(new EndPoint("192.0.1.1", 9000), new EndPoint("192.0.1.2", 9000))));
+ d3DataRegions.add(
+ new DataRegionReplicaSet(
+ new DataRegionId(4),
+ Arrays.asList(new EndPoint("192.0.4.1", 9000), new EndPoint("192.0.4.2", 9000))));
+ Map<TimePartitionId, List<DataRegionReplicaSet>> d3DataRegionMap = new HashMap<>();
+ d3DataRegionMap.put(new TimePartitionId(), d3DataRegions);
+
sgPartitionMap.put(new DeviceGroupId(device1.length()), d1DataRegionMap);
sgPartitionMap.put(new DeviceGroupId(device2.length()), d2DataRegionMap);
+ sgPartitionMap.put(new DeviceGroupId(device3.length()), d3DataRegionMap);
+
dataPartitionMap.put("root.sg", sgPartitionMap);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
new file mode 100644
index 0000000..bf8580b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan;
+
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.execution.QueryExecution;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.junit.Test;
+
+import java.time.ZoneId;
+
+public class QueryPlannerTest {
+
+ @Test
+ public void TestSqlToDistributedPlan() { // NOTE(review): contains no assertions; results are only printed
+
+ String querySql = "SELECT d1.*, d22.s1 FROM root.sg";
+
+ Statement stmt = StatementGenerator.createStatement(querySql, ZoneId.systemDefault());
+ System.out.println(stmt); // prints the parsed statement
+
+ QueryExecution queryExecution = new QueryExecution(stmt, new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ));
+ System.out.println(queryExecution); // prints the QueryExecution object; nothing is verified
+ }
+}