You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/28 15:11:27 UTC

[iotdb] 03/03: add fake interface

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/polish_node
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f1548d18c235b7d0a9c24dec0b50075c92ea7919
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Mar 28 23:02:58 2022 +0800

    add fake interface
---
 .../commons/partition/DataRegionReplicaSet.java    |   8 ++
 .../iotdb/db/mpp/common/schematree/SchemaTree.java |  40 +++++++-
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  |   4 +-
 .../mpp/sql/analyze/FakePartitionFetcherImpl.java  | 107 +++++++++++++++++++++
 .../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java  |  30 ++++++
 .../db/mpp/sql/planner/DistributionPlanner.java    |   8 +-
 .../planner/plan/node/process/ExchangeNode.java    |   4 +-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   |  23 ++++-
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    |  46 +++++++++
 9 files changed, 256 insertions(+), 14 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
index 6ab4fbf..7346363 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
@@ -50,4 +50,12 @@ public class DataRegionReplicaSet {
   public String toString() {
     return String.format("%s:%s", Id, endPointList);
   }
+
+  public int hashCode() {
+    return toString().hashCode();
+  }
+
+  public boolean equals(Object obj) {
+    return obj instanceof DataRegionReplicaSet && obj.toString().equals(toString());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
index 221a115..6412a6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.common.schematree;
 
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -27,7 +28,9 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 public class SchemaTree {
 
@@ -36,12 +39,47 @@ public class SchemaTree {
   /**
    * Return all measurement paths for given path pattern and filter the result by slimit and offset.
    *
-   * @param pathPattern can be a pattern or a full path of timeseries.
+   * @param pathPattern   can be a pattern or a full path of timeseries.
    * @param isPrefixMatch if true, the path pattern is used to match prefix path
    * @return Left: all measurement paths; Right: remaining series offset
    */
   public Pair<List<MeasurementPath>, Integer> searchMeasurementPaths(
       PartialPath pathPattern, int slimit, int soffset, boolean isPrefixMatch) {
+    // TODO: (xingtanzjr) we mock some results here to test the whole procedure
+    try {
+      String[] paths = new String[]{
+          "root.sg.d1.s1",
+          "root.sg.d1.s2",
+          "root.sg.d22.s1",
+          "root.sg.d22.s2",
+          "root.sg.d333.s1",
+          "root.sg.d333.s2",
+      };
+
+      List<MeasurementPath> result = new ArrayList<>();
+      String target = pathPattern.getFullPath();
+      StringBuilder noStar = new StringBuilder();
+      boolean lastCharIsStar = false;
+      for(int i = 0 ; i < target.length(); i ++) {
+        char c = target.charAt(i);
+        if (c == '*' || (lastCharIsStar && c == '.')) {
+          lastCharIsStar = c == '*';
+          continue;
+        }
+        lastCharIsStar = false;
+        noStar.append(String.valueOf(c));
+      }
+
+      for (String path : paths) {
+        if (path.contains(noStar)) {
+          result.add(new MeasurementPath(path));
+        }
+      }
+      return new Pair<>(result, 0);
+
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+    }
     return new Pair<>(new ArrayList<>(), 0);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index 942d26d..2b905f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -60,9 +60,9 @@ public class Analyzer {
   private final MPPQueryContext context;
 
   // TODO need to use factory to decide standalone or cluster
-  private final IPartitionFetcher partitionFetcher = StandalonePartitionFetcher.getInstance();
+  private final IPartitionFetcher partitionFetcher = new FakePartitionFetcherImpl();
   // TODO need to use factory to decide standalone or cluster
-  private final ISchemaFetcher schemaFetcher = StandaloneSchemaFetcher.getInstance();
+  private final ISchemaFetcher schemaFetcher = new FakeSchemaFetcherImpl();
 
   public Analyzer(MPPQueryContext context) {
     this.context = context;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
new file mode 100644
index 0000000..ecee1a3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.analyze;
+
+import org.apache.iotdb.commons.partition.*;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
+
+import java.util.*;
+
+public class FakePartitionFetcherImpl implements IPartitionFetcher {
+  @Override
+  public DataPartitionInfo fetchDataPartitionInfo(DataPartitionQueryParam parameter) {
+    return null;
+  }
+
+  @Override
+  public DataPartitionInfo fetchDataPartitionInfos(List<DataPartitionQueryParam> parameterList) {
+    String device1 = "root.sg.d1";
+    String device2 = "root.sg.d22";
+    String device3 = "root.sg.d333";
+
+    DataPartitionInfo dataPartitionInfo = new DataPartitionInfo();
+    Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>>
+        dataPartitionMap = new HashMap<>();
+    Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>> sgPartitionMap =
+        new HashMap<>();
+
+    List<DataRegionReplicaSet> d1DataRegions = new ArrayList<>();
+    d1DataRegions.add(
+        new DataRegionReplicaSet(
+            new DataRegionId(1),
+            Arrays.asList(new EndPoint("192.0.1.1", 9000), new EndPoint("192.0.1.2", 9000))));
+    d1DataRegions.add(
+        new DataRegionReplicaSet(
+            new DataRegionId(2),
+            Arrays.asList(new EndPoint("192.0.2.1", 9000), new EndPoint("192.0.2.2", 9000))));
+    Map<TimePartitionId, List<DataRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
+    d1DataRegionMap.put(new TimePartitionId(), d1DataRegions);
+
+    List<DataRegionReplicaSet> d2DataRegions = new ArrayList<>();
+    d2DataRegions.add(
+        new DataRegionReplicaSet(
+            new DataRegionId(3),
+            Arrays.asList(new EndPoint("192.0.3.1", 9000), new EndPoint("192.0.3.2", 9000))));
+    Map<TimePartitionId, List<DataRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
+    d2DataRegionMap.put(new TimePartitionId(), d2DataRegions);
+
+    List<DataRegionReplicaSet> d3DataRegions = new ArrayList<>();
+    d3DataRegions.add(
+        new DataRegionReplicaSet(
+            new DataRegionId(1),
+            Arrays.asList(new EndPoint("192.0.1.1", 9000), new EndPoint("192.0.1.2", 9000))));
+    d3DataRegions.add(
+        new DataRegionReplicaSet(
+            new DataRegionId(4),
+            Arrays.asList(new EndPoint("192.0.4.1", 9000), new EndPoint("192.0.4.2", 9000))));
+    Map<TimePartitionId, List<DataRegionReplicaSet>> d3DataRegionMap = new HashMap<>();
+    d3DataRegionMap.put(new TimePartitionId(), d3DataRegions);
+
+    sgPartitionMap.put(new DeviceGroupId(device1.length()), d1DataRegionMap);
+    sgPartitionMap.put(new DeviceGroupId(device2.length()), d2DataRegionMap);
+    sgPartitionMap.put(new DeviceGroupId(device3.length()), d3DataRegionMap);
+
+    dataPartitionMap.put("root.sg", sgPartitionMap);
+
+    dataPartitionInfo.setDataPartitionMap(dataPartitionMap);
+
+    return dataPartitionInfo;
+  }
+
+  @Override
+  public SchemaPartitionInfo fetchSchemaPartitionInfo(String deviceId) {
+    return null;
+  }
+
+  @Override
+  public SchemaPartitionInfo fetchSchemaPartitionInfos(List<String> deviceId) {
+    return null;
+  }
+
+  @Override
+  public PartitionInfo fetchPartitionInfo(DataPartitionQueryParam parameter) {
+    return null;
+  }
+
+  @Override
+  public PartitionInfo fetchPartitionInfos(List<DataPartitionQueryParam> parameterList) {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
new file mode 100644
index 0000000..44dd06e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.analyze;
+
+import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+
+public class FakeSchemaFetcherImpl implements ISchemaFetcher{
+    @Override
+    public SchemaTree fetchSchema(PathPatternTree patternTree) {
+        return new SchemaTree();
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 79cab0f..cd26a35 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -28,11 +28,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.sql.planner.plan.SimpleFragmentParallelPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.SimplePlanNodeRewriter;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.*;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
@@ -75,7 +71,9 @@ public class DistributionPlanner {
 
   public DistributedQueryPlan planFragments() {
     PlanNode rootAfterRewrite = rewriteSource();
+    System.out.println(PlanNodeUtil.nodeToString(rootAfterRewrite));
     PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
+    System.out.println(PlanNodeUtil.nodeToString(rootWithExchange));
     SubPlan subPlan = splitFragment(rootWithExchange);
     List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
     return new DistributedQueryPlan(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 3e016be..950e487 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -104,8 +104,8 @@ public class ExchangeNode extends PlanNode {
 
   public String toString() {
     return String.format(
-        "ExchangeNode-%s: [SourceNodeId: %s, SourceAddress:%s]",
-        getId(), remoteSourceNode.getId(), getSourceAddress());
+        "ExchangeNode-%s: [SourceAddress:%s]",
+        getId(), getSourceAddress());
   }
 
   public String getSourceAddress() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 40f78fb..aba4a00 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -138,7 +138,7 @@ public class DistributionPlannerTest {
     timeJoinNode.addChild(
         new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
+        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d333.s1")));
 
     LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
 
@@ -164,15 +164,16 @@ public class DistributionPlannerTest {
         dataPartitionMap = new HashMap<>();
     Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>> sgPartitionMap =
         new HashMap<>();
+
     List<DataRegionReplicaSet> d1DataRegions = new ArrayList<>();
     d1DataRegions.add(
         new DataRegionReplicaSet(
             new DataRegionId(1),
-            Arrays.asList(new EndPoint("192.0.0.1", 9000), new EndPoint("192.0.0.2", 9000))));
+            Arrays.asList(new EndPoint("192.0.1.1", 9000), new EndPoint("192.0.1.2", 9000))));
     d1DataRegions.add(
         new DataRegionReplicaSet(
             new DataRegionId(2),
-            Arrays.asList(new EndPoint("192.0.0.3", 9000), new EndPoint("192.0.0.4", 9000))));
+            Arrays.asList(new EndPoint("192.0.2.1", 9000), new EndPoint("192.0.2.2", 9000))));
     Map<TimePartitionId, List<DataRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
     d1DataRegionMap.put(new TimePartitionId(), d1DataRegions);
 
@@ -180,12 +181,26 @@ public class DistributionPlannerTest {
     d2DataRegions.add(
         new DataRegionReplicaSet(
             new DataRegionId(3),
-            Arrays.asList(new EndPoint("192.0.0.5", 9000), new EndPoint("192.0.0.6", 9000))));
+            Arrays.asList(new EndPoint("192.0.3.1", 9000), new EndPoint("192.0.3.2", 9000))));
     Map<TimePartitionId, List<DataRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
     d2DataRegionMap.put(new TimePartitionId(), d2DataRegions);
 
+    List<DataRegionReplicaSet> d3DataRegions = new ArrayList<>();
+    d3DataRegions.add(
+        new DataRegionReplicaSet(
+            new DataRegionId(1),
+            Arrays.asList(new EndPoint("192.0.1.1", 9000), new EndPoint("192.0.1.2", 9000))));
+    d3DataRegions.add(
+        new DataRegionReplicaSet(
+            new DataRegionId(4),
+            Arrays.asList(new EndPoint("192.0.4.1", 9000), new EndPoint("192.0.4.2", 9000))));
+    Map<TimePartitionId, List<DataRegionReplicaSet>> d3DataRegionMap = new HashMap<>();
+    d3DataRegionMap.put(new TimePartitionId(), d3DataRegions);
+
     sgPartitionMap.put(new DeviceGroupId(device1.length()), d1DataRegionMap);
     sgPartitionMap.put(new DeviceGroupId(device2.length()), d2DataRegionMap);
+    sgPartitionMap.put(new DeviceGroupId(device3.length()), d3DataRegionMap);
+
 
     dataPartitionMap.put("root.sg", sgPartitionMap);
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
new file mode 100644
index 0000000..bf8580b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan;
+
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.execution.QueryExecution;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.junit.Test;
+
+import java.time.ZoneId;
+
+public class QueryPlannerTest {
+
+  @Test
+  public void TestSqlToDistributedPlan() {
+
+    String querySql = "SELECT d1.*, d22.s1 FROM root.sg";
+
+    Statement stmt = StatementGenerator.createStatement(querySql, ZoneId.systemDefault());
+    System.out.println(stmt);
+
+    QueryExecution queryExecution = new QueryExecution(stmt, new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ));
+    System.out.println(queryExecution);
+  }
+}