You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/06/04 03:13:15 UTC
[kylin] 01/02: KYLIN-4355 Add validation for cube assignment
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit b497de8a583f61faa991195b7aee263b3c4de900
Author: XiaoxiangYu <hi...@126.com>
AuthorDate: Tue Jun 2 10:53:27 2020 +0800
KYLIN-4355 Add validation for cube assignment
---
.../kylin/rest/service/StreamingV2Service.java | 24 ++++++++++++++++++++--
1 file changed, 22 insertions(+), 2 deletions(-)
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
index 0141e01..3c88b79 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
@@ -21,6 +21,7 @@ package org.apache.kylin.rest.service;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -30,6 +31,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
@@ -75,7 +78,7 @@ import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
/**
- * StreamingCoordinatorService will try to forward request to corrdinator leader by HttpClient.
+ * StreamingCoordinatorService will try to forward request to coordinator leader.
*/
@Component("streamingServiceV2")
public class StreamingV2Service extends BasicService {
@@ -227,11 +230,28 @@ public class StreamingV2Service extends BasicService {
private void validateAssignment(CubeAssignment newAssignment) {
Map<Integer, List<Partition>> assignments = newAssignment.getAssignments();
+ // Copy each partition list into a set; HashSet::new on the Integer key binds to HashSet(int) and yields empty sets.
+ Map<Integer, Set<Partition>> assignmentSet = assignments.keySet().stream().collect(
+ Collectors.toMap(Function.identity(), id -> new HashSet<>(assignments.get(id))));
Set<Integer> inputReplicaSetIDs = assignments.keySet();
Set<Integer> allReplicaSetIDs = Sets.newHashSet(streamMetadataStore.getReplicaSetIDs());
for (Integer inputReplicaSetID : inputReplicaSetIDs) {
if (!allReplicaSetIDs.contains(inputReplicaSetID)) {
- throw new IllegalArgumentException("the replica set id:" + inputReplicaSetID + " does not exist");
+ throw new IllegalArgumentException("The replica set id:" + inputReplicaSetID + " does not exist");
+ }
+ // Reject replica sets with no partitions, and any partition assigned to more than one replica set.
+ Set<Partition> partitionSet = assignmentSet.get(inputReplicaSetID);
+ if (partitionSet.isEmpty()) {
+ throw new IllegalArgumentException("PartitionList is empty :" + inputReplicaSetID);
+ }
+ for (Map.Entry<Integer, Set<Partition>> entry : assignmentSet.entrySet()) {
+ if (!entry.getKey().equals(inputReplicaSetID)) {
+ Set<Partition> anotherPartitionSet = entry.getValue();
+ int intersection = Sets.intersection(anotherPartitionSet, partitionSet).size();
+ if (intersection > 0) {
+ throw new IllegalArgumentException("Intersection detected between : " + inputReplicaSetID + " with " + entry.getKey());
+ }
+ }
}
}
}