You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/06/04 03:13:15 UTC

[kylin] 01/02: KYLIN-4355 Add validation for cube assignment

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit b497de8a583f61faa991195b7aee263b3c4de900
Author: XiaoxiangYu <hi...@126.com>
AuthorDate: Tue Jun 2 10:53:27 2020 +0800

    KYLIN-4355 Add validation for cube assignment
---
 .../kylin/rest/service/StreamingV2Service.java     | 24 ++++++++++++++++++++--
 1 file changed, 22 insertions(+), 2 deletions(-)

diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
index 0141e01..3c88b79 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
@@ -21,6 +21,7 @@ package org.apache.kylin.rest.service;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -30,6 +31,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -75,7 +78,7 @@ import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.shaded.com.google.common.collect.Sets;
 
 /**
- * StreamingCoordinatorService will try to forward request to corrdinator leader by HttpClient.
+ * StreamingCoordinatorService will try to forward request to coordinator leader.
  */
 @Component("streamingServiceV2")
 public class StreamingV2Service extends BasicService {
@@ -227,11 +230,28 @@ public class StreamingV2Service extends BasicService {
 
     private void validateAssignment(CubeAssignment newAssignment) {
         Map<Integer, List<Partition>> assignments = newAssignment.getAssignments();
+        // Copy each partition list into a set; HashSet::new on the Integer key would call HashSet(int) and stay empty.
+        Map<Integer, Set<Partition>> assignmentSet = assignments.keySet().stream().collect(
+                Collectors.toMap(Function.identity(), id -> new HashSet<>(assignments.get(id))));
         Set<Integer> inputReplicaSetIDs = assignments.keySet();
         Set<Integer> allReplicaSetIDs = Sets.newHashSet(streamMetadataStore.getReplicaSetIDs());
         for (Integer inputReplicaSetID : inputReplicaSetIDs) {
             if (!allReplicaSetIDs.contains(inputReplicaSetID)) {
-                throw new IllegalArgumentException("the replica set id:" + inputReplicaSetID + " does not exist");
+                throw new IllegalArgumentException("The replica set id:" + inputReplicaSetID + " does not exist");
+            }
+            // Each replica set must hold at least one partition and share none with any other replica set.
+            Set<Partition> partitionSet = assignmentSet.get(inputReplicaSetID);
+            if (partitionSet.isEmpty()) {
+                throw new IllegalArgumentException("PartitionList is empty :" + inputReplicaSetID);
+            }
+            for (Map.Entry<Integer, Set<Partition>> entry : assignmentSet.entrySet()) {
+                if (!entry.getKey().equals(inputReplicaSetID)) {
+                    Set<Partition> anotherPartitionSet = entry.getValue();
+                    int intersection = Sets.intersection(anotherPartitionSet, partitionSet).size();
+                    if (intersection > 0) {
+                        throw new IllegalArgumentException("Intersection detected between : " + inputReplicaSetID + " with " + entry.getKey());
+                    }
+                }
             }
         }
     }