You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/03/06 08:16:11 UTC

[iotdb] branch cluster_scalability updated: enable auto create schema in cluster node and add sync meta log for new node

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster_scalability by this push:
     new 9446e77  enable auto create schema in cluster node and add sync meta log for new node
9446e77 is described below

commit 9446e77a248d90f6c469744a93cfd03fd8d671c4
Author: lta <li...@163.com>
AuthorDate: Sat Mar 6 16:15:46 2021 +0800

    enable auto create schema in cluster node and add sync meta log for new node
---
 .../java/org/apache/iotdb/cluster/ClusterMain.java |  1 -
 .../iotdb/cluster/log/snapshot/FileSnapshot.java   |  5 +++-
 .../cluster/server/PullSnapshotHintService.java    | 35 ++++++++++++----------
 3 files changed, 24 insertions(+), 17 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index 48f8813..391a7cf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@ -86,7 +86,6 @@ public class ClusterMain {
     }
 
     IoTDBDescriptor.getInstance().getConfig().setSyncEnable(false);
-    IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
     logger.info("Running mode {}", mode);
     if (MODE_START.equals(mode)) {
       try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
index a731d32..c4b3f98 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
@@ -212,7 +212,10 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
 
     private void installSnapshot(Map<Integer, FileSnapshot> snapshotMap)
         throws SnapshotInstallationException {
-      // In data migration, meta group member does not need to synchronize the leader,
+      // A newly added node must sync the meta log first.
+      dataGroupMember.getMetaGroupMember().waitUtil(dataGroupMember.getMetaGroupMember().getPartitionTable().getLastMetaLogIndex() - 1);
+
+      // In data migration, meta group members other than the new node do not need to synchronize with the leader,
       // because data migration must be carried out after meta group applied add/remove node log.
       for (Entry<Integer, FileSnapshot> integerSnapshotEntry : snapshotMap.entrySet()) {
         Integer slot = integerSnapshotEntry.getKey();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
index 63e2571..2ac7d28 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
@@ -83,22 +83,27 @@ public class PullSnapshotHintService {
       PullSnapshotHint hint = iterator.next();
       for (Iterator<Node> iter = hint.receivers.iterator(); iter.hasNext(); ) {
         Node receiver = iter.next();
-        try {
-          if (logger.isDebugEnabled()) {
-            logger.debug(
-                "{}: start to send hint to target group {}, receiver {}, slot is {} and other {}",
-                member.getName(), hint.partitionGroup, receiver, hint.slots.get(0),
-                hint.slots.size() - 1);
+        // If the receiver is no longer in the partition table (e.g. a removed node), drop it from the hint's receivers
+        if (!member.getMetaGroupMember().getPartitionTable().getAllNodes().contains(receiver)) {
+          iter.remove();
+        } else {
+          try {
+            if (logger.isDebugEnabled()) {
+              logger.debug(
+                  "{}: start to send hint to target group {}, receiver {}, slot is {} and other {}",
+                  member.getName(), hint.partitionGroup, receiver, hint.slots.get(0),
+                  hint.slots.size() - 1);
+            }
+            boolean result = sendHint(receiver, hint);
+            if (result) {
+              iter.remove();
+            }
+          } catch (TException e) {
+            logger.warn("Cannot send pull snapshot hint to {}", receiver);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.warn("Sending hint to {} interrupted", receiver);
           }
-          boolean result = sendHint(receiver, hint);
-          if (result) {
-            iter.remove();
-          }
-        } catch (TException e) {
-          logger.warn("Cannot send pull snapshot hint to {}", receiver);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          logger.warn("Sending hint to {} interrupted", receiver);
         }
       }
       // all nodes in remote group know the hint, the hint can be removed