You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/06/14 07:16:52 UTC
[kylin] branch master updated: KYLIN-4355 add ut
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new faa15ad KYLIN-4355 add ut
faa15ad is described below
commit faa15ad8b13f6ccf9161b6bb59af84f29b9bf958
Author: XiaoxiangYu <hi...@126.com>
AuthorDate: Sat Jun 13 22:20:19 2020 +0800
KYLIN-4355 add ut
---
.../org/apache/kylin/common/KylinConfigBase.java | 7 ++
.../kylin/rest/controller/AdminController.java | 12 +++-
.../kylin/rest/service/StreamingV2Service.java | 15 ++--
.../kylin/rest/service/StreamingV2ServiceTest.java | 84 ++++++++++++++++++++++
.../kylin/stream/coordinator/CoordinatorTest.java | 2 +-
.../kylin/stream/server/StreamingServer.java | 2 +-
6 files changed, 113 insertions(+), 9 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index dd7b0ad..9e17de6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2225,6 +2225,13 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.web.set-config-enable", FALSE));
}
+ /**
+ * @see #isWebConfigEnabled
+ */
+ public String getPropertiesWhiteListForModification() {
+ return getOptional("kylin.web.properties.whitelist", "kylin.query.cache-enabled");
+ }
+
public String getPropertiesWhiteList() {
return getOptional("kylin.web.properties.whitelist", "kylin.web.timezone,kylin.query.cache-enabled,kylin.env,"
+ "kylin.web.hive-limit,kylin.storage.default,"
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java
index 2529c93..4d90db8 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java
@@ -20,6 +20,9 @@ package org.apache.kylin.rest.controller;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.kylin.common.KylinConfig;
@@ -50,6 +53,8 @@ import org.springframework.web.bind.annotation.ResponseBody;
@RequestMapping(value = "/admin")
public class AdminController extends BasicController {
+ private Set<String> propertiesWhiteList = new HashSet<>();
+
@Autowired
@Qualifier("adminService")
private AdminService adminService;
@@ -119,9 +124,12 @@ public class AdminController extends BasicController {
adminService.cleanupStorage();
}
- @RequestMapping(value = "/config", method = { RequestMethod.PUT }, produces = { "application/json" })
+ @RequestMapping(value = "/config", method = {RequestMethod.PUT}, produces = {"application/json"})
public void updateKylinConfig(@RequestBody UpdateConfigRequest updateConfigRequest) {
- if (!adminService.configWritableStatus()) {
+ if (propertiesWhiteList.isEmpty()) {
+ propertiesWhiteList.addAll(Arrays.asList(KylinConfig.getInstanceFromEnv().getPropertiesWhiteListForModification().split(",")));
+ }
+ if (!adminService.configWritableStatus() && !propertiesWhiteList.contains(updateConfigRequest.getKey())) {
throw new BadRequestException("Update configuration from API is not allowed.");
}
adminService.updateConfig(updateConfigRequest.getKey(), updateConfigRequest.getValue());
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
index 3c88b79..c43d625 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
@@ -6,15 +6,15 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.rest.service;
@@ -100,6 +100,11 @@ public class StreamingV2Service extends BasicService {
receiverAdminClient = new HttpReceiverAdminClient();
}
+ StreamingV2Service(StreamMetadataStore metadataStore, ReceiverAdminClient adminClient) {
+ streamMetadataStore = metadataStore;
+ receiverAdminClient = adminClient;
+ }
+
public List<StreamingSourceConfig> listAllStreamingConfigs(final String table) throws IOException {
List<StreamingSourceConfig> streamingSourceConfigs = Lists.newArrayList();
if (StringUtils.isEmpty(table)) {
@@ -228,10 +233,10 @@ public class StreamingV2Service extends BasicService {
getCoordinatorClient().reAssignCube(cubeName, newAssignment);
}
- private void validateAssignment(CubeAssignment newAssignment) {
+ void validateAssignment(CubeAssignment newAssignment) {
Map<Integer, List<Partition>> assignments = newAssignment.getAssignments();
Map<Integer, Set<Partition>> assignmentSet = assignments.keySet().stream().collect(
- Collectors.toMap(Function.identity(), HashSet::new));
+ Collectors.toMap(Function.identity(), x -> new HashSet<>(assignments.get(x))));
Set<Integer> inputReplicaSetIDs = assignments.keySet();
Set<Integer> allReplicaSetIDs = Sets.newHashSet(streamMetadataStore.getReplicaSetIDs());
diff --git a/server-base/src/test/java/org/apache/kylin/rest/service/StreamingV2ServiceTest.java b/server-base/src/test/java/org/apache/kylin/rest/service/StreamingV2ServiceTest.java
new file mode 100644
index 0000000..e3de205
--- /dev/null
+++ b/server-base/src/test/java/org/apache/kylin/rest/service/StreamingV2ServiceTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.service;
+
+import org.apache.kylin.stream.coordinator.StreamMetadataStore;
+import org.apache.kylin.stream.core.model.CubeAssignment;
+import org.apache.kylin.stream.core.source.Partition;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
+
+public class StreamingV2ServiceTest {
+
+ @Test
+ public void testValidateAssignment() {
+ Map<Integer, List<Partition>> assignmentMap = new HashMap<>();
+ Partition p1 = new Partition(1);
+ Partition p2 = new Partition(2);
+ Partition p3 = new Partition(3);
+ List<Integer> replicaSets = new ArrayList<>();
+ replicaSets.add(0);
+ replicaSets.add(1);
+ List<Partition> l1 = new ArrayList<>();
+ l1.add(p1);
+ l1.add(p2);
+ List<Partition> l2 = new ArrayList<>();
+ l2.add(p3);
+ assignmentMap.put(0, l1);
+ assignmentMap.put(1, l2);
+
+ CubeAssignment cubeAssignment = new CubeAssignment("test", assignmentMap);
+ StreamMetadataStore metadataStore = mock(StreamMetadataStore.class);
+ when(metadataStore.getReplicaSetIDs()).thenReturn(replicaSets);
+ StreamingV2Service streamingV2Service = new StreamingV2Service(metadataStore, null);
+ Exception exception = null;
+
+ // normal case
+ streamingV2Service.validateAssignment(cubeAssignment);
+
+ // bad case 1
+ l2.add(p2);
+ try {
+ streamingV2Service.validateAssignment(cubeAssignment);
+ } catch (IllegalArgumentException ill) {
+ exception = ill;
+ ill.printStackTrace();
+ }
+ Assert.assertNotNull("Intersection detected between : 0 with 1", exception);
+
+ // bad case 2
+ l2.clear();
+ exception = null;
+ try {
+ streamingV2Service.validateAssignment(cubeAssignment);
+ } catch (IllegalArgumentException ill) {
+ exception = ill;
+ ill.printStackTrace();
+ }
+ Assert.assertNotNull("PartitionList is empty", exception);
+ }
+}
diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java
index e6cdf80..f1e1644 100644
--- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java
+++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java
@@ -207,7 +207,7 @@ public class CoordinatorTest extends LocalFileMetadataTestCase {
}
@Test
- public void testReassignWithoutExeception() throws IOException {
+ public void testReassignWithoutException() throws IOException {
ReceiverAdminClient receiverAdminClient = mockSuccessReceiverAdminClient();
coordinator = new Coordinator(metadataStore, receiverAdminClient);
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
index a948b9e..7ace4ec 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
@@ -558,7 +558,7 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis
if (latestRemoteSegment != null) {
minAcceptEventTime = latestRemoteSegment.getTSRange().end.v;
}
- if (minAcceptEventTime > 0) {
+ if (minAcceptEventTime > 0 && minAcceptEventTime < System.currentTimeMillis()) {
consumer.setMinAcceptEventTime(minAcceptEventTime);
}
StreamingCubeConsumeState consumeState = streamMetadataStore.getStreamingCubeConsumeState(cubeName);