You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ne...@apache.org on 2021/03/23 07:10:16 UTC

[iotdb] branch master updated: [IOTDB-1251] optimize route intervals (#2886)

This is an automated email from the ASF dual-hosted git repository.

neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b214511  [IOTDB-1251] optimize route intervals (#2886)
b214511 is described below

commit b214511af80b573c3aa99e43d5a07472373cb29c
Author: wangchao316 <66...@users.noreply.github.com>
AuthorDate: Tue Mar 23 15:09:53 2021 +0800

    [IOTDB-1251] optimize route intervals (#2886)
---
 .../cluster/server/member/MetaGroupMember.java     | 53 ++++++++++++++--------
 .../query/groupby/RemoteGroupByExecutorTest.java   |  4 +-
 .../cluster/server/member/MetaGroupMemberTest.java | 33 ++++++++++++++
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  4 +-
 4 files changed, 71 insertions(+), 23 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 620bbc8..daa1aa9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1431,9 +1431,31 @@ public class MetaGroupMember extends RaftMember {
     return routeIntervals(intervals, path);
   }
 
+  /**
+   * Obtains the partition groups that hold the data of a path within the given time intervals.
+   *
+   * @param intervals time intervals, each with a lower bound and an upper bound
+   * @param path partial path
+   * @return the partition groups in which data of the path within the intervals is stored
+   * @throws StorageEngineException if the storage group path cannot be obtained
+   */
   public List<PartitionGroup> routeIntervals(Intervals intervals, PartialPath path)
       throws StorageEngineException {
     List<PartitionGroup> partitionGroups = new ArrayList<>();
+    PartialPath storageGroupName = null;
+    try {
+      storageGroupName = IoTDB.metaManager.getStorageGroupPath(path);
+    } catch (MetadataException e) {
+      throw new StorageEngineException(e);
+    }
+
+    // if time partitioning is disabled, all data of a storage group is in one PartitionGroup
+    if (!StorageEngine.isEnablePartition()) {
+      PartitionGroup partitionGroup = partitionTable.route(storageGroupName.getFullPath(), 0L);
+      partitionGroups.add(partitionGroup);
+      return partitionGroups;
+    }
+
     long firstLB = intervals.getLowerBound(0);
     long lastUB = intervals.getUpperBound(intervals.getIntervalSize() - 1);
 
@@ -1444,24 +1466,19 @@ public class MetaGroupMember extends RaftMember {
     } else {
       // compute the related data groups of all intervals
       // TODO-Cluster#690: change to a broadcast when the computation is too expensive
-      try {
-        PartialPath storageGroupName = IoTDB.metaManager.getStorageGroupPath(path);
-        Set<Node> groupHeaders = new HashSet<>();
-        for (int i = 0; i < intervals.getIntervalSize(); i++) {
-          // compute the headers of groups involved in every interval
-          PartitionUtils.getIntervalHeaders(
-              storageGroupName.getFullPath(),
-              intervals.getLowerBound(i),
-              intervals.getUpperBound(i),
-              partitionTable,
-              groupHeaders);
-        }
-        // translate the headers to groups
-        for (Node groupHeader : groupHeaders) {
-          partitionGroups.add(partitionTable.getHeaderGroup(groupHeader));
-        }
-      } catch (MetadataException e) {
-        throw new StorageEngineException(e);
+      Set<Node> groupHeaders = new HashSet<>();
+      for (int i = 0; i < intervals.getIntervalSize(); i++) {
+        // compute the headers of groups involved in every interval
+        PartitionUtils.getIntervalHeaders(
+            storageGroupName.getFullPath(),
+            intervals.getLowerBound(i),
+            intervals.getUpperBound(i),
+            partitionTable,
+            groupHeaders);
+      }
+      // translate the headers to groups
+      for (Node groupHeader : groupHeaders) {
+        partitionGroups.add(partitionTable.getHeaderGroup(groupHeader));
       }
     }
     return partitionGroups;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
index eb678aa..dac449a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
@@ -76,7 +76,7 @@ public class RemoteGroupByExecutorTest extends BaseQueryTest {
       for (int i = 0; i < groupByExecutors.size(); i++) {
         GroupByExecutor groupByExecutor = groupByExecutors.get(i);
         Object[] answers;
-        if (i == 1) {
+        if (groupByExecutors.size() == 1) {
           // a series is only managed by one group
           List<AggregateResult> aggregateResults;
           answers = new Object[] {5.0, 2.0, 10.0, 0.0, 4.0, 4.0, 0.0, 4.0, 0.0};
@@ -138,7 +138,7 @@ public class RemoteGroupByExecutorTest extends BaseQueryTest {
       for (int i = 0; i < groupByExecutors.size(); i++) {
         GroupByExecutor groupByExecutor = groupByExecutors.get(i);
         Object[] answers;
-        if (i == 1) {
+        if (groupByExecutors.size() == 1) {
           // a series is only managed by one group
           List<AggregateResult> aggregateResults;
           answers = new Object[] {2.0, 3.5, 7.0, 3.0, 4.0, 4.0, 3.0, 4.0, 3.0};
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index e4e6b25..bc2e603 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -67,6 +67,7 @@ import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.service.MetaAsyncService;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.cluster.utils.Constants;
+import org.apache.iotdb.cluster.utils.PartitionUtils;
 import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -1320,6 +1321,38 @@ public class MetaGroupMemberTest extends MemberTest {
     assertTrue(testMetaMember.getAllNodes().contains(TestUtils.getNode(10)));
   }
 
+  @Test
+  public void testRouteIntervalsDisablePartition()
+      throws IllegalPathException, StorageEngineException {
+    boolean isEablePartition = StorageEngine.isEnablePartition();
+    StorageEngine.setEnablePartition(false);
+    testMetaMember.setCharacter(LEADER);
+    testMetaMember.setLeader(testMetaMember.getThisNode());
+    PartitionUtils.Intervals intervals = new PartitionUtils.Intervals();
+    intervals.addInterval(Long.MIN_VALUE, Long.MAX_VALUE);
+
+    List<PartitionGroup> partitionGroups =
+        testMetaMember.routeIntervals(intervals, new PartialPath(TestUtils.getTestSg(0)));
+    assertEquals(1, partitionGroups.size());
+    StorageEngine.setEnablePartition(isEablePartition);
+  }
+
+  @Test
+  public void testRouteIntervalsEnablePartition()
+      throws IllegalPathException, StorageEngineException {
+    boolean isEablePartition = StorageEngine.isEnablePartition();
+    StorageEngine.setEnablePartition(true);
+    testMetaMember.setCharacter(LEADER);
+    testMetaMember.setLeader(testMetaMember.getThisNode());
+    PartitionUtils.Intervals intervals = new PartitionUtils.Intervals();
+    intervals.addInterval(Long.MIN_VALUE, Long.MAX_VALUE);
+
+    List<PartitionGroup> partitionGroups =
+        testMetaMember.routeIntervals(intervals, new PartialPath(TestUtils.getTestSg(0)));
+    assertTrue(partitionGroups.size() > 1);
+    StorageEngine.setEnablePartition(isEablePartition);
+  }
+
   private void doRemoveNode(AtomicReference<Long> resultRef, Node nodeToRemove) {
     new MetaAsyncService(testMetaMember)
         .removeNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index b774e52..9bc81ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -92,6 +92,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 public class StorageEngine implements IService {
+  private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);
 
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
@@ -105,7 +106,6 @@ public class StorageEngine implements IService {
   private static boolean enablePartition =
       IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
 
-  private final Logger logger;
   /**
    * a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
    * will have a subfolder under the systemDir.
@@ -126,7 +126,6 @@ public class StorageEngine implements IService {
   private List<FlushListener> customFlushListeners = new ArrayList<>();
 
   private StorageEngine() {
-    logger = LoggerFactory.getLogger(StorageEngine.class);
     systemDir = FilePathUtils.regularizePath(config.getSystemDir()) + "storage_groups";
 
     // build time Interval to divide time partition
@@ -190,7 +189,6 @@ public class StorageEngine implements IService {
     return enablePartition ? time / timePartitionInterval : 0;
   }
 
-  @TestOnly
   public static boolean isEnablePartition() {
     return enablePartition;
   }