You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/02/14 03:53:28 UTC
[12/15] kylin git commit: KYLIN-1311 on the way
KYLIN-1311 on the way
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4f41fd5c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4f41fd5c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4f41fd5c
Branch: refs/heads/helix-201602
Commit: 4f41fd5c80351d31e83330d070d5dbd8c11c4ea5
Parents: bbfe8ae
Author: shaofengshi <sh...@apache.org>
Authored: Fri Jan 22 11:01:48 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Feb 6 13:33:06 2016 +0800
----------------------------------------------------------------------
build/bin/kylin.sh | 8 +-
.../test_case_data/localmeta/kylin.properties | 2 +-
server/pom.xml | 1 +
.../rest/controller/ClusterController.java | 71 ++
.../kylin/rest/controller/JobController.java | 33 -
.../rest/controller/StreamingController.java | 68 +-
.../kylin/rest/helix/HelixClusterAdmin.java | 31 +-
.../rest/helix/JobEngineTransitionHandler.java | 70 ++
.../helix/LeaderStandbyStateModelFactory.java | 125 +---
.../helix/StreamCubeBuildTransitionHandler.java | 107 +++
.../rest/request/StreamingBuildRequest.java | 29 +-
.../security/KylinAuthenticationProvider.java | 3 +-
.../kylin/rest/service/StreamingService.java | 34 +-
.../rest/controller/UserControllerTest.java | 9 -
.../kylin/rest/helix/HelixClusterAdminTest.java | 22 +-
.../kylin/rest/service/CacheServiceTest.java | 720 +++++++++----------
.../kylin/rest/service/ServiceTestBase.java | 40 +-
.../rest/service/TestBaseWithZookeeper.java | 74 ++
.../source/kafka/TimedJsonStreamParser.java | 7 +-
19 files changed, 825 insertions(+), 629 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 5b03f43..d196fe6 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -70,7 +70,7 @@ then
if [ -z "$KYLIN_REST_ADDRESS" ]
then
- kylin_rest_address=`hostname`":"`grep "<Connector port=" ${tomcat_root}/conf/server.xml |grep protocol=\"HTTP/1.1\" | cut -d '=' -f 2 | cut -d \" -f 2`
+ kylin_rest_address=`hostname -f`":"`grep "<Connector port=" ${tomcat_root}/conf/server.xml |grep protocol=\"HTTP/1.1\" | cut -d '=' -f 2 | cut -d \" -f 2`
echo "KYLIN_REST_ADDRESS not found, will use ${kylin_rest_address}"
else
echo "KYLIN_REST_ADDRESS is set to: $KYLIN_REST_ADDRESS"
@@ -154,12 +154,12 @@ then
exit 0
elif [ "$2" == "stop" ]
then
- if [ ! -f "${KYLIN_HOME}/$3_$4" ]
+ if [ ! -f "${KYLIN_HOME}/logs/$3_$4" ]
then
echo "streaming is not running, please check"
exit 1
fi
- pid=`cat ${KYLIN_HOME}/$3_$4`
+ pid=`cat ${KYLIN_HOME}/logs/$3_$4`
if [ "$pid" = "" ]
then
echo "streaming is not running, please check"
@@ -168,7 +168,7 @@ then
echo "stopping streaming:$pid"
kill $pid
fi
- rm ${KYLIN_HOME}/$3_$4
+ rm ${KYLIN_HOME}/logs/$3_$4
exit 0
else
echo
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index 978102f..41a9895 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -6,7 +6,7 @@
kylin.owner=whoami@kylin.apache.org
# List of web servers in use, this enables one web server instance to sync up with other servers.
-#kylin.rest.servers=localhost:7070
+kylin.rest.servers=localhost:7070
# The metadata store in hbase
kylin.metadata.url=
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index 86ec5a5..2359855 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -466,6 +466,7 @@
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
+ <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java b/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
new file mode 100644
index 0000000..97fff36
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.controller;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.constant.JobStatusEnum;
+import org.apache.kylin.job.constant.JobTimeFilterEnum;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.helix.HelixClusterAdmin;
+import org.apache.kylin.rest.request.JobListRequest;
+import org.apache.kylin.rest.service.JobService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+import java.util.*;
+
+/**
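+ * Starts the HelixClusterAdmin in afterPropertiesSet() and registers a JVM shutdown hook that stops it.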
+ */
+@Controller
+@RequestMapping(value = "cluster")
+public class ClusterController extends BasicController implements InitializingBean {
+ private static final Logger logger = LoggerFactory.getLogger(ClusterController.class);
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
+ */
+ @Override
+ public void afterPropertiesSet() throws Exception {
+
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
+ final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig);
+ clusterAdmin.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ clusterAdmin.stop();
+ }
+ }));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 77d987f..a61635d 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -61,42 +61,9 @@ public class JobController extends BasicController implements InitializingBean {
*/
@Override
public void afterPropertiesSet() throws Exception {
-
String timeZone = jobService.getConfig().getTimeZone();
TimeZone tzone = TimeZone.getTimeZone(timeZone);
TimeZone.setDefault(tzone);
-
- final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-
- if (kylinConfig.isClusterEnabled() == true) {
- logger.info("Kylin cluster enabled, will use Helix/zookeeper to coordinate.");
- final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig);
- clusterAdmin.start();
-
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- clusterAdmin.stop();
- }
- }));
- } else {
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- DefaultScheduler scheduler = DefaultScheduler.createInstance();
- scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
- if (!scheduler.hasStarted()) {
- logger.error("scheduler has not been started");
- System.exit(1);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }).start();
- }
-
}
/**
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index fb806d1..209c552 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -26,11 +26,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeBuildTypeEnum;
-import org.apache.kylin.engine.streaming.BootstrapConfig;
import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.exception.ForbiddenException;
import org.apache.kylin.rest.exception.InternalErrorException;
@@ -45,7 +41,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.AccessDeniedException;
-import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
@@ -93,7 +88,6 @@ public class StreamingController extends BasicController {
}
}
-
/**
*
* create Streaming Schema
@@ -105,7 +99,7 @@ public class StreamingController extends BasicController {
//Update Model
StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
- if (streamingConfig == null ||kafkaConfig == null) {
+ if (streamingConfig == null || kafkaConfig == null) {
return streamingRequest;
}
if (StringUtils.isEmpty(streamingConfig.getName())) {
@@ -124,7 +118,7 @@ public class StreamingController extends BasicController {
try {
kafkaConfig.setUuid(UUID.randomUUID().toString());
kafkaConfigService.createKafkaConfig(kafkaConfig);
- }catch (IOException e){
+ } catch (IOException e) {
try {
streamingService.dropStreamingConfig(streamingConfig);
} catch (IOException e1) {
@@ -139,7 +133,7 @@ public class StreamingController extends BasicController {
@RequestMapping(value = "", method = { RequestMethod.PUT })
@ResponseBody
- public StreamingRequest updateModelDesc(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
+ public StreamingRequest updateModelDesc(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
@@ -156,7 +150,7 @@ public class StreamingController extends BasicController {
}
try {
kafkaConfig = kafkaConfigService.updateKafkaConfig(kafkaConfig);
- }catch (AccessDeniedException accessDeniedException) {
+ } catch (AccessDeniedException accessDeniedException) {
throw new ForbiddenException("You don't have right to update this KafkaConfig.");
} catch (Exception e) {
logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
@@ -203,7 +197,6 @@ public class StreamingController extends BasicController {
return desc;
}
-
private KafkaConfig deserializeKafkaSchemalDesc(StreamingRequest streamingRequest) {
KafkaConfig desc = null;
try {
@@ -227,16 +220,14 @@ public class StreamingController extends BasicController {
request.setMessage(message);
}
-
-
/**
* Send a stream build request
*
- * @param cubeName Cube ID
+ * @param cubeName Cube Name
* @return
* @throws IOException
*/
- @RequestMapping(value = "/{cubeName}/build", method = {RequestMethod.PUT})
+ @RequestMapping(value = "/{cubeName}/build", method = { RequestMethod.PUT })
@ResponseBody
public StreamingBuildRequest buildStream(@PathVariable String cubeName, @RequestBody StreamingBuildRequest streamingBuildRequest) {
StreamingConfig streamingConfig = streamingService.getStreamingManager().getStreamingConfigByCube(cubeName);
@@ -244,27 +235,54 @@ public class StreamingController extends BasicController {
List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null);
Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found.");
CubeInstance cube = cubes.get(0);
- if (streamingBuildRequest.isFillGap() == false) {
- Preconditions.checkArgument(streamingBuildRequest.getEnd() > streamingBuildRequest.getStart(), "End time should be greater than start time.");
- for (CubeSegment segment : cube.getSegments()) {
- if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) {
- streamingBuildRequest.setMessage("The segment already exists: " + segment.toString());
- streamingBuildRequest.setSuccessful(false);
- return streamingBuildRequest;
- }
+ if (streamingBuildRequest.getEnd() <= streamingBuildRequest.getStart()) {
+ streamingBuildRequest.setMessage("End time should be greater than start time.");streamingBuildRequest.setSuccessful(false);
+ return streamingBuildRequest;
+ }
+
+ for (CubeSegment segment : cube.getSegments()) {
+ if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) {
+ streamingBuildRequest.setMessage("The segment already exists: " + segment.toString());
+ streamingBuildRequest.setSuccessful(false);
+ return streamingBuildRequest;
}
}
streamingBuildRequest.setStreaming(streamingConfig.getName());
- streamingService.buildStream(cubeName, streamingBuildRequest);
+ streamingService.buildStream(cube, streamingBuildRequest);
streamingBuildRequest.setMessage("Build request is submitted successfully.");
streamingBuildRequest.setSuccessful(true);
return streamingBuildRequest;
}
+ /**
+ * Send a stream fillGap request
+ *
+ * @param cubeName Cube Name
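+ * @return a new StreamingBuildRequest carrying the streaming config name, a status message and successful=true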
+ * @throws IOException
+ */
+ @RequestMapping(value = "/{cubeName}/fillgap", method = { RequestMethod.PUT })
+ @ResponseBody
+ public StreamingBuildRequest fillGap(@PathVariable String cubeName) {
+ StreamingConfig streamingConfig = streamingService.getStreamingManager().getStreamingConfigByCube(cubeName);
+ Preconditions.checkNotNull(streamingConfig, "Stream config for '" + cubeName + "' is not found.");
+ List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null);
+ Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found.");
+ CubeInstance cube = cubes.get(0);
+
+ StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest();
+ streamingBuildRequest.setStreaming(streamingConfig.getName());
+ streamingService.fillGap(cube);
+ streamingBuildRequest.setMessage("FillGap request is submitted successfully.");
+ streamingBuildRequest.setSuccessful(true);
+ return streamingBuildRequest;
+
+ }
+
public void setStreamingService(StreamingService streamingService) {
- this.streamingService= streamingService;
+ this.streamingService = streamingService;
}
public void setKafkaConfigService(KafkaConfigService kafkaConfigService) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
index 9850e24..0758ef1 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
@@ -33,8 +33,10 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.request.StreamingBuildRequest;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,14 +128,13 @@ public class HelixClusterAdmin {
}
- public void addStreamingJob(String streamingName, long start, long end) {
- String resourceName = RESOURCE_STREAME_CUBE_PREFIX + streamingName + "_" + start + "_" + end;
- if (!admin.getResourcesInCluster(clusterName).contains(resourceName)) {
- admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
- } else {
- logger.warn("Resource '" + resourceName + "' already exists in cluster, skip adding.");
+ public void addStreamingJob(StreamingBuildRequest streamingBuildRequest) {
+ String resourceName = streamingBuildRequest.toResourceName();
+ if (admin.getResourcesInCluster(clusterName).contains(resourceName)) {
+ logger.warn("Resource '" + resourceName + "' already exists in cluster, remove and re-add.");
+ admin.dropResource(clusterName, resourceName);
}
-
+ admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
admin.rebalance(clusterName, resourceName, 2, "", TAG_STREAM_BUILDER);
}
@@ -150,7 +151,7 @@ public class HelixClusterAdmin {
*/
protected void startInstance(String instanceName) throws Exception {
participantManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddress);
- participantManager.getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(MODEL_LEADER_STANDBY), new LeaderStandbyStateModelFactory());
+ participantManager.getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(MODEL_LEADER_STANDBY), new LeaderStandbyStateModelFactory(this.kylinConfig));
participantManager.connect();
participantManager.addLiveInstanceChangeListener(new KylinClusterLiveInstanceChangeListener());
@@ -179,10 +180,12 @@ public class HelixClusterAdmin {
public void stop() {
if (participantManager != null) {
participantManager.disconnect();
+ participantManager = null;
}
if (controllerManager != null) {
controllerManager.disconnect();
+ controllerManager = null;
}
}
@@ -269,11 +272,13 @@ public class HelixClusterAdmin {
int indexOfUnderscore = instanceName.lastIndexOf("_");
instanceRestAddresses.add(instanceName.substring(0, indexOfUnderscore) + ":" + instanceName.substring(indexOfUnderscore + 1));
}
- String restServersInCluster = StringUtil.join(instanceRestAddresses, ",");
- kylinConfig.setProperty("kylin.rest.servers", restServersInCluster);
- System.setProperty("kylin.rest.servers", restServersInCluster);
- logger.info("kylin.rest.servers update to " + restServersInCluster);
- Broadcaster.clearCache();
+ if (instanceRestAddresses.size() > 0) {
+ String restServersInCluster = StringUtil.join(instanceRestAddresses, ",");
+ kylinConfig.setProperty("kylin.rest.servers", restServersInCluster);
+ System.setProperty("kylin.rest.servers", restServersInCluster);
+ logger.info("kylin.rest.servers update to " + restServersInCluster);
+ Broadcaster.clearCache();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java b/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java
new file mode 100644
index 0000000..3ef04ee
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java
@@ -0,0 +1,70 @@
+package org.apache.kylin.rest.helix;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.lock.MockJobLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentMap;
+
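+/** Helix transition handler for the job engine: STANDBY to LEADER starts a DefaultScheduler, LEADER to STANDBY destroys it.
+ */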
+public class JobEngineTransitionHandler extends TransitionHandler {
+ private static final Logger logger = LoggerFactory.getLogger(JobEngineTransitionHandler.class);
+ private final KylinConfig kylinConfig;
+
+ private static ConcurrentMap<KylinConfig, JobEngineTransitionHandler> instanceMaps = Maps.newConcurrentMap();
+
+ private JobEngineTransitionHandler(KylinConfig kylinConfig) {
+ this.kylinConfig = kylinConfig;
+ }
+
+ public static JobEngineTransitionHandler getInstance(KylinConfig kylinConfig) {
+ Preconditions.checkNotNull(kylinConfig);
+ instanceMaps.putIfAbsent(kylinConfig, new JobEngineTransitionHandler(kylinConfig));
+ return instanceMaps.get(kylinConfig);
+ }
+
+ @Transition(to = "LEADER", from = "STANDBY")
+ public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+ logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()");
+ try {
+ DefaultScheduler scheduler = DefaultScheduler.createInstance();
+ scheduler.init(new JobEngineConfig(this.kylinConfig), new MockJobLock());
+ while (!scheduler.hasStarted()) {
+ logger.error("scheduler has not been started");
+ Thread.sleep(1000);
+ }
+ } catch (Exception e) {
+ logger.error("error start DefaultScheduler", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Transition(to = "STANDBY", from = "LEADER")
+ public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+ logger.info("JobEngineStateModel.onBecomeStandbyFromLeader()");
+ DefaultScheduler.destroyInstance();
+
+ }
+
+ @Transition(to = "STANDBY", from = "OFFLINE")
+ public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+ logger.info("JobEngineStateModel.onBecomeStandbyFromOffline()");
+
+ }
+
+ @Transition(to = "OFFLINE", from = "STANDBY")
+ public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+ logger.info("JobEngineStateModel.onBecomeOfflineFromStandby()");
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
index 8614e8c..940c9c2 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
@@ -1,146 +1,35 @@
package org.apache.kylin.rest.helix;
-import com.google.common.base.Preconditions;
-import org.apache.helix.NotificationContext;
import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.Transition;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.KylinConfigBase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.lock.MockJobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
import static org.apache.kylin.rest.helix.HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX;
/**
*/
public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> {
- private static final Logger logger = LoggerFactory.getLogger(LeaderStandbyStateModelFactory.class);
+ private final KylinConfig kylinConfig;
+
+ public LeaderStandbyStateModelFactory(KylinConfig kylinConfig) {
+ this.kylinConfig = kylinConfig;
+ }
@Override
public TransitionHandler createStateTransitionHandler(PartitionId partitionId) {
if (partitionId.getResourceId().equals(ResourceId.from(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE))) {
- return JobEngineStateModel.INSTANCE;
+ return JobEngineTransitionHandler.getInstance(kylinConfig);
}
if (partitionId.getResourceId().stringify().startsWith(RESOURCE_STREAME_CUBE_PREFIX)) {
- return StreamCubeStateModel.INSTANCE;
+ return StreamCubeBuildTransitionHandler.getInstance(kylinConfig);
}
return null;
}
- public static class JobEngineStateModel extends TransitionHandler {
-
- public static JobEngineStateModel INSTANCE = new JobEngineStateModel();
-
- @Transition(to = "LEADER", from = "STANDBY")
- public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
- logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()");
- try {
- final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- DefaultScheduler scheduler = DefaultScheduler.createInstance();
- scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
- while (!scheduler.hasStarted()) {
- logger.error("scheduler has not been started");
- Thread.sleep(1000);
- }
- } catch (Exception e) {
- logger.error("error start DefaultScheduler", e);
- throw new RuntimeException(e);
- }
- }
-
- @Transition(to = "STANDBY", from = "LEADER")
- public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
- logger.info("JobEngineStateModel.onBecomeStandbyFromLeader()");
- DefaultScheduler.destroyInstance();
-
- }
-
- @Transition(to = "STANDBY", from = "OFFLINE")
- public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
- logger.info("JobEngineStateModel.onBecomeStandbyFromOffline()");
-
- }
-
- @Transition(to = "OFFLINE", from = "STANDBY")
- public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
- logger.info("JobEngineStateModel.onBecomeOfflineFromStandby()");
-
- }
- }
-
- public static class StreamCubeStateModel extends TransitionHandler {
-
- public static StreamCubeStateModel INSTANCE = new StreamCubeStateModel();
-
- @Transition(to = "LEADER", from = "STANDBY")
- public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
- String resourceName = message.getResourceId().stringify();
- Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX));
- long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_") + 1));
- String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_"));
- long start = Long.parseLong(temp.substring(temp.lastIndexOf("_") + 1));
- String streamingConfig = temp.substring(0, temp.lastIndexOf("_"));
-
- final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-
- final String cubeName = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingConfig).getCubeName();
- final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
- for (CubeSegment segment : cube.getSegments()) {
- if (segment.getDateRangeStart() <= start && segment.getDateRangeEnd() >= end) {
- logger.info("Segment " + segment.getName() + " already exist, no need rebuild.");
- return;
- }
- }
-
- KylinConfigBase.getKylinHome();
- String segmentId = start + "_" + end;
- String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingConfig + " " + segmentId + " -oneoff true -start " + start + " -end " + end + " -streaming " + streamingConfig;
- logger.info("Executing: " + cmd);
- try {
- String line;
- Process p = Runtime.getRuntime().exec(cmd);
- BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
- while ((line = input.readLine()) != null) {
- logger.info(line);
- }
- input.close();
- } catch (IOException err) {
- logger.error("Error happens during build streaming '" + resourceName + "'", err);
- throw new RuntimeException(err);
- }
-
- }
-
- @Transition(to = "STANDBY", from = "LEADER")
- public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
-
- }
-
- @Transition(to = "STANDBY", from = "OFFLINE")
- public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
-
- }
-
- @Transition(to = "OFFLINE", from = "STANDBY")
- public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
-
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java b/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
new file mode 100644
index 0000000..44d8302
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
@@ -0,0 +1,107 @@
+package org.apache.kylin.rest.helix;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.rest.request.StreamingBuildRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Helix transition handler for streaming cube build resources.
+ *
+ * The Helix resource name is decoded with
+ * {@link StreamingBuildRequest#fromResourceName(String)} into a streaming config
+ * name plus a start and an end value. Becoming LEADER of such a resource runs
+ * {@code kylin.sh streaming start ...}; stepping down from LEADER to STANDBY runs
+ * {@code kylin.sh streaming stop ...}. One handler instance is cached per
+ * {@link KylinConfig}.
+ */
+public class StreamCubeBuildTransitionHandler extends TransitionHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamCubeBuildTransitionHandler.class);
+
+    private static final ConcurrentMap<KylinConfig, StreamCubeBuildTransitionHandler> instanceMaps = Maps.newConcurrentMap();
+    private final KylinConfig kylinConfig;
+
+    private StreamCubeBuildTransitionHandler(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+    }
+
+    /**
+     * Returns the handler cached for the given config, creating it on first use.
+     *
+     * @param kylinConfig the config the handler works against; must not be null
+     * @return the single handler instance associated with {@code kylinConfig}
+     * @throws NullPointerException if {@code kylinConfig} is null
+     */
+    public static StreamCubeBuildTransitionHandler getInstance(KylinConfig kylinConfig) {
+        Preconditions.checkNotNull(kylinConfig);
+        StreamCubeBuildTransitionHandler handler = instanceMaps.get(kylinConfig);
+        if (handler == null) {
+            // Only allocate when nothing is cached yet; putIfAbsent keeps the first
+            // instance if another thread got there in between.
+            instanceMaps.putIfAbsent(kylinConfig, new StreamCubeBuildTransitionHandler(kylinConfig));
+            handler = instanceMaps.get(kylinConfig);
+        }
+        return handler;
+    }
+
+    /**
+     * Starts the one-off streaming build described by the resource name, unless an
+     * existing segment of the target cube already covers the requested range.
+     *
+     * @param message the Helix message; its resource id carries the encoded request
+     * @param context the Helix notification context (not used here)
+     * @throws IllegalStateException if the streaming config or its cube cannot be found
+     * @throws RuntimeException if the build script cannot be launched or read
+     */
+    @Transition(to = "LEADER", from = "STANDBY")
+    public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+        String resourceName = message.getResourceId().stringify();
+        StreamingBuildRequest request = StreamingBuildRequest.fromResourceName(resourceName);
+
+        final org.apache.kylin.engine.streaming.StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(request.getStreaming());
+        if (streamingConfig == null) {
+            throw new IllegalStateException("Streaming config '" + request.getStreaming() + "' not found for resource '" + resourceName + "'");
+        }
+        final String cubeName = streamingConfig.getCubeName();
+        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        if (cube == null) {
+            throw new IllegalStateException("Cube '" + cubeName + "' not found for resource '" + resourceName + "'");
+        }
+
+        // Nothing to do when a segment already spans the whole requested range.
+        for (CubeSegment segment : cube.getSegments()) {
+            if (segment.getDateRangeStart() <= request.getStart() && segment.getDateRangeEnd() >= request.getEnd()) {
+                logger.info("Segment " + segment.getName() + " already exist, no need rebuild.");
+                return;
+            }
+        }
+
+        String start = String.valueOf(request.getStart());
+        String end = String.valueOf(request.getEnd());
+        String segmentId = start + "_" + end;
+        runKylinScript("build streaming", resourceName, //
+                "streaming", "start", request.getStreaming(), segmentId, //
+                "-oneoff", "true", "-start", start, "-end", end, "-streaming", request.getStreaming());
+    }
+
+    /**
+     * Stops the streaming build described by the resource name.
+     *
+     * @param message the Helix message; its resource id carries the encoded request
+     * @param context the Helix notification context (not used here)
+     * @throws RuntimeException if the stop script cannot be launched or read
+     */
+    @Transition(to = "STANDBY", from = "LEADER")
+    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+        String resourceName = message.getResourceId().stringify();
+        StreamingBuildRequest request = StreamingBuildRequest.fromResourceName(resourceName);
+        String segmentId = request.getStart() + "_" + request.getEnd();
+        runKylinScript("stop streaming", resourceName, "streaming", "stop", request.getStreaming(), segmentId);
+    }
+
+    /** No action is taken for the OFFLINE to STANDBY transition. */
+    @Transition(to = "STANDBY", from = "OFFLINE")
+    public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+
+    }
+
+    /** No action is taken for the STANDBY to OFFLINE transition. */
+    @Transition(to = "OFFLINE", from = "STANDBY")
+    public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+
+    }
+
+    /**
+     * Runs {@code $KYLIN_HOME/bin/kylin.sh} with the given arguments, logs every line
+     * the script prints, and waits for it to exit.
+     *
+     * @param action short description used in the error log, e.g. "build streaming"
+     * @param resourceName the Helix resource being handled, used in the error log
+     * @param scriptArgs the arguments passed to kylin.sh, one element per argument
+     */
+    private static void runKylinScript(String action, String resourceName, String... scriptArgs) {
+        String[] command = new String[scriptArgs.length + 1];
+        command[0] = KylinConfigBase.getKylinHome() + "/bin/kylin.sh";
+        System.arraycopy(scriptArgs, 0, command, 1, scriptArgs.length);
+
+        StringBuilder display = new StringBuilder();
+        for (String part : command) {
+            if (display.length() > 0) {
+                display.append(' ');
+            }
+            display.append(part);
+        }
+        logger.info("Executing: " + display);
+
+        try {
+            // Pass the arguments as a list so they are not re-tokenised on whitespace,
+            // and merge stderr into stdout so the child cannot stall on an undrained pipe.
+            Process process = new ProcessBuilder(command).redirectErrorStream(true).start();
+            BufferedReader output = new BufferedReader(new InputStreamReader(process.getInputStream()));
+            try {
+                String line;
+                while ((line = output.readLine()) != null) {
+                    logger.info(line);
+                }
+            } finally {
+                output.close();
+            }
+            int exitCode = process.waitFor();
+            if (exitCode != 0) {
+                // Reported but not thrown: the script's exit status was never treated as fatal.
+                logger.error("'" + display + "' exited with code " + exitCode);
+            }
+        } catch (IOException err) {
+            logger.error("Error happens during " + action + " '" + resourceName + "'", err);
+            throw new RuntimeException(err);
+        } catch (InterruptedException err) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(err);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
index e06a06c..dcf91fd 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
@@ -18,15 +18,28 @@
package org.apache.kylin.rest.request;
+import com.google.common.base.Preconditions;
+import org.apache.kylin.rest.helix.HelixClusterAdmin;
+
+import static org.apache.kylin.rest.helix.HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX;
+
public class StreamingBuildRequest {
private String streaming;
private long start;
private long end;
- private boolean fillGap;
private String message;
private boolean successful;
+ /**
+ * Creates an empty request; every field keeps its default value until a setter is called.
+ */
+ public StreamingBuildRequest() {
+ }
+
+ /**
+ * Creates a request for the given streaming config name and range.
+ *
+ * @param streaming name of the streaming config to build
+ * @param start start of the requested range (presumably a timestamp; confirm the unit with callers)
+ * @param end end of the requested range (same unit as {@code start})
+ */
+ public StreamingBuildRequest(String streaming, long start, long end) {
+ this.streaming = streaming;
+ this.start = start;
+ this.end = end;
+ }
+
public String getStreaming() {
return streaming;
}
@@ -67,11 +80,17 @@ public class StreamingBuildRequest {
this.end = end;
}
- public boolean isFillGap() {
- return fillGap;
+ public String toResourceName() {
+ return HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX + streaming + "_" + start + "_" + end;
}
- public void setFillGap(boolean fillGap) {
- this.fillGap = fillGap;
+ /**
+  * Decodes a resource name of the form {@code <prefix><streaming>_<start>_<end>},
+  * as produced by {@link #toResourceName()}, back into a request.
+  *
+  * The last two underscore-separated fields are read as {@code start} and
+  * {@code end}; everything between the prefix and those fields is the streaming
+  * config name, so the name itself may contain underscores.
+  *
+  * @param resourceName the Helix resource name to decode; must not be null
+  * @return a request carrying the decoded streaming name, start and end
+  * @throws NullPointerException if {@code resourceName} is null
+  * @throws IllegalArgumentException if the prefix or the two separators are missing;
+  *         also (as {@link NumberFormatException}) if start or end is not a valid long
+  */
+ public static StreamingBuildRequest fromResourceName(String resourceName) {
+     Preconditions.checkNotNull(resourceName, "resourceName must not be null");
+     Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX), "Resource name '%s' does not start with '%s'", resourceName, RESOURCE_STREAME_CUBE_PREFIX);
+
+     // Work on the part after the prefix so underscores inside the prefix never count as separators.
+     String encoded = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length());
+     int endSeparator = encoded.lastIndexOf('_');
+     int startSeparator = encoded.lastIndexOf('_', endSeparator - 1);
+     Preconditions.checkArgument(startSeparator >= 0, "Resource name '%s' is not of the form <prefix><streaming>_<start>_<end>", resourceName);
+
+     String streamingConfig = encoded.substring(0, startSeparator);
+     long start = Long.parseLong(encoded.substring(startSeparator + 1, endSeparator));
+     long end = Long.parseLong(encoded.substring(endSeparator + 1));
+
+     return new StreamingBuildRequest(streamingConfig, start, end);
 }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java b/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java
index 1f147ef..b8dcd43 100644
--- a/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java
+++ b/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java
@@ -72,7 +72,8 @@ public class KylinAuthenticationProvider implements AuthenticationProvider {
}
logger.debug("Authenticated user " + authed.toString());
-
+
+ SecurityContextHolder.getContext().setAuthentication(authed);
UserDetails user;
if (authed.getDetails() == null) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index da20949..7c2cc48 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -21,7 +21,6 @@ package org.apache.kylin.rest.service;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.engine.streaming.BootstrapConfig;
import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
@@ -54,8 +53,8 @@ public class StreamingService extends BasicService {
if (null == cubeInstance) {
streamingConfigs = getStreamingManager().listAllStreaming();
} else {
- for(StreamingConfig config : getStreamingManager().listAllStreaming()){
- if(cubeInstance.getName().equals(config.getCubeName())){
+ for (StreamingConfig config : getStreamingManager().listAllStreaming()) {
+ if (cubeInstance.getName().equals(config.getCubeName())) {
streamingConfigs.add(config);
}
}
@@ -84,34 +83,35 @@ public class StreamingService extends BasicService {
if (getStreamingManager().getStreamingConfig(config.getName()) != null) {
throw new InternalErrorException("The streamingConfig named " + config.getName() + " already exists");
}
- StreamingConfig streamingConfig = getStreamingManager().saveStreamingConfig(config);
+ StreamingConfig streamingConfig = getStreamingManager().saveStreamingConfig(config);
return streamingConfig;
}
-// @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+ // @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
public StreamingConfig updateStreamingConfig(StreamingConfig config) throws IOException {
return getStreamingManager().updateStreamingConfig(config);
}
-// @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+ // @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
public void dropStreamingConfig(StreamingConfig config) throws IOException {
getStreamingManager().removeStreamingConfig(config);
}
+ /**
+  * Hands the given build request to {@code HelixClusterAdmin.addStreamingJob},
+  * using the cluster admin bound to the environment's Kylin config.
+  *
+  * @param cube the cube being built; in this method it is referenced only by the
+  *        {@code @PreAuthorize} permission expression
+  * @param streamingBuildRequest the build request to submit
+  */
+ @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
+ public void buildStream(CubeInstance cube, StreamingBuildRequest streamingBuildRequest) {
+     final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
+     HelixClusterAdmin.getInstance(envConfig).addStreamingJob(streamingBuildRequest);
+ }
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
- public void buildStream(String cube, StreamingBuildRequest streamingBuildRequest) {
+ public void fillGap(CubeInstance cube) {
HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv());
- if (streamingBuildRequest.isFillGap()) {
- final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingBuildRequest.getStreaming());
- final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
- logger.info("all gaps:" + org.apache.commons.lang3.StringUtils.join(gaps, ","));
- for (Pair<Long, Long> gap : gaps) {
- clusterAdmin.addStreamingJob(streamingBuildRequest.getStreaming(), gap.getFirst(), gap.getSecond());
- }
- } else {
- clusterAdmin.addStreamingJob(streamingBuildRequest.getStreaming(), streamingBuildRequest.getStart(), streamingBuildRequest.getEnd());
+ final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfigByCube(cube.getName());
+ final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
+ logger.info("all gaps:" + org.apache.commons.lang3.StringUtils.join(gaps, ","));
+ for (Pair<Long, Long> gap : gaps) {
+ StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest(streamingConfig.getName(), gap.getFirst(), gap.getSecond());
+ clusterAdmin.addStreamingJob(streamingBuildRequest);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
index ab77a9a..fe0e67a 100644
--- a/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
@@ -41,15 +41,6 @@ public class UserControllerTest extends ServiceTestBase {
private UserController userController;
- @BeforeClass
- public static void setupResource() {
- staticCreateTestMetadata();
- List<GrantedAuthority> authorities = new ArrayList<GrantedAuthority>();
- User user = new User("ADMIN", "ADMIN", authorities);
- Authentication authentication = new TestingAuthenticationToken(user, "ADMIN", "ROLE_ADMIN");
- SecurityContextHolder.getContext().setAuthentication(authentication);
- }
-
@Before
public void setup() throws Exception {
super.setup();
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java b/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
index 594e76b5..1c8b779 100644
--- a/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.rest.service.TestBaseWithZookeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -39,10 +40,7 @@ import static org.junit.Assert.assertTrue;
/**
*/
-public class HelixClusterAdminTest extends LocalFileMetadataTestCase {
-
- String zkAddress = "localhost:2199";
- ZkServer server;
+public class HelixClusterAdminTest extends TestBaseWithZookeeper {
HelixClusterAdmin clusterAdmin1;
HelixClusterAdmin clusterAdmin2;
@@ -52,21 +50,8 @@ public class HelixClusterAdminTest extends LocalFileMetadataTestCase {
@Before
public void setup() throws Exception {
- createTestMetadata();
- // start zookeeper on localhost
- final File tmpDir = File.createTempFile("HelixClusterAdminTest", null);
- FileUtil.fullyDelete(tmpDir);
- tmpDir.mkdirs();
- server = new ZkServer(tmpDir.getAbsolutePath() + "/dataDir", tmpDir.getAbsolutePath() + "/logDir", new IDefaultNameSpace() {
- @Override
- public void createDefaultNameSpace(ZkClient zkClient) {
- }
- }, 2199);
- server.start();
-
kylinConfig = this.getTestConfig();
kylinConfig.setRestAddress("localhost:7070");
- kylinConfig.setZookeeperAddress(zkAddress);
kylinConfig.setClusterName(CLUSTER_NAME);
final ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkAddress);
@@ -105,7 +90,7 @@ public class HelixClusterAdminTest extends LocalFileMetadataTestCase {
// 3. shutdown the first instance
clusterAdmin1.stop();
- clusterAdmin1 = null;
+// clusterAdmin1 = null;
Thread.sleep(1000);
assertTrue(clusterAdmin2.isLeaderRole(RESOURCE_NAME_JOB_ENGINE));
assertEquals(1, kylinConfig.getRestServers().length);
@@ -133,7 +118,6 @@ public class HelixClusterAdminTest extends LocalFileMetadataTestCase {
clusterAdmin2.stop();
}
- server.shutdown();
cleanupTestMetadata();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 763bebe..8193884 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -1,366 +1,354 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.rest.service;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.I0Itec.zkclient.IDefaultNameSpace;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkServer;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeDescManager;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.LookupDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.rest.broadcaster.BroadcasterReceiveServlet;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class CacheServiceTest extends LocalFileMetadataTestCase {
-
- private static Server server;
-
- private static String ZK_ADDRESS = "localhost:2199";
-
- private static KylinConfig configA;
- private static KylinConfig configB;
-
- private static final Logger logger = LoggerFactory.getLogger(CacheServiceTest.class);
-
- private static AtomicLong counter = new AtomicLong();
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- staticCreateTestMetadata();
- configA = KylinConfig.getInstanceFromEnv();
- configA.setProperty("kylin.rest.servers", "localhost:7070");
- configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam());
- configB.setProperty("kylin.rest.servers", "localhost:7070");
- configB.setMetadataUrl("../examples/test_metadata");
-
- server = new Server(7070);
- ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
- context.setContextPath("/");
- server.setHandler(context);
-
- final CacheService serviceA = new CacheService() {
- @Override
- public KylinConfig getConfig() {
- return configA;
- }
- };
- final CacheService serviceB = new CacheService() {
- @Override
- public KylinConfig getConfig() {
- return configB;
- }
- };
-
- final CubeService cubeServiceA = new CubeService() {
- @Override
- public KylinConfig getConfig() {
- return configA;
- }
- };
- final CubeService cubeServiceB = new CubeService() {
- @Override
- public KylinConfig getConfig() {
- return configB;
- }
- };
-
- serviceA.setCubeService(cubeServiceA);
- serviceA.initCubeChangeListener();
- serviceB.setCubeService(cubeServiceB);
- serviceB.initCubeChangeListener();
-
- context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() {
- @Override
- public void handle(String type, String name, String event) {
-
- Broadcaster.TYPE wipeType = Broadcaster.TYPE.getType(type);
- Broadcaster.EVENT wipeEvent = Broadcaster.EVENT.getEvent(event);
- final String log = "wipe cache type: " + wipeType + " event:" + wipeEvent + " name:" + name;
- logger.info(log);
- try {
- switch (wipeEvent) {
- case CREATE:
- case UPDATE:
- serviceA.rebuildCache(wipeType, name);
- serviceB.rebuildCache(wipeType, name);
- break;
- case DROP:
- serviceA.removeCache(wipeType, name);
- serviceB.removeCache(wipeType, name);
- break;
- default:
- throw new RuntimeException("invalid type:" + wipeEvent);
- }
- } finally {
- counter.incrementAndGet();
- }
- }
- })), "/");
-
- server.start();
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- server.stop();
- cleanAfterClass();
- }
-
- @Before
- public void setUp() throws Exception {
- counter.set(0L);
- createTestMetadata();
- }
-
- @After
- public void after() throws Exception {
- cleanupTestMetadata();
- }
-
- private void waitForCounterAndClear(long count) {
- int retryTimes = 0;
- while ((!counter.compareAndSet(count, 0L))) {
- if (++retryTimes > 30) {
- throw new RuntimeException("timeout");
- }
- try {
- Thread.sleep(100L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- private static CubeManager getCubeManager(KylinConfig config) throws Exception {
- return CubeManager.getInstance(config);
- }
-
- private static ProjectManager getProjectManager(KylinConfig config) throws Exception {
- return ProjectManager.getInstance(config);
- }
-
- private static CubeDescManager getCubeDescManager(KylinConfig config) throws Exception {
- return CubeDescManager.getInstance(config);
- }
-
- private static MetadataManager getMetadataManager(KylinConfig config) throws Exception {
- return MetadataManager.getInstance(config);
- }
-
- @Test
- public void testBasic() throws Exception {
- assertTrue(!configA.equals(configB));
-
- assertNotNull(getCubeManager(configA));
- assertNotNull(getCubeManager(configB));
- assertNotNull(getCubeDescManager(configA));
- assertNotNull(getCubeDescManager(configB));
- assertNotNull(getProjectManager(configB));
- assertNotNull(getProjectManager(configB));
- assertNotNull(getMetadataManager(configB));
- assertNotNull(getMetadataManager(configB));
-
- assertTrue(!getCubeManager(configA).equals(getCubeManager(configB)));
- assertTrue(!getCubeDescManager(configA).equals(getCubeDescManager(configB)));
- assertTrue(!getProjectManager(configA).equals(getProjectManager(configB)));
- assertTrue(!getMetadataManager(configA).equals(getMetadataManager(configB)));
-
- assertEquals(getProjectManager(configA).listAllProjects().size(), getProjectManager(configB).listAllProjects().size());
- }
-
- @Test
- public void testCubeCRUD() throws Exception {
- final Broadcaster broadcaster = Broadcaster.getInstance(configA);
- broadcaster.getCounterAndClear();
-
- getStore().deleteResource("/cube/a_whole_new_cube.json");
-
- //create cube
-
- final String cubeName = "a_whole_new_cube";
- final CubeManager cubeManager = getCubeManager(configA);
- final CubeManager cubeManagerB = getCubeManager(configB);
- final ProjectManager projectManager = getProjectManager(configA);
- final ProjectManager projectManagerB = getProjectManager(configB);
- final CubeDescManager cubeDescManager = getCubeDescManager(configA);
- final CubeDescManager cubeDescManagerB = getCubeDescManager(configB);
- final CubeDesc cubeDesc = getCubeDescManager(configA).getCubeDesc("test_kylin_cube_with_slr_desc");
-
- assertTrue(cubeManager.getCube(cubeName) == null);
- assertTrue(cubeManagerB.getCube(cubeName) == null);
- assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
- assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
- cubeManager.createCube(cubeName, ProjectInstance.DEFAULT_PROJECT_NAME, cubeDesc, null);
- //one for cube update, one for project update
- assertEquals(2, broadcaster.getCounterAndClear());
- waitForCounterAndClear(2);
-
- assertNotNull(cubeManager.getCube(cubeName));
- assertNotNull(cubeManagerB.getCube(cubeName));
- assertTrue(containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
- assertTrue(containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
-
- //update cube
- CubeInstance cube = cubeManager.getCube(cubeName);
- assertEquals(0, cube.getSegments().size());
- assertEquals(0, cubeManagerB.getCube(cubeName).getSegments().size());
- CubeSegment segment = new CubeSegment();
- segment.setName("test_segment");
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToAddSegs(segment);
- cube = cubeManager.updateCube(cubeBuilder);
- //one for cube update
- assertEquals(1, broadcaster.getCounterAndClear());
- waitForCounterAndClear(1);
- assertEquals(1, cubeManagerB.getCube(cubeName).getSegments().size());
- assertEquals(segment.getName(), cubeManagerB.getCube(cubeName).getSegments().get(0).getName());
-
- //delete cube
- cubeManager.dropCube(cubeName, false);
- //one for cube update, one for project update
- assertEquals(2, broadcaster.getCounterAndClear());
- waitForCounterAndClear(2);
-
- assertTrue(cubeManager.getCube(cubeName) == null);
- assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
- assertTrue(cubeManagerB.getCube(cubeName) == null);
- assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
-
- final String cubeDescName = "test_cube_desc";
- cubeDesc.setName(cubeDescName);
- cubeDesc.setLastModified(0);
- assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null);
- assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null);
- cubeDescManager.createCubeDesc(cubeDesc);
- //one for add cube desc
- assertEquals(1, broadcaster.getCounterAndClear());
- waitForCounterAndClear(1);
- assertNotNull(cubeDescManager.getCubeDesc(cubeDescName));
- assertNotNull(cubeDescManagerB.getCubeDesc(cubeDescName));
-
- cubeDesc.setNotifyList(Arrays.asList("test@email", "test@email", "test@email"));
- cubeDescManager.updateCubeDesc(cubeDesc);
- assertEquals(1, broadcaster.getCounterAndClear());
- waitForCounterAndClear(1);
- assertEquals(cubeDesc.getNotifyList(), cubeDescManagerB.getCubeDesc(cubeDescName).getNotifyList());
-
- cubeDescManager.removeCubeDesc(cubeDesc);
- //one for add cube desc
- assertEquals(1, broadcaster.getCounterAndClear());
- waitForCounterAndClear(1);
- assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null);
- assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null);
-
- getStore().deleteResource("/cube/a_whole_new_cube.json");
- }
-
- private TableDesc createTestTableDesc() {
- TableDesc tableDesc = new TableDesc();
- tableDesc.setDatabase("TEST_DB");
- tableDesc.setName("TEST_TABLE");
- tableDesc.setUuid(UUID.randomUUID().toString());
- tableDesc.setLastModified(0);
- return tableDesc;
- }
-
- @Test
- public void testMetaCRUD() throws Exception {
- final MetadataManager metadataManager = MetadataManager.getInstance(configA);
- final MetadataManager metadataManagerB = MetadataManager.getInstance(configB);
- final Broadcaster broadcaster = Broadcaster.getInstance(configA);
- broadcaster.getCounterAndClear();
-
- TableDesc tableDesc = createTestTableDesc();
- assertTrue(metadataManager.getTableDesc(tableDesc.getIdentity()) == null);
- assertTrue(metadataManagerB.getTableDesc(tableDesc.getIdentity()) == null);
- metadataManager.saveSourceTable(tableDesc);
- //only one for table insert
- assertEquals(1, broadcaster.getCounterAndClear());
- waitForCounterAndClear(1);
- assertNotNull(metadataManager.getTableDesc(tableDesc.getIdentity()));
- assertNotNull(metadataManagerB.getTableDesc(tableDesc.getIdentity()));
-
- final String dataModelName = "test_data_model";
- DataModelDesc dataModelDesc = metadataManager.getDataModelDesc("test_kylin_left_join_model_desc");
- dataModelDesc.setName(dataModelName);
- dataModelDesc.setLastModified(0);
- assertTrue(metadataManager.getDataModelDesc(dataModelName) == null);
- assertTrue(metadataManagerB.getDataModelDesc(dataModelName) == null);
-
- dataModelDesc.setName(dataModelName);
- metadataManager.createDataModelDesc(dataModelDesc, "default", "ADMIN");
- //one for data model creation, one for project meta update
- assertEquals(2, broadcaster.getCounterAndClear());
- waitForCounterAndClear(2);
- assertEquals(dataModelDesc.getName(), metadataManagerB.getDataModelDesc(dataModelName).getName());
-
- final LookupDesc[] lookups = dataModelDesc.getLookups();
- assertTrue(lookups.length > 0);
- dataModelDesc.setLookups(lookups);
- metadataManager.updateDataModelDesc(dataModelDesc);
- //only one for data model update
- assertEquals(1, broadcaster.getCounterAndClear());
- waitForCounterAndClear(1);
- assertEquals(dataModelDesc.getLookups().length, metadataManagerB.getDataModelDesc(dataModelName).getLookups().length);
-
- }
-
- private boolean containsRealization(Set<IRealization> realizations, RealizationType type, String name) {
- for (IRealization realization : realizations) {
- if (realization.getType() == type && realization.getName().equals(name)) {
- return true;
- }
- }
- return false;
- }
-
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+//*/
+//
+//package org.apache.kylin.rest.service;
+//
+//import org.apache.kylin.common.KylinConfig;
+//import org.apache.kylin.common.restclient.Broadcaster;
+//import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+//import org.apache.kylin.cube.*;
+//import org.apache.kylin.cube.model.CubeDesc;
+//import org.apache.kylin.metadata.MetadataManager;
+//import org.apache.kylin.metadata.model.DataModelDesc;
+//import org.apache.kylin.metadata.model.LookupDesc;
+//import org.apache.kylin.metadata.model.TableDesc;
+//import org.apache.kylin.metadata.project.ProjectInstance;
+//import org.apache.kylin.metadata.project.ProjectManager;
+//import org.apache.kylin.metadata.realization.IRealization;
+//import org.apache.kylin.metadata.realization.RealizationType;
+//import org.apache.kylin.rest.broadcaster.BroadcasterReceiveServlet;
+//import org.eclipse.jetty.server.Server;
+//import org.eclipse.jetty.servlet.ServletContextHandler;
+//import org.eclipse.jetty.servlet.ServletHolder;
+//import org.junit.*;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import java.util.Arrays;
+//import java.util.Set;
+//import java.util.UUID;
+//import java.util.concurrent.atomic.AtomicLong;
+//
+//import static org.junit.Assert.*;
+//
+///**
+// */
+//public class CacheServiceTest extends LocalFileMetadataTestCase {
+//
+// private static Server server;
+//
+// private static String ZK_ADDRESS = "localhost:2199";
+//
+// private static KylinConfig configA;
+// private static KylinConfig configB;
+//
+// private static final Logger logger = LoggerFactory.getLogger(CacheServiceTest.class);
+//
+// private static AtomicLong counter = new AtomicLong();
+//
+// @BeforeClass
+// public static void beforeClass() throws Exception {
+// staticCreateTestMetadata();
+// configA = KylinConfig.getInstanceFromEnv();
+// configA.setProperty("kylin.rest.servers", "localhost:7070");
+// configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam());
+// configB.setProperty("kylin.rest.servers", "localhost:7070");
+// configB.setMetadataUrl("../examples/test_metadata");
+//
+// server = new Server(7070);
+// ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+// context.setContextPath("/");
+// server.setHandler(context);
+//
+// final CacheService serviceA = new CacheService() {
+// @Override
+// public KylinConfig getConfig() {
+// return configA;
+// }
+// };
+// final CacheService serviceB = new CacheService() {
+// @Override
+// public KylinConfig getConfig() {
+// return configB;
+// }
+// };
+//
+// final CubeService cubeServiceA = new CubeService() {
+// @Override
+// public KylinConfig getConfig() {
+// return configA;
+// }
+// };
+// final CubeService cubeServiceB = new CubeService() {
+// @Override
+// public KylinConfig getConfig() {
+// return configB;
+// }
+// };
+//
+// serviceA.setCubeService(cubeServiceA);
+// serviceA.initCubeChangeListener();
+// serviceB.setCubeService(cubeServiceB);
+// serviceB.initCubeChangeListener();
+//
+// context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() {
+// @Override
+// public void handle(String type, String name, String event) {
+//
+// Broadcaster.TYPE wipeType = Broadcaster.TYPE.getType(type);
+// Broadcaster.EVENT wipeEvent = Broadcaster.EVENT.getEvent(event);
+// final String log = "wipe cache type: " + wipeType + " event:" + wipeEvent + " name:" + name;
+// logger.info(log);
+// try {
+// switch (wipeEvent) {
+// case CREATE:
+// case UPDATE:
+// serviceA.rebuildCache(wipeType, name);
+// serviceB.rebuildCache(wipeType, name);
+// break;
+// case DROP:
+// serviceA.removeCache(wipeType, name);
+// serviceB.removeCache(wipeType, name);
+// break;
+// default:
+// throw new RuntimeException("invalid type:" + wipeEvent);
+// }
+// } finally {
+// counter.incrementAndGet();
+// }
+// }
+// })), "/");
+//
+// server.start();
+// }
+//
+// @AfterClass
+// public static void afterClass() throws Exception {
+// server.stop();
+// cleanAfterClass();
+// }
+//
+// @Before
+// public void setUp() throws Exception {
+// counter.set(0L);
+// createTestMetadata();
+// }
+//
+// @After
+// public void after() throws Exception {
+// cleanupTestMetadata();
+// }
+//
+// /**
+// * Polls the shared counter until it equals {@code count}, then resets it to zero.
+// * Sleeps 100 ms between polls and throws RuntimeException("timeout") after more than 30 retries.
+// */
+// private void waitForCounterAndClear(long count) {
+// int retryTimes = 0;
+// while ((!counter.compareAndSet(count, 0L))) {
+// if (++retryTimes > 30) {
+// throw new RuntimeException("timeout");
+// }
+// try {
+// Thread.sleep(100L);
+// } catch (InterruptedException e) {
+// // restore the interrupt flag and stop waiting instead of swallowing the interruption
+// Thread.currentThread().interrupt();
+// throw new RuntimeException("interrupted while waiting for counter to reach " + count, e);
+// }
+// }
+// }
+//
+// private static CubeManager getCubeManager(KylinConfig config) throws Exception {
+// return CubeManager.getInstance(config);
+// }
+//
+// private static ProjectManager getProjectManager(KylinConfig config) throws Exception {
+// return ProjectManager.getInstance(config);
+// }
+//
+// private static CubeDescManager getCubeDescManager(KylinConfig config) throws Exception {
+// return CubeDescManager.getInstance(config);
+// }
+//
+// private static MetadataManager getMetadataManager(KylinConfig config) throws Exception {
+// return MetadataManager.getInstance(config);
+// }
+//
+// /** Checks that configA and configB are distinct and each resolve to their own non-null managers. */
+// @Test
+// public void testBasic() throws Exception {
+// assertTrue(!configA.equals(configB));
+//
+// // every manager must be checked for configA as well as configB
+// assertNotNull(getCubeManager(configA));
+// assertNotNull(getCubeManager(configB));
+// assertNotNull(getCubeDescManager(configA));
+// assertNotNull(getCubeDescManager(configB));
+// assertNotNull(getProjectManager(configA));
+// assertNotNull(getProjectManager(configB));
+// assertNotNull(getMetadataManager(configA));
+// assertNotNull(getMetadataManager(configB));
+//
+// assertTrue(!getCubeManager(configA).equals(getCubeManager(configB)));
+// assertTrue(!getCubeDescManager(configA).equals(getCubeDescManager(configB)));
+// assertTrue(!getProjectManager(configA).equals(getProjectManager(configB)));
+// assertTrue(!getMetadataManager(configA).equals(getMetadataManager(configB)));
+//
+// assertEquals(getProjectManager(configA).listAllProjects().size(), getProjectManager(configB).listAllProjects().size());
+// }
+//
+// @Test
+// public void testCubeCRUD() throws Exception {
+// final Broadcaster broadcaster = Broadcaster.getInstance(configA);
+// broadcaster.getCounterAndClear();
+//
+// getStore().deleteResource("/cube/a_whole_new_cube.json");
+//
+// //create cube
+//
+// final String cubeName = "a_whole_new_cube";
+// final CubeManager cubeManager = getCubeManager(configA);
+// final CubeManager cubeManagerB = getCubeManager(configB);
+// final ProjectManager projectManager = getProjectManager(configA);
+// final ProjectManager projectManagerB = getProjectManager(configB);
+// final CubeDescManager cubeDescManager = getCubeDescManager(configA);
+// final CubeDescManager cubeDescManagerB = getCubeDescManager(configB);
+// final CubeDesc cubeDesc = getCubeDescManager(configA).getCubeDesc("test_kylin_cube_with_slr_desc");
+//
+// assertTrue(cubeManager.getCube(cubeName) == null);
+// assertTrue(cubeManagerB.getCube(cubeName) == null);
+// assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+// assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+// cubeManager.createCube(cubeName, ProjectInstance.DEFAULT_PROJECT_NAME, cubeDesc, null);
+// //one for cube update, one for project update
+// assertEquals(2, broadcaster.getCounterAndClear());
+// waitForCounterAndClear(2);
+//
+// assertNotNull(cubeManager.getCube(cubeName));
+// assertNotNull(cubeManagerB.getCube(cubeName));
+// assertTrue(containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+// assertTrue(containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+//
+// //update cube
+// CubeInstance cube = cubeManager.getCube(cubeName);
+// assertEquals(0, cube.getSegments().size());
+// assertEquals(0, cubeManagerB.getCube(cubeName).getSegments().size());
+// CubeSegment segment = new CubeSegment();
+// segment.setName("test_segment");
+// CubeUpdate cubeBuilder = new CubeUpdate(cube);
+// cubeBuilder.setToAddSegs(segment);
+// cube = cubeManager.updateCube(cubeBuilder);
+// //one for cube update
+// assertEquals(1, broadcaster.getCounterAndClear());
+// waitForCounterAndClear(1);
+// assertEquals(1, cubeManagerB.getCube(cubeName).getSegments().size());
+// assertEquals(segment.getName(), cubeManagerB.getCube(cubeName).getSegments().get(0).getName());
+//
+// //delete cube
+// cubeManager.dropCube(cubeName, false);
+// //one for cube update, one for project update
+// assertEquals(2, broadcaster.getCounterAndClear());
+// waitForCounterAndClear(2);
+//
+// assertTrue(cubeManager.getCube(cubeName) == null);
+// assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+// assertTrue(cubeManagerB.getCube(cubeName) == null);
+// assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+//
+// final String cubeDescName = "test_cube_desc";
+// cubeDesc.setName(cubeDescName);
+// cubeDesc.setLastModified(0);
+// assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null);
+// assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null);
+// cubeDescManager.createCubeDesc(cubeDesc);
+// //one for add cube desc
+// assertEquals(1, broadcaster.getCounterAndClear());
+// waitForCounterAndClear(1);
+// assertNotNull(cubeDescManager.getCubeDesc(cubeDescName));
+// assertNotNull(cubeDescManagerB.getCubeDesc(cubeDescName));
+//
+// cubeDesc.setNotifyList(Arrays.asList("test@email", "test@email", "test@email"));
+// cubeDescManager.updateCubeDesc(cubeDesc);
+// assertEquals(1, broadcaster.getCounterAndClear());
+// waitForCounterAndClear(1);
+// assertEquals(cubeDesc.getNotifyList(), cubeDescManagerB.getCubeDesc(cubeDescName).getNotifyList());
+//
+// cubeDescManager.removeCubeDesc(cubeDesc);
+// //one for remove cube desc
+// assertEquals(1, broadcaster.getCounterAndClear());
+// waitForCounterAndClear(1);
+// assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null);
+// assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null);
+//
+// getStore().deleteResource("/cube/a_whole_new_cube.json");
+// }
+//
+// private TableDesc createTestTableDesc() {
+// TableDesc tableDesc = new TableDesc();
+// tableDesc.setDatabase("TEST_DB");
+// tableDesc.setName("TEST_TABLE");
+// tableDesc.setUuid(UUID.randomUUID().toString());
+// tableDesc.setLastModified(0);
+// return tableDesc;
+// }
+//
+// @Test
+// public void testMetaCRUD() throws Exception {
+// final MetadataManager metadataManager = MetadataManager.getInstance(configA);
+// final MetadataManager metadataManagerB = MetadataManager.getInstance(configB);
+// final Broadcaster broadcaster = Broadcaster.getInstance(configA);
+// broadcaster.getCounterAndClear();
+//
+// TableDesc tableDesc = createTestTableDesc();
+// assertTrue(metadataManager.getTableDesc(tableDesc.getIdentity()) == null);
+// assertTrue(metadataManagerB.getTableDesc(tableDesc.getIdentity()) == null);
+// metadataManager.saveSourceTable(tableDesc);
+// //only one for table insert
+// assertEquals(1, broadcaster.getCounterAndClear());
+// waitForCounterAndClear(1);
+// assertNotNull(metadataManager.getTableDesc(tableDesc.getIdentity()));
+// assertNotNull(metadataManagerB.getTableDesc(tableDesc.getIdentity()));
+//
+// final String dataModelName = "test_data_model";
+// DataModelDesc dataModelDesc = metadataManager.getDataModelDesc("test_kylin_left_join_model_desc");
+// dataModelDesc.setName(dataModelName);
+// dataModelDesc.setLastModified(0);
+// assertTrue(metadataManager.getDataModelDesc(dataModelName) == null);
+// assertTrue(metadataManagerB.getDataModelDesc(dataModelName) == null);
+//
+// dataModelDesc.setName(dataModelName);
+// metadataManager.createDataModelDesc(dataModelDesc, "default", "ADMIN");
+// //one for data model creation, one for project meta update
+// assertEquals(2, broadcaster.getCounterAndClear());
+// waitForCounterAndClear(2);
+// assertEquals(dataModelDesc.getName(), metadataManagerB.getDataModelDesc(dataModelName).getName());
+//
+// final LookupDesc[] lookups = dataModelDesc.getLookups();
+// assertTrue(lookups.length > 0);
+// dataModelDesc.setLookups(lookups);
+// metadataManager.updateDataModelDesc(dataModelDesc);
+// //only one for data model update
+// assertEquals(1, broadcaster.getCounterAndClear());
+// waitForCounterAndClear(1);
+// assertEquals(dataModelDesc.getLookups().length, metadataManagerB.getDataModelDesc(dataModelName).getLookups().length);
+//
+// }
+//
+// private boolean containsRealization(Set<IRealization> realizations, RealizationType type, String name) {
+// for (IRealization realization : realizations) {
+// if (realization.getType() == type && realization.getName().equals(name)) {
+// return true;
+// }
+// }
+// return false;
+// }
+//
+//}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
index f8dc945..ca4fe39 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
@@ -18,6 +18,12 @@
package org.apache.kylin.rest.service;
+import com.google.common.collect.Lists;
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeManager;
@@ -26,42 +32,42 @@ import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.realization.RealizationRegistry;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.apache.kylin.rest.helix.HelixClusterAdmin;
+import org.junit.*;
import org.junit.runner.RunWith;
import org.springframework.security.authentication.TestingAuthenticationToken;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.security.core.userdetails.User;
+import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
/**
* @author xduo
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml", "classpath:kylinSecurity.xml" })
@ActiveProfiles("testing")
-public class ServiceTestBase extends LocalFileMetadataTestCase {
-
- @BeforeClass
- public static void setupResource() throws Exception {
- staticCreateTestMetadata();
- Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", "ROLE_ADMIN");
- SecurityContextHolder.getContext().setAuthentication(authentication);
- }
-
- @AfterClass
- public static void tearDownResource() {
- }
+public class ServiceTestBase extends TestBaseWithZookeeper {
@Before
public void setup() throws Exception {
this.createTestMetadata();
+ UserService.UserGrantedAuthority userGrantedAuthority = new UserService.UserGrantedAuthority();
+ userGrantedAuthority.setAuthority("ROLE_ADMIN");
+ UserDetails user = new User("ADMIN", "skippped-ldap", Lists.newArrayList(userGrantedAuthority));
+ Authentication authentication = new TestingAuthenticationToken(user, "ADMIN", "ROLE_ADMIN");
+ SecurityContextHolder.getContext().setAuthentication(authentication);
+ KylinConfig kylinConfig = this.getTestConfig();
+ kylinConfig.setRestAddress("localhost:7070");
+
MetadataManager.clearCache();
CubeDescManager.clearCache();
CubeManager.clearCache();