You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/14 06:11:46 UTC

[iotdb] branch xingtanzjr/device_group_id created (now b32fb2c4b7)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/device_group_id
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at b32fb2c4b7 make SeriesPartitionSlotExecutor as a common method in node-commons

This branch includes the following new commits:

     new b32fb2c4b7 make SeriesPartitionSlotExecutor as a common method in node-commons

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: make SeriesPartitionSlotExecutor as a common method in node-commons

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/device_group_id
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b32fb2c4b76283069c2c46d095faa09c5b192b6a
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Apr 14 14:11:31 2022 +0800

    make SeriesPartitionSlotExecutor as a common method in node-commons
---
 .../iotdb/confignode/manager/PartitionManager.java | 22 +++------------
 .../persistence/PartitionInfoPersistence.java      | 11 ++++++--
 .../iotdb/commons/partition/DataPartition.java     | 31 +++++++++++++++-------
 .../executor/SeriesPartitionExecutor.java          | 19 +++++++++++++
 .../mpp/sql/analyze/FakePartitionFetcherImpl.java  |  6 ++++-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 24 ++++++++++++-----
 6 files changed, 75 insertions(+), 38 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 6a6b9ef26e..99f07bc31b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -38,8 +38,6 @@ import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -219,23 +217,9 @@ public class PartitionManager {
   /** Construct SeriesPartitionExecutor by iotdb-confignode.propertis */
   private void setSeriesPartitionExecutor() {
     ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
-    try {
-      Class<?> executor = Class.forName(conf.getSeriesPartitionExecutorClass());
-      Constructor<?> executorConstructor = executor.getConstructor(int.class);
-      this.executor =
-          (SeriesPartitionExecutor)
-              executorConstructor.newInstance(conf.getSeriesPartitionSlotNum());
-    } catch (ClassNotFoundException
-        | NoSuchMethodException
-        | InstantiationException
-        | IllegalAccessException
-        | InvocationTargetException e) {
-      LOGGER.error(
-          "Couldn't Constructor SeriesPartitionExecutor class: {}",
-          conf.getSeriesPartitionExecutorClass(),
-          e);
-      executor = null;
-    }
+    this.executor =
+        SeriesPartitionExecutor.getSeriesPartitionExecutor(
+            conf.getSeriesPartitionExecutorClass(), conf.getSeriesPartitionSlotNum());
   }
 
   /**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
index ab0eb0c221..c7154f0e13 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
 import org.apache.iotdb.commons.partition.TimePartitionSlot;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
 import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
 import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
@@ -60,7 +61,10 @@ public class PartitionInfoPersistence {
     this.dataPartitionReadWriteLock = new ReentrantReadWriteLock();
     this.schemaPartition = new SchemaPartition();
     this.schemaPartition.setSchemaPartitionMap(new HashMap<>());
-    this.dataPartition = new DataPartition();
+    this.dataPartition =
+        new DataPartition(
+            ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionExecutorClass(),
+            ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum());
     this.dataPartition.setDataPartitionMap(new HashMap<>());
   }
 
@@ -135,7 +139,10 @@ public class PartitionInfoPersistence {
 
     try {
       dataPartitionDataSet.setDataPartition(
-          dataPartition.getDataPartition(physicalPlan.getPartitionSlotsMap()));
+          dataPartition.getDataPartition(
+              physicalPlan.getPartitionSlotsMap(),
+              ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionExecutorClass(),
+              ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum()));
     } finally {
       dataPartitionReadWriteLock.readLock().unlock();
       dataPartitionDataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 054c01b471..d69b42ea34 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.commons.partition;
 
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -28,17 +30,24 @@ import java.util.stream.Collectors;
 
 public class DataPartition {
 
+  private String seriesSlotExecutorName;
+  private int seriesPartitionSlotNum;
+
   // Map<StorageGroup, Map<DeviceGroupID, Map<TimePartitionId, List<DataRegionPlaceInfo>>>>
   private Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
       dataPartitionMap;
 
-  public DataPartition() {
-    // Empty constructor
+  public DataPartition(String seriesSlotExecutorName, int seriesPartitionSlotNum) {
+    this.seriesSlotExecutorName = seriesSlotExecutorName;
+    this.seriesPartitionSlotNum = seriesPartitionSlotNum;
   }
 
   public DataPartition(
       Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
-          dataPartitionMap) {
+          dataPartitionMap,
+      String seriesSlotExecutorName,
+      int seriesPartitionSlotNum) {
+    this(seriesSlotExecutorName, seriesPartitionSlotNum);
     this.dataPartitionMap = dataPartitionMap;
   }
 
@@ -96,8 +105,10 @@ public class DataPartition {
   }
 
   private SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
-    // TODO: (xingtanzjr) implement the real algorithm for calculation of DeviceGroupId
-    return new SeriesPartitionSlot(deviceName.length());
+    SeriesPartitionExecutor executor =
+        SeriesPartitionExecutor.getSeriesPartitionExecutor(
+            seriesSlotExecutorName, seriesPartitionSlotNum);
+    return executor.getSeriesPartitionSlot(deviceName);
   }
 
   private String getStorageGroupByDevice(String deviceName) {
@@ -121,7 +132,9 @@ public class DataPartition {
    *     Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
    */
   public DataPartition getDataPartition(
-      Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap) {
+      Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap,
+      String seriesSlotExecutorName,
+      int seriesPartitionSlotNum) {
     Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>> result =
         new HashMap<>();
 
@@ -153,7 +166,7 @@ public class DataPartition {
       }
     }
 
-    return new DataPartition(result);
+    return new DataPartition(result, seriesSlotExecutorName, seriesPartitionSlotNum);
   }
 
   /**
@@ -161,8 +174,8 @@ public class DataPartition {
    *
    * @param partitionSlotsMap Map<StorageGroupName, Map<SeriesPartitionSlot,
    *     List<TimePartitionSlot>>>
-   * @return Map<StorageGroupName, Map<SeriesPartitionSlot, List<TimePartitionSlot>>>, unassigned
-   *     PartitionSlots
+   * @return Map<StorageGroupName, Map < SeriesPartitionSlot, List < TimePartitionSlot>>>,
+   *     unassigned PartitionSlots
    */
   public Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>>
       filterNoAssignedDataPartitionSlots(
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java
index 285f2ff95c..8631951342 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java
@@ -20,6 +20,9 @@ package org.apache.iotdb.commons.partition.executor;
 
 import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
 /** All SeriesPartitionExecutors must be subclasses of SeriesPartitionExecutor */
 public abstract class SeriesPartitionExecutor {
 
@@ -30,4 +33,20 @@ public abstract class SeriesPartitionExecutor {
   }
 
   public abstract SeriesPartitionSlot getSeriesPartitionSlot(String device);
+
+  public static SeriesPartitionExecutor getSeriesPartitionExecutor(
+      String executorName, int seriesPartitionSlotNum) {
+    try {
+      Class<?> executor = Class.forName(executorName);
+      Constructor<?> executorConstructor = executor.getConstructor(int.class);
+      return (SeriesPartitionExecutor) executorConstructor.newInstance(seriesPartitionSlotNum);
+    } catch (ClassNotFoundException
+        | NoSuchMethodException
+        | InstantiationException
+        | IllegalAccessException
+        | InvocationTargetException e) {
+      throw new IllegalArgumentException(
+          String.format("Couldn't Constructor SeriesPartitionExecutor class: %s", executorName));
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
index 94167a23ae..870a6fb2c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.cluster.DataNodeLocation;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.partition.*;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 
 import java.util.*;
 
@@ -38,7 +39,10 @@ public class FakePartitionFetcherImpl implements IPartitionFetcher {
     String device2 = "root.sg.d22";
     String device3 = "root.sg.d333";
 
-    DataPartition dataPartition = new DataPartition();
+    DataPartition dataPartition =
+        new DataPartition(
+            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
     Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
         dataPartitionMap = new HashMap<>();
     Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>> sgPartitionMap =
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 41813bd337..e428970fb6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
 import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -326,13 +328,21 @@ public class DistributionPlannerTest {
   }
 
   private Analysis constructAnalysis() {
+
+    SeriesPartitionExecutor executor =
+        SeriesPartitionExecutor.getSeriesPartitionExecutor(
+            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
     Analysis analysis = new Analysis();
 
     String device1 = "root.sg.d1";
     String device2 = "root.sg.d22";
     String device3 = "root.sg.d333";
 
-    DataPartition dataPartition = new DataPartition();
+    DataPartition dataPartition =
+        new DataPartition(
+            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
     Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
         dataPartitionMap = new HashMap<>();
     Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>> sgPartitionMap =
@@ -384,9 +394,9 @@ public class DistributionPlannerTest {
     Map<TimePartitionSlot, List<RegionReplicaSet>> d3DataRegionMap = new HashMap<>();
     d3DataRegionMap.put(new TimePartitionSlot(0), d3DataRegions);
 
-    sgPartitionMap.put(new SeriesPartitionSlot(device1.length()), d1DataRegionMap);
-    sgPartitionMap.put(new SeriesPartitionSlot(device2.length()), d2DataRegionMap);
-    sgPartitionMap.put(new SeriesPartitionSlot(device3.length()), d3DataRegionMap);
+    sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), d1DataRegionMap);
+    sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), d2DataRegionMap);
+    sgPartitionMap.put(executor.getSeriesPartitionSlot(device3), d3DataRegionMap);
 
     dataPartitionMap.put("root.sg", sgPartitionMap);
 
@@ -413,9 +423,9 @@ public class DistributionPlannerTest {
                 new DataNodeLocation(21, new Endpoint("192.0.1.1", 9000)),
                 new DataNodeLocation(22, new Endpoint("192.0.1.2", 9000))));
 
-    schemaRegionMap.put(new SeriesPartitionSlot(device1.length()), schemaRegion1);
-    schemaRegionMap.put(new SeriesPartitionSlot(device2.length()), schemaRegion2);
-    schemaRegionMap.put(new SeriesPartitionSlot(device3.length()), schemaRegion2);
+    schemaRegionMap.put(executor.getSeriesPartitionSlot(device1), schemaRegion1);
+    schemaRegionMap.put(executor.getSeriesPartitionSlot(device2), schemaRegion2);
+    schemaRegionMap.put(executor.getSeriesPartitionSlot(device3), schemaRegion2);
     schemaPartitionMap.put("root.sg", schemaRegionMap);
     schemaPartition.setSchemaPartitionMap(schemaPartitionMap);