You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/07/12 05:46:00 UTC

[kylin] branch master updated: add checking rule: before create new replicaset, we should check whether the node are in the other sets

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b75eab  add checking rule: before create new replicaset, we should check whether the node are in the other sets
2b75eab is described below

commit 2b75eab88bf51b94ddfaca50785555a09b6cc83e
Author: liukun4515 <li...@apache.org>
AuthorDate: Mon Jul 5 11:11:46 2021 +0800

    add checking rule: before create new replicaset, we should check whether the node are in the other sets
---
 .../org/apache/kylin/stream/coordinator/Coordinator.java   | 12 +++++++++++-
 .../coordinator/coordinate/StreamingCoordinator.java       | 14 +++++++++++++-
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
index 365a01d..d534ed2 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
@@ -706,6 +706,16 @@ public class Coordinator implements CoordinatorClient {
     }
 
     public synchronized void createReplicaSet(ReplicaSet rs) {
+        List<ReplicaSet> allReplicaset = streamMetadataStore.getReplicaSets();
+        // before creating the new set, we should check whether the nodes are in other sets.
+        for (Node receiver : rs.getNodes()) {
+            for (ReplicaSet set : allReplicaset) {
+                if (set.containPhysicalNode(receiver)) {
+                    throw new CoordinateException(String.format(Locale.ROOT,
+                            "The receiver node %s is already exists in the set %s", receiver, set));
+                }
+            }
+        }
         int replicaSetID = streamMetadataStore.createReplicaSet(rs);
         try {
             for (Node receiver : rs.getNodes()) {
@@ -737,7 +747,7 @@ public class Coordinator implements CoordinatorClient {
         List<ReplicaSet> allReplicaSet = streamMetadataStore.getReplicaSets();
         for (ReplicaSet other : allReplicaSet) {
             if (other.getReplicaSetID() != replicaSetID) {
-                if (other.getNodes().contains(receiver)) {
+                if (other.containPhysicalNode(receiver)) {
                     logger.error("error add Node {} to replicaSet {}, already exist in replicaSet {} ", nodeID,
                             replicaSetID, other.getReplicaSetID());
                     throw new IllegalStateException("Node exists in ReplicaSet!");
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
index 1272f49..2d1000c 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.kylin.stream.coordinator.coordinate;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -347,6 +348,17 @@ public class StreamingCoordinator implements CoordinatorClient {
     }
 
     public synchronized void createReplicaSet(ReplicaSet rs) {
+        List<ReplicaSet> allReplicaset = streamMetadataStore.getReplicaSets();
+        // before creating the new set, we should check whether the nodes are in other sets.
+        for (Node receiver : rs.getNodes()) {
+            for (ReplicaSet set : allReplicaset) {
+                if (set.containPhysicalNode(receiver)) {
+                    throw new CoordinateException(String.format(
+                        Locale.ROOT,
+                        "The receiver node %s is already exists in the set %s", receiver, set));
+                }
+            }
+        }
         int replicaSetID = streamMetadataStore.createReplicaSet(rs);
         try {
             for (Node receiver : rs.getNodes()) {
@@ -379,7 +391,7 @@ public class StreamingCoordinator implements CoordinatorClient {
         List<ReplicaSet> allReplicaSet = streamMetadataStore.getReplicaSets();
         for (ReplicaSet other : allReplicaSet) {
             if (other.getReplicaSetID() != replicaSetID) {
-                if (other.getNodes().contains(receiver)) {
+                if (other.containPhysicalNode(receiver)) {
                     logger.error("Error add Node {} to replicaSet {}, already exist in replicaSet {} ", nodeID,
                             replicaSetID, other.getReplicaSetID());
                     throw new IllegalStateException("Node exists in ReplicaSet!");