You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/05/07 06:52:58 UTC

[iotdb] branch cluster_scalability updated: add a feature of merging schema result for those slots which are in state of PULLING due to add/remove node

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster_scalability by this push:
     new 74cfc17  add a feature of merging schema result for those slots which are in state of PULLING due to add/remove node
74cfc17 is described below

commit 74cfc17633e1365a1510eccdb197d391195d7614
Author: lta <li...@163.com>
AuthorDate: Fri May 7 14:52:24 2021 +0800

    add a feature of merging schema result for those slots which are in state of PULLING due to add/remove node
---
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |  2 +-
 .../iotdb/cluster/partition/slot/SlotManager.java  |  5 +++
 .../iotdb/cluster/query/LocalQueryExecutor.java    | 52 +++++++++++++++++++---
 .../cluster/server/service/DataSyncService.java    |  2 +-
 .../apache/iotdb/cluster/utils/ClusterUtils.java   | 27 +++++++++++
 5 files changed, 81 insertions(+), 7 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index 06fdb13..42c83cc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -124,7 +124,7 @@ public class MetaPuller {
    * @param prefixPaths
    * @param results
    */
-  private void pullMeasurementSchemas(
+  public void pullMeasurementSchemas(
       PartitionGroup partitionGroup,
       List<PartialPath> prefixPaths,
       List<MeasurementSchema> results) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
index 175eefa..8da9298 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
@@ -137,6 +137,11 @@ public class SlotManager {
         || slotDescriptor.slotStatus == SlotStatus.PULLING_WRITABLE;
   }
 
+  public boolean checkSlotInMetaMigrationStatus(int slotId) {
+    SlotDescriptor slotDescriptor = idSlotMap.get(slotId);
+    return slotDescriptor.slotStatus == SlotStatus.PULLING;
+  }
+
   /**
    * @param slotId
    * @return the SlotStatus of a slot
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index c61cc66..5e0e38c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.cluster.query;
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.ReaderNotFoundException;
 import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.metadata.MetaPuller;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.query.filter.SlotTsFileFilter;
 import org.apache.iotdb.cluster.query.manage.ClusterQueryManager;
@@ -35,9 +37,11 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -82,6 +86,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -433,8 +438,8 @@ public class LocalQueryExecutor {
    *
    * @param request
    */
-  public PullSchemaResp queryMeasurementSchema(PullSchemaRequest request)
-      throws CheckConsistencyException, IllegalPathException {
+  public PullSchemaResp queryMeasurementSchema(PullSchemaRequest request) // pullMeasurementSchemas
+      throws CheckConsistencyException, MetadataException {
     // try to synchronize with the leader first in case that some schema logs are accepted but
     // not committed yet
     dataGroupMember.syncLeaderWithConsistencyCheck(false);
@@ -443,9 +448,8 @@ public class LocalQueryExecutor {
     // the measurements in them are the full paths.
     List<String> prefixPaths = request.getPrefixPaths();
     List<MeasurementSchema> measurementSchemas = new ArrayList<>();
-    for (String prefixPath : prefixPaths) {
-      getCMManager().collectSeries(new PartialPath(prefixPath), measurementSchemas);
-    }
+
+    collectSeries(prefixPaths, measurementSchemas);
     if (logger.isDebugEnabled()) {
       logger.debug(
           "{}: Collected {} schemas for {} and other {} paths",
@@ -471,6 +475,44 @@ public class LocalQueryExecutor {
     return resp;
   }
 
+  private void collectSeries(List<String> prefixPaths, List<MeasurementSchema> measurementSchemas)
+      throws MetadataException {
+    // Due to adding/removing nodes, some slots may be in the PULLING state and will not contain
+    // the corresponding schemas.
+    // In this case, we need to pull the series from the previous holder.
+    Map<PartitionGroup, List<PartialPath>> prePartitionGroupPathMap = new HashMap<>();
+
+    RaftNode raftNode = new RaftNode(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId());
+    Map<Integer, PartitionGroup> slotPreviousHolderMap =
+        ((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable())
+            .getPreviousNodeMap()
+            .get(raftNode);
+
+    for (String prefixPath : prefixPaths) {
+      int slot =
+          ClusterUtils.getSlotByPathTimeWithSync(
+              new PartialPath(prefixPath), dataGroupMember.getMetaGroupMember());
+      if (dataGroupMember.getSlotManager().checkSlotInMetaMigrationStatus(slot)
+          && slotPreviousHolderMap.containsKey(slot)) {
+        prePartitionGroupPathMap
+            .computeIfAbsent(slotPreviousHolderMap.get(slot), s -> new ArrayList<>())
+            .add(new PartialPath(prefixPath));
+      } else {
+        getCMManager().collectSeries(new PartialPath(prefixPath), measurementSchemas);
+      }
+    }
+
+    if (prePartitionGroupPathMap.isEmpty()) {
+      return;
+    }
+    for (Map.Entry<PartitionGroup, List<PartialPath>> partitionGroupListEntry :
+        prePartitionGroupPathMap.entrySet()) {
+      PartitionGroup partitionGroup = partitionGroupListEntry.getKey();
+      List<PartialPath> paths = partitionGroupListEntry.getValue();
+      MetaPuller.getInstance().pullMeasurementSchemas(partitionGroup, paths, measurementSchemas);
+    }
+  }
+
   /**
    * Create an IReaderByTime of a path, register it in the query manager to get a reader id for it
    * and send the id back to the requester. If the reader does not have any data, an id of -1 will
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 80fa966..ce1d91c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -171,7 +171,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
     if (dataGroupMember.getCharacter() == NodeCharacter.LEADER) {
       try {
         return dataGroupMember.getLocalQueryExecutor().queryMeasurementSchema(request);
-      } catch (CheckConsistencyException | IllegalPathException e) {
+      } catch (CheckConsistencyException | MetadataException e) {
         throw new TException(e);
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index 9333287..9e7d695 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -20,10 +20,12 @@
 package org.apache.iotdb.cluster.utils;
 
 import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
@@ -31,6 +33,7 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 
@@ -365,6 +368,30 @@ public class ClusterUtils {
     return partitionGroup;
   }
 
+  public static int getSlotByPathTimeWithSync(
+      PartialPath prefixPath, MetaGroupMember metaGroupMember) throws MetadataException {
+    int slot;
+    try {
+      PartialPath storageGroup = IoTDB.metaManager.getStorageGroupPath(prefixPath);
+      slot =
+          SlotPartitionTable.getSlotStrategy()
+              .calculateSlotByPartitionNum(storageGroup.getFullPath(), 0, ClusterConstant.SLOT_NUM);
+    } catch (StorageGroupNotSetException e) {
+      // the storage group is not found locally, but may be found in the leader, retry after
+      // synchronizing with the leader
+      try {
+        metaGroupMember.syncLeaderWithConsistencyCheck(true);
+      } catch (CheckConsistencyException checkConsistencyException) {
+        throw new MetadataException(checkConsistencyException.getMessage());
+      }
+      PartialPath storageGroup = IoTDB.metaManager.getStorageGroupPath(prefixPath);
+      slot =
+          SlotPartitionTable.getSlotStrategy()
+              .calculateSlotByPartitionNum(storageGroup.getFullPath(), 0, ClusterConstant.SLOT_NUM);
+    }
+    return slot;
+  }
+
   public static ByteBuffer serializeMigrationStatus(Map<PartitionGroup, Integer> migrationStatus) {
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
     try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {