You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/07/12 05:46:00 UTC
[kylin] branch master updated: add checking rule: before create new
replicaset, we should check whether the node are in the other sets
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 2b75eab add checking rule: before create new replicaset, we should check whether the node are in the other sets
2b75eab is described below
commit 2b75eab88bf51b94ddfaca50785555a09b6cc83e
Author: liukun4515 <li...@apache.org>
AuthorDate: Mon Jul 5 11:11:46 2021 +0800
add checking rule: before create new replicaset, we should check whether the node are in the other sets
---
.../org/apache/kylin/stream/coordinator/Coordinator.java | 12 +++++++++++-
.../coordinator/coordinate/StreamingCoordinator.java | 14 +++++++++++++-
2 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
index 365a01d..d534ed2 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
@@ -706,6 +706,16 @@ public class Coordinator implements CoordinatorClient {
}
public synchronized void createReplicaSet(ReplicaSet rs) {
+ List<ReplicaSet> allReplicaset = streamMetadataStore.getReplicaSets();
+ // before creating the new set, we should check whether the nodes are in other sets.
+ for (Node receiver : rs.getNodes()) {
+ for (ReplicaSet set : allReplicaset) {
+ if (set.containPhysicalNode(receiver)) {
+ throw new CoordinateException(String.format(Locale.ROOT,
+ "The receiver node %s is already exists in the set %s", receiver, set));
+ }
+ }
+ }
int replicaSetID = streamMetadataStore.createReplicaSet(rs);
try {
for (Node receiver : rs.getNodes()) {
@@ -737,7 +747,7 @@ public class Coordinator implements CoordinatorClient {
List<ReplicaSet> allReplicaSet = streamMetadataStore.getReplicaSets();
for (ReplicaSet other : allReplicaSet) {
if (other.getReplicaSetID() != replicaSetID) {
- if (other.getNodes().contains(receiver)) {
+ if (other.containPhysicalNode(receiver)) {
logger.error("error add Node {} to replicaSet {}, already exist in replicaSet {} ", nodeID,
replicaSetID, other.getReplicaSetID());
throw new IllegalStateException("Node exists in ReplicaSet!");
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
index 1272f49..2d1000c 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.kylin.stream.coordinator.coordinate;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -347,6 +348,17 @@ public class StreamingCoordinator implements CoordinatorClient {
}
public synchronized void createReplicaSet(ReplicaSet rs) {
+ List<ReplicaSet> allReplicaset = streamMetadataStore.getReplicaSets();
+ // before creating the new set, we should check whether the nodes are in other sets.
+ for (Node receiver : rs.getNodes()) {
+ for (ReplicaSet set : allReplicaset) {
+ if (set.containPhysicalNode(receiver)) {
+ throw new CoordinateException(String.format(
+ Locale.ROOT,
+ "The receiver node %s is already exists in the set %s", receiver, set));
+ }
+ }
+ }
int replicaSetID = streamMetadataStore.createReplicaSet(rs);
try {
for (Node receiver : rs.getNodes()) {
@@ -379,7 +391,7 @@ public class StreamingCoordinator implements CoordinatorClient {
List<ReplicaSet> allReplicaSet = streamMetadataStore.getReplicaSets();
for (ReplicaSet other : allReplicaSet) {
if (other.getReplicaSetID() != replicaSetID) {
- if (other.getNodes().contains(receiver)) {
+ if (other.containPhysicalNode(receiver)) {
logger.error("Error add Node {} to replicaSet {}, already exist in replicaSet {} ", nodeID,
replicaSetID, other.getReplicaSetID());
throw new IllegalStateException("Node exists in ReplicaSet!");