You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ne...@apache.org on 2021/03/23 07:10:16 UTC
[iotdb] branch master updated: [IOTDB-1251] optimize route intervals (#2886)
This is an automated email from the ASF dual-hosted git repository.
neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b214511 [IOTDB-1251] optimize route intervals (#2886)
b214511 is described below
commit b214511af80b573c3aa99e43d5a07472373cb29c
Author: wangchao316 <66...@users.noreply.github.com>
AuthorDate: Tue Mar 23 15:09:53 2021 +0800
[IOTDB-1251] optimize route intervals (#2886)
---
.../cluster/server/member/MetaGroupMember.java | 53 ++++++++++++++--------
.../query/groupby/RemoteGroupByExecutorTest.java | 4 +-
.../cluster/server/member/MetaGroupMemberTest.java | 33 ++++++++++++++
.../org/apache/iotdb/db/engine/StorageEngine.java | 4 +-
4 files changed, 71 insertions(+), 23 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 620bbc8..daa1aa9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1431,9 +1431,31 @@ public class MetaGroupMember extends RaftMember {
return routeIntervals(intervals, path);
}
+ /**
+ * Obtains the partition groups that hold the data of a path within the given intervals.
+ *
+ * @param intervals time intervals, each with a lower and an upper bound
+ * @param path partial path
+ * @return the partition groups on which the data of the path within the intervals is stored
+ * @throws StorageEngineException if the storage group path of the given path cannot be obtained
+ */
public List<PartitionGroup> routeIntervals(Intervals intervals, PartialPath path)
throws StorageEngineException {
List<PartitionGroup> partitionGroups = new ArrayList<>();
+ PartialPath storageGroupName = null;
+ try {
+ storageGroupName = IoTDB.metaManager.getStorageGroupPath(path);
+ } catch (MetadataException e) {
+ throw new StorageEngineException(e);
+ }
+
+ // if partitioning is disabled, the path is routed with time 0 to a single PartitionGroup
+ if (!StorageEngine.isEnablePartition()) {
+ PartitionGroup partitionGroup = partitionTable.route(storageGroupName.getFullPath(), 0L);
+ partitionGroups.add(partitionGroup);
+ return partitionGroups;
+ }
+
long firstLB = intervals.getLowerBound(0);
long lastUB = intervals.getUpperBound(intervals.getIntervalSize() - 1);
@@ -1444,24 +1466,19 @@ public class MetaGroupMember extends RaftMember {
} else {
// compute the related data groups of all intervals
// TODO-Cluster#690: change to a broadcast when the computation is too expensive
- try {
- PartialPath storageGroupName = IoTDB.metaManager.getStorageGroupPath(path);
- Set<Node> groupHeaders = new HashSet<>();
- for (int i = 0; i < intervals.getIntervalSize(); i++) {
- // compute the headers of groups involved in every interval
- PartitionUtils.getIntervalHeaders(
- storageGroupName.getFullPath(),
- intervals.getLowerBound(i),
- intervals.getUpperBound(i),
- partitionTable,
- groupHeaders);
- }
- // translate the headers to groups
- for (Node groupHeader : groupHeaders) {
- partitionGroups.add(partitionTable.getHeaderGroup(groupHeader));
- }
- } catch (MetadataException e) {
- throw new StorageEngineException(e);
+ Set<Node> groupHeaders = new HashSet<>();
+ for (int i = 0; i < intervals.getIntervalSize(); i++) {
+ // compute the headers of groups involved in every interval
+ PartitionUtils.getIntervalHeaders(
+ storageGroupName.getFullPath(),
+ intervals.getLowerBound(i),
+ intervals.getUpperBound(i),
+ partitionTable,
+ groupHeaders);
+ }
+ // translate the headers to groups
+ for (Node groupHeader : groupHeaders) {
+ partitionGroups.add(partitionTable.getHeaderGroup(groupHeader));
}
}
return partitionGroups;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
index eb678aa..dac449a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
@@ -76,7 +76,7 @@ public class RemoteGroupByExecutorTest extends BaseQueryTest {
for (int i = 0; i < groupByExecutors.size(); i++) {
GroupByExecutor groupByExecutor = groupByExecutors.get(i);
Object[] answers;
- if (i == 1) {
+ if (groupByExecutors.size() == 1) {
// a series is only managed by one group
List<AggregateResult> aggregateResults;
answers = new Object[] {5.0, 2.0, 10.0, 0.0, 4.0, 4.0, 0.0, 4.0, 0.0};
@@ -138,7 +138,7 @@ public class RemoteGroupByExecutorTest extends BaseQueryTest {
for (int i = 0; i < groupByExecutors.size(); i++) {
GroupByExecutor groupByExecutor = groupByExecutors.get(i);
Object[] answers;
- if (i == 1) {
+ if (groupByExecutors.size() == 1) {
// a series is only managed by one group
List<AggregateResult> aggregateResults;
answers = new Object[] {2.0, 3.5, 7.0, 3.0, 4.0, 4.0, 3.0, 4.0, 3.0};
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index e4e6b25..bc2e603 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -67,6 +67,7 @@ import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.service.MetaAsyncService;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.Constants;
+import org.apache.iotdb.cluster.utils.PartitionUtils;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -1320,6 +1321,38 @@ public class MetaGroupMemberTest extends MemberTest {
assertTrue(testMetaMember.getAllNodes().contains(TestUtils.getNode(10)));
}
+ @Test
+ public void testRouteIntervalsDisablePartition()
+ throws IllegalPathException, StorageEngineException {
+ boolean isEablePartition = StorageEngine.isEnablePartition();
+ StorageEngine.setEnablePartition(false);
+ testMetaMember.setCharacter(LEADER);
+ testMetaMember.setLeader(testMetaMember.getThisNode());
+ PartitionUtils.Intervals intervals = new PartitionUtils.Intervals();
+ intervals.addInterval(Long.MIN_VALUE, Long.MAX_VALUE);
+
+ List<PartitionGroup> partitionGroups =
+ testMetaMember.routeIntervals(intervals, new PartialPath(TestUtils.getTestSg(0)));
+ assertEquals(1, partitionGroups.size());
+ StorageEngine.setEnablePartition(isEablePartition);
+ }
+
+ @Test
+ public void testRouteIntervalsEnablePartition()
+ throws IllegalPathException, StorageEngineException {
+ boolean isEablePartition = StorageEngine.isEnablePartition();
+ StorageEngine.setEnablePartition(true);
+ testMetaMember.setCharacter(LEADER);
+ testMetaMember.setLeader(testMetaMember.getThisNode());
+ PartitionUtils.Intervals intervals = new PartitionUtils.Intervals();
+ intervals.addInterval(Long.MIN_VALUE, Long.MAX_VALUE);
+
+ List<PartitionGroup> partitionGroups =
+ testMetaMember.routeIntervals(intervals, new PartialPath(TestUtils.getTestSg(0)));
+ assertTrue(partitionGroups.size() > 1);
+ StorageEngine.setEnablePartition(isEablePartition);
+ }
+
private void doRemoveNode(AtomicReference<Long> resultRef, Node nodeToRemove) {
new MetaAsyncService(testMetaMember)
.removeNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index b774e52..9bc81ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -92,6 +92,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class StorageEngine implements IService {
+ private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
@@ -105,7 +106,6 @@ public class StorageEngine implements IService {
private static boolean enablePartition =
IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
- private final Logger logger;
/**
* a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
* will have a subfolder under the systemDir.
@@ -126,7 +126,6 @@ public class StorageEngine implements IService {
private List<FlushListener> customFlushListeners = new ArrayList<>();
private StorageEngine() {
- logger = LoggerFactory.getLogger(StorageEngine.class);
systemDir = FilePathUtils.regularizePath(config.getSystemDir()) + "storage_groups";
// build time Interval to divide time partition
@@ -190,7 +189,6 @@ public class StorageEngine implements IService {
return enablePartition ? time / timePartitionInterval : 0;
}
- @TestOnly
public static boolean isEnablePartition() {
return enablePartition;
}