You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/06/14 07:16:52 UTC

[kylin] branch master updated: KYLIN-4355 add ut

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new faa15ad  KYLIN-4355 add ut
faa15ad is described below

commit faa15ad8b13f6ccf9161b6bb59af84f29b9bf958
Author: XiaoxiangYu <hi...@126.com>
AuthorDate: Sat Jun 13 22:20:19 2020 +0800

    KYLIN-4355 add ut
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  7 ++
 .../kylin/rest/controller/AdminController.java     | 12 +++-
 .../kylin/rest/service/StreamingV2Service.java     | 15 ++--
 .../kylin/rest/service/StreamingV2ServiceTest.java | 84 ++++++++++++++++++++++
 .../kylin/stream/coordinator/CoordinatorTest.java  |  2 +-
 .../kylin/stream/server/StreamingServer.java       |  2 +-
 6 files changed, 113 insertions(+), 9 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index dd7b0ad..9e17de6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2225,6 +2225,13 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.web.set-config-enable", FALSE));
     }
 
+    /**
+     * @see #isWebConfigEnabled
+     */
+    public String getPropertiesWhiteListForModification() {
+        return getOptional("kylin.web.properties.whitelist", "kylin.query.cache-enabled");
+    }
+
     public String getPropertiesWhiteList() {
         return getOptional("kylin.web.properties.whitelist", "kylin.web.timezone,kylin.query.cache-enabled,kylin.env,"
                 + "kylin.web.hive-limit,kylin.storage.default,"
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java
index 2529c93..4d90db8 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java
@@ -20,6 +20,9 @@ package org.apache.kylin.rest.controller;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.kylin.common.KylinConfig;
@@ -50,6 +53,8 @@ import org.springframework.web.bind.annotation.ResponseBody;
 @RequestMapping(value = "/admin")
 public class AdminController extends BasicController {
 
+    private Set<String> propertiesWhiteList = new HashSet<>();
+
     @Autowired
     @Qualifier("adminService")
     private AdminService adminService;
@@ -119,9 +124,12 @@ public class AdminController extends BasicController {
         adminService.cleanupStorage();
     }
 
-    @RequestMapping(value = "/config", method = { RequestMethod.PUT }, produces = { "application/json" })
+    @RequestMapping(value = "/config", method = {RequestMethod.PUT}, produces = {"application/json"})
     public void updateKylinConfig(@RequestBody UpdateConfigRequest updateConfigRequest) {
-        if (!adminService.configWritableStatus()) {
+        if (propertiesWhiteList.isEmpty()) {
+            propertiesWhiteList.addAll(Arrays.asList(KylinConfig.getInstanceFromEnv().getPropertiesWhiteListForModification().split(",")));
+        }
+        if (!adminService.configWritableStatus() && !propertiesWhiteList.contains(updateConfigRequest.getKey())) {
             throw new BadRequestException("Update configuration from API is not allowed.");
         }
         adminService.updateConfig(updateConfigRequest.getKey(), updateConfigRequest.getValue());
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
index 3c88b79..c43d625 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.rest.service;
 
@@ -100,6 +100,11 @@ public class StreamingV2Service extends BasicService {
         receiverAdminClient = new HttpReceiverAdminClient();
     }
 
+    StreamingV2Service(StreamMetadataStore metadataStore, ReceiverAdminClient adminClient) {
+        streamMetadataStore = metadataStore;
+        receiverAdminClient = adminClient;
+    }
+
     public List<StreamingSourceConfig> listAllStreamingConfigs(final String table) throws IOException {
         List<StreamingSourceConfig> streamingSourceConfigs = Lists.newArrayList();
         if (StringUtils.isEmpty(table)) {
@@ -228,10 +233,10 @@ public class StreamingV2Service extends BasicService {
         getCoordinatorClient().reAssignCube(cubeName, newAssignment);
     }
 
-    private void validateAssignment(CubeAssignment newAssignment) {
+    void validateAssignment(CubeAssignment newAssignment) {
         Map<Integer, List<Partition>> assignments = newAssignment.getAssignments();
         Map<Integer, Set<Partition>> assignmentSet = assignments.keySet().stream().collect(
-                Collectors.toMap(Function.identity(), HashSet::new));
+                Collectors.toMap(Function.identity(), x -> new HashSet<>(assignments.get(x))));
 
         Set<Integer> inputReplicaSetIDs = assignments.keySet();
         Set<Integer> allReplicaSetIDs = Sets.newHashSet(streamMetadataStore.getReplicaSetIDs());
diff --git a/server-base/src/test/java/org/apache/kylin/rest/service/StreamingV2ServiceTest.java b/server-base/src/test/java/org/apache/kylin/rest/service/StreamingV2ServiceTest.java
new file mode 100644
index 0000000..e3de205
--- /dev/null
+++ b/server-base/src/test/java/org/apache/kylin/rest/service/StreamingV2ServiceTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.service;
+
+import org.apache.kylin.stream.coordinator.StreamMetadataStore;
+import org.apache.kylin.stream.core.model.CubeAssignment;
+import org.apache.kylin.stream.core.source.Partition;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
+
+public class StreamingV2ServiceTest {
+
+    @Test
+    public void testValidateAssignment() {
+        Map<Integer, List<Partition>> assignmentMap = new HashMap<>();
+        Partition p1 = new Partition(1);
+        Partition p2 = new Partition(2);
+        Partition p3 = new Partition(3);
+        List<Integer> replicaSets = new ArrayList<>();
+        replicaSets.add(0);
+        replicaSets.add(1);
+        List<Partition> l1 = new ArrayList<>();
+        l1.add(p1);
+        l1.add(p2);
+        List<Partition> l2 = new ArrayList<>();
+        l2.add(p3);
+        assignmentMap.put(0, l1);
+        assignmentMap.put(1, l2);
+
+        CubeAssignment cubeAssignment = new CubeAssignment("test", assignmentMap);
+        StreamMetadataStore metadataStore = mock(StreamMetadataStore.class);
+        when(metadataStore.getReplicaSetIDs()).thenReturn(replicaSets);
+        StreamingV2Service streamingV2Service = new StreamingV2Service(metadataStore, null);
+        Exception exception = null;
+
+        // normal case
+        streamingV2Service.validateAssignment(cubeAssignment);
+
+        // bad case 1
+        l2.add(p2);
+        try {
+            streamingV2Service.validateAssignment(cubeAssignment);
+        } catch (IllegalArgumentException ill) {
+            exception = ill;
+            ill.printStackTrace();
+        }
+        Assert.assertNotNull("Intersection detected between : 0 with 1", exception);
+
+        // bad case 2
+        l2.clear();
+        exception = null;
+        try {
+            streamingV2Service.validateAssignment(cubeAssignment);
+        } catch (IllegalArgumentException ill) {
+            exception = ill;
+            ill.printStackTrace();
+        }
+        Assert.assertNotNull("PartitionList is empty", exception);
+    }
+}
diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java
index e6cdf80..f1e1644 100644
--- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java
+++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java
@@ -207,7 +207,7 @@ public class CoordinatorTest extends LocalFileMetadataTestCase {
     }
 
     @Test
-    public void testReassignWithoutExeception() throws IOException {
+    public void testReassignWithoutException() throws IOException {
 
         ReceiverAdminClient receiverAdminClient = mockSuccessReceiverAdminClient();
         coordinator = new Coordinator(metadataStore, receiverAdminClient);
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
index a948b9e..7ace4ec 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
@@ -558,7 +558,7 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis
         if (latestRemoteSegment != null) {
             minAcceptEventTime = latestRemoteSegment.getTSRange().end.v;
         }
-        if (minAcceptEventTime > 0) {
+        if (minAcceptEventTime > 0 && minAcceptEventTime < System.currentTimeMillis()) {
             consumer.setMinAcceptEventTime(minAcceptEventTime);
         }
         StreamingCubeConsumeState consumeState = streamMetadataStore.getStreamingCubeConsumeState(cubeName);