You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/05/07 06:52:58 UTC
[iotdb] branch cluster_scalability updated: add a feature of
merging schema result for those slots which are in state of PULLING due to
add/remove node
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cluster_scalability by this push:
new 74cfc17 add a feature of merging schema result for those slots which are in state of PULLING due to add/remove node
74cfc17 is described below
commit 74cfc17633e1365a1510eccdb197d391195d7614
Author: lta <li...@163.com>
AuthorDate: Fri May 7 14:52:24 2021 +0800
add a feature of merging schema result for those slots which are in state of PULLING due to add/remove node
---
.../apache/iotdb/cluster/metadata/MetaPuller.java | 2 +-
.../iotdb/cluster/partition/slot/SlotManager.java | 5 +++
.../iotdb/cluster/query/LocalQueryExecutor.java | 52 +++++++++++++++++++---
.../cluster/server/service/DataSyncService.java | 2 +-
.../apache/iotdb/cluster/utils/ClusterUtils.java | 27 +++++++++++
5 files changed, 81 insertions(+), 7 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index 06fdb13..42c83cc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -124,7 +124,7 @@ public class MetaPuller {
* @param prefixPaths
* @param results
*/
- private void pullMeasurementSchemas(
+ public void pullMeasurementSchemas(
PartitionGroup partitionGroup,
List<PartialPath> prefixPaths,
List<MeasurementSchema> results) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
index 175eefa..8da9298 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
@@ -137,6 +137,11 @@ public class SlotManager {
|| slotDescriptor.slotStatus == SlotStatus.PULLING_WRITABLE;
}
+ public boolean checkSlotInMetaMigrationStatus(int slotId) {
+ SlotDescriptor slotDescriptor = idSlotMap.get(slotId);
+ return slotDescriptor.slotStatus == SlotStatus.PULLING;
+ }
+
/**
* @param slotId
* @return the SlotStatus of a slot
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index c61cc66..5e0e38c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.cluster.query;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.ReaderNotFoundException;
import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.metadata.MetaPuller;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.query.filter.SlotTsFileFilter;
import org.apache.iotdb.cluster.query.manage.ClusterQueryManager;
@@ -35,9 +37,11 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -82,6 +86,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -433,8 +438,8 @@ public class LocalQueryExecutor {
*
* @param request
*/
- public PullSchemaResp queryMeasurementSchema(PullSchemaRequest request)
- throws CheckConsistencyException, IllegalPathException {
+ public PullSchemaResp queryMeasurementSchema(PullSchemaRequest request) // pullMeasurementSchemas
+ throws CheckConsistencyException, MetadataException {
// try to synchronize with the leader first in case that some schema logs are accepted but
// not committed yet
dataGroupMember.syncLeaderWithConsistencyCheck(false);
@@ -443,9 +448,8 @@ public class LocalQueryExecutor {
// the measurements in them are the full paths.
List<String> prefixPaths = request.getPrefixPaths();
List<MeasurementSchema> measurementSchemas = new ArrayList<>();
- for (String prefixPath : prefixPaths) {
- getCMManager().collectSeries(new PartialPath(prefixPath), measurementSchemas);
- }
+
+ collectSeries(prefixPaths, measurementSchemas);
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Collected {} schemas for {} and other {} paths",
@@ -471,6 +475,44 @@ public class LocalQueryExecutor {
return resp;
}
+ private void collectSeries(List<String> prefixPaths, List<MeasurementSchema> measurementSchemas)
+ throws MetadataException {
+ // Due to add/remove node, some slots may in the state of PULLING, which will not contains the
+ // corresponding schemas.
+ // In this case, we need to pull series from previous holder.
+ Map<PartitionGroup, List<PartialPath>> prePartitionGroupPathMap = new HashMap<>();
+
+ RaftNode raftNode = new RaftNode(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId());
+ Map<Integer, PartitionGroup> slotPreviousHolderMap =
+ ((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable())
+ .getPreviousNodeMap()
+ .get(raftNode);
+
+ for (String prefixPath : prefixPaths) {
+ int slot =
+ ClusterUtils.getSlotByPathTimeWithSync(
+ new PartialPath(prefixPath), dataGroupMember.getMetaGroupMember());
+ if (dataGroupMember.getSlotManager().checkSlotInMetaMigrationStatus(slot)
+ && slotPreviousHolderMap.containsKey(slot)) {
+ prePartitionGroupPathMap
+ .computeIfAbsent(slotPreviousHolderMap.get(slot), s -> new ArrayList<>())
+ .add(new PartialPath(prefixPath));
+ } else {
+ getCMManager().collectSeries(new PartialPath(prefixPath), measurementSchemas);
+ }
+ }
+
+ if (prePartitionGroupPathMap.isEmpty()) {
+ return;
+ }
+ for (Map.Entry<PartitionGroup, List<PartialPath>> partitionGroupListEntry :
+ prePartitionGroupPathMap.entrySet()) {
+ PartitionGroup partitionGroup = partitionGroupListEntry.getKey();
+ List<PartialPath> paths = partitionGroupListEntry.getValue();
+ MetaPuller.getInstance().pullMeasurementSchemas(partitionGroup, paths, measurementSchemas);
+ }
+ }
+
/**
* Create an IReaderByTime of a path, register it in the query manager to get a reader id for it
* and send the id back to the requester. If the reader does not have any data, an id of -1 will
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 80fa966..ce1d91c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -171,7 +171,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
if (dataGroupMember.getCharacter() == NodeCharacter.LEADER) {
try {
return dataGroupMember.getLocalQueryExecutor().queryMeasurementSchema(request);
- } catch (CheckConsistencyException | IllegalPathException e) {
+ } catch (CheckConsistencyException | MetadataException e) {
throw new TException(e);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index 9333287..9e7d695 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -20,10 +20,12 @@
package org.apache.iotdb.cluster.utils;
import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
import org.apache.iotdb.cluster.partition.PartitionGroup;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
@@ -31,6 +33,7 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.rpc.RpcTransportFactory;
@@ -365,6 +368,30 @@ public class ClusterUtils {
return partitionGroup;
}
+ public static int getSlotByPathTimeWithSync(
+ PartialPath prefixPath, MetaGroupMember metaGroupMember) throws MetadataException {
+ int slot;
+ try {
+ PartialPath storageGroup = IoTDB.metaManager.getStorageGroupPath(prefixPath);
+ slot =
+ SlotPartitionTable.getSlotStrategy()
+ .calculateSlotByPartitionNum(storageGroup.getFullPath(), 0, ClusterConstant.SLOT_NUM);
+ } catch (StorageGroupNotSetException e) {
+ // the storage group is not found locally, but may be found in the leader, retry after
+ // synchronizing with the leader
+ try {
+ metaGroupMember.syncLeaderWithConsistencyCheck(true);
+ } catch (CheckConsistencyException checkConsistencyException) {
+ throw new MetadataException(checkConsistencyException.getMessage());
+ }
+ PartialPath storageGroup = IoTDB.metaManager.getStorageGroupPath(prefixPath);
+ slot =
+ SlotPartitionTable.getSlotStrategy()
+ .calculateSlotByPartitionNum(storageGroup.getFullPath(), 0, ClusterConstant.SLOT_NUM);
+ }
+ return slot;
+ }
+
public static ByteBuffer serializeMigrationStatus(Map<PartitionGroup, Integer> migrationStatus) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {