You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/04 03:03:11 UTC
[30/43] kylin git commit: KYLIN-1311 Stream cubing auto assignment and load balance
KYLIN-1311 Stream cubing auto assignment and load balance
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ab60480f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ab60480f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ab60480f
Branch: refs/heads/helix-rebase
Commit: ab60480f244a106a4e3179590956a04246e9f4db
Parents: 55558551
Author: shaofengshi <sh...@apache.org>
Authored: Thu Jan 14 14:59:54 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Mar 4 09:52:19 2016 +0800
----------------------------------------------------------------------
.../kylin/engine/streaming/BootstrapConfig.java | 8 --
.../engine/streaming/cli/StreamingCLI.java | 3 -
.../kylin/rest/controller/CubeController.java | 5 ++
.../rest/controller/StreamingController.java | 50 +++++++++++++
.../kylin/rest/helix/HelixClusterAdmin.java | 13 +++-
.../helix/LeaderStandbyStateModelFactory.java | 43 +++++++----
.../rest/request/StreamingBuildRequest.java | 77 ++++++++++++++++++++
.../kylin/rest/request/StreamingRequest.java | 4 +-
.../kylin/rest/service/StreamingService.java | 27 +++++++
9 files changed, 201 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
index a3e2db5..2b83b84 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
@@ -36,14 +36,6 @@ public class BootstrapConfig {
this.streaming = streaming;
}
- public int getPartitionId() {
- return partitionId;
- }
-
- public void setPartitionId(int partitionId) {
- this.partitionId = partitionId;
- }
-
public boolean isFillGap() {
return fillGap;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index a73a6ac..96ad1ad 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -72,9 +72,6 @@ public class StreamingCLI {
case "-streaming":
bootstrapConfig.setStreaming(args[++i]);
break;
- case "-partition":
- bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
- break;
case "-fillGap":
bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
break;
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 9afa750..4ab640f 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -27,14 +27,19 @@ import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.model.CubeBuildTypeEnum;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.streaming.BootstrapConfig;
import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.exception.JobException;
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index e22bd30..57831d5 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -21,14 +21,23 @@ package org.apache.kylin.rest.controller;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
+import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.engine.streaming.BootstrapConfig;
import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.exception.ForbiddenException;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.exception.NotFoundException;
+import org.apache.kylin.rest.request.StreamingBuildRequest;
import org.apache.kylin.rest.request.StreamingRequest;
+import org.apache.kylin.rest.service.CubeService;
import org.apache.kylin.rest.service.KafkaConfigService;
import org.apache.kylin.rest.service.StreamingService;
import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -36,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.AccessDeniedException;
+import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
@@ -58,6 +68,9 @@ public class StreamingController extends BasicController {
@Autowired
private KafkaConfigService kafkaConfigService;
+ @Autowired
+ private CubeService cubeService;
+
@RequestMapping(value = "/getConfig", method = { RequestMethod.GET })
@ResponseBody
public List<StreamingConfig> getStreamings(@RequestParam(value = "cubeName", required = false) String cubeName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
@@ -214,6 +227,43 @@ public class StreamingController extends BasicController {
request.setMessage(message);
}
+
+
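+ /**
+ * Submit a build request for the given streaming config.
+ *
+ * @param streamingName name of the streaming config to build
+ * @param streamingBuildRequest build parameters (start, end, fillGap)
+ * @return the same request, with its message and successful flag set
+ */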
+ @RequestMapping(value = "/{streamingName}/build", method = {RequestMethod.PUT})
+ @ResponseBody
+ public StreamingBuildRequest buildStream(@PathVariable String streamingName, @RequestBody StreamingBuildRequest streamingBuildRequest) {
+ streamingBuildRequest.setStreaming(streamingName);
+ StreamingConfig streamingConfig = streamingService.getStreamingManager().getConfig(streamingName);
+ Preconditions.checkNotNull(streamingConfig, "Stream config '" + streamingName + "' is not found.");
+ String cubeName = streamingConfig.getCubeName();
+ List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null);
+ Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found.");
+ CubeInstance cube = cubes.get(0);
+ if (streamingBuildRequest.isFillGap() == false) {
+ Preconditions.checkArgument(streamingBuildRequest.getEnd() > streamingBuildRequest.getStart(), "End time should be greater than start time.");
+ for (CubeSegment segment : cube.getSegments()) {
+ if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) {
+ streamingBuildRequest.setMessage("The segment already exists: " + segment.toString());
+ streamingBuildRequest.setSuccessful(false);
+ return streamingBuildRequest;
+ }
+ }
+ }
+
+ streamingService.buildStream(streamingName, streamingBuildRequest);
+ streamingBuildRequest.setMessage("Build request is submitted successfully.");
+ streamingBuildRequest.setSuccessful(true);
+ return streamingBuildRequest;
+
+ }
+
public void setStreamingService(StreamingService streamingService) {
this.streamingService= streamingService;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
index 6300383..f62204d 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
@@ -45,7 +45,7 @@ import java.util.concurrent.ConcurrentMap;
public class HelixClusterAdmin {
public static final String RESOURCE_NAME_JOB_ENGINE = "Resource_JobEngine";
- public static final String RESOURCE_STREAME_CUBE_PREFIX = "Resource_Streame_";
+ public static final String RESOURCE_STREAME_CUBE_PREFIX = "Resource_Stream_";
public static final String MODEL_LEADER_STANDBY = "LeaderStandby";
public static final String MODEL_ONLINE_OFFLINE = "OnlineOffline";
@@ -115,15 +115,22 @@ public class HelixClusterAdmin {
}
- public void addStreamCubeSlice(String cubeName, long start, long end) {
- String resourceName = RESOURCE_STREAME_CUBE_PREFIX + cubeName + "_" + start + "_" + end;
+ public void addStreamingJob(String streamingName, long start, long end) {
+ String resourceName = RESOURCE_STREAME_CUBE_PREFIX + streamingName + "_" + start + "_" + end;
if (!admin.getResourcesInCluster(clusterName).contains(resourceName)) {
admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
+ } else {
+ logger.warn("Resource '" + resourceName + "' already exists in cluster, skip adding.");
}
admin.rebalance(clusterName, resourceName, 2, "", TAG_STREAM_BUILDER);
}
+
+ public void dropStreamingJob(String streamingName, long start, long end) {
+ String resourceName = RESOURCE_STREAME_CUBE_PREFIX + streamingName + "_" + start + "_" + end;
+ admin.dropResource(clusterName, resourceName);
+ }
/**
* Start the instance and register the state model factory
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
index c2a78e7..df23ea0 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
@@ -9,21 +9,24 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
-import org.apache.kylin.engine.streaming.cli.StreamingCLI;
+import org.apache.kylin.common.KylinConfigBase;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.lock.MockJobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
import static org.apache.kylin.rest.helix.HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX;
/**
*/
public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> {
private static final Logger logger = LoggerFactory.getLogger(LeaderStandbyStateModelFactory.class);
-
+
@Override
public TransitionHandler createStateTransitionHandler(PartitionId partitionId) {
if (partitionId.getResourceId().equals(ResourceId.from(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE))) {
@@ -38,7 +41,7 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
}
public static class JobEngineStateModel extends TransitionHandler {
-
+
public static JobEngineStateModel INSTANCE = new JobEngineStateModel();
@Transition(to = "LEADER", from = "STANDBY")
@@ -62,7 +65,7 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
logger.info("JobEngineStateModel.onBecomeStandbyFromLeader()");
DefaultScheduler.destroyInstance();
-
+
}
@Transition(to = "STANDBY", from = "OFFLINE")
@@ -71,7 +74,6 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
}
-
@Transition(to = "OFFLINE", from = "STANDBY")
public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
logger.info("JobEngineStateModel.onBecomeOfflineFromStandby()");
@@ -80,7 +82,7 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
}
public static class StreamCubeStateModel extends TransitionHandler {
-
+
public static StreamCubeStateModel INSTANCE = new StreamCubeStateModel();
@Transition(to = "LEADER", from = "STANDBY")
@@ -90,27 +92,40 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_")) + 1);
String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_"));
long start = Long.parseLong(temp.substring(temp.lastIndexOf("_")) + 1);
- String cubeName = temp.substring(0, temp.lastIndexOf("_"));
+ String streamingConfig = temp.substring(0, temp.lastIndexOf("_"));
+
+ KylinConfigBase.getKylinHome();
+ String segmentId = start + "_" + end;
+ String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingConfig + " " + segmentId + " -oneoff true -start " + start + " -end " + end + " -streaming " + streamingConfig;
+ logger.info("Executing: " + cmd);
+ try {
+ String line;
+ Process p = Runtime.getRuntime().exec(cmd);
+ BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
+ while ((line = input.readLine()) != null) {
+ logger.info(line);
+ }
+ input.close();
+ } catch (IOException err) {
+ logger.error("Error happens during build streaming '" + resourceName + "'", err);
+ throw new RuntimeException(err);
+ }
- final Runnable runnable = new OneOffStreamingBuilder(cubeName, start, end).build();
- runnable.run();
}
@Transition(to = "STANDBY", from = "LEADER")
public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
-
}
@Transition(to = "STANDBY", from = "OFFLINE")
public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
-
- }
+ }
@Transition(to = "OFFLINE", from = "STANDBY")
public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
-
+
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
new file mode 100644
index 0000000..e06a06c
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.request;
+
+public class StreamingBuildRequest {
+
+ private String streaming;
+ private long start;
+ private long end;
+ private boolean fillGap;
+ private String message;
+ private boolean successful;
+
+ public String getStreaming() {
+ return streaming;
+ }
+
+ public void setStreaming(String streaming) {
+ this.streaming = streaming;
+ }
+
+ public boolean isSuccessful() {
+ return successful;
+ }
+
+ public void setSuccessful(boolean successful) {
+ this.successful = successful;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public long getStart() {
+ return start;
+ }
+
+ public void setStart(long start) {
+ this.start = start;
+ }
+
+ public long getEnd() {
+ return end;
+ }
+
+ public void setEnd(long end) {
+ this.end = end;
+ }
+
+ public boolean isFillGap() {
+ return fillGap;
+ }
+
+ public void setFillGap(boolean fillGap) {
+ this.fillGap = fillGap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
index 07c30f3..b737c3e 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
@@ -19,7 +19,9 @@
package org.apache.kylin.rest.request;
-import java.lang.String;public class StreamingRequest {
+import java.lang.String;
+
+public class StreamingRequest {
private String project;
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab60480f/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index e40426b..da20949 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -18,12 +18,22 @@
package org.apache.kylin.rest.service;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.engine.streaming.BootstrapConfig;
import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.helix.HelixClusterAdmin;
+import org.apache.kylin.rest.request.StreamingBuildRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PostFilter;
+import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Component;
import java.io.IOException;
@@ -33,6 +43,7 @@ import java.util.List;
@Component("streamingMgmtService")
public class StreamingService extends BasicService {
+ private static final Logger logger = LoggerFactory.getLogger(StreamingService.class);
@Autowired
private AccessService accessService;
@@ -87,4 +98,20 @@ public class StreamingService extends BasicService {
getStreamingManager().removeStreamingConfig(config);
}
+
+ @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
+ public void buildStream(String cube, StreamingBuildRequest streamingBuildRequest) {
+ HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv());
+ if (streamingBuildRequest.isFillGap()) {
+ final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingBuildRequest.getStreaming());
+ final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
+ logger.info("all gaps:" + org.apache.commons.lang3.StringUtils.join(gaps, ","));
+ for (Pair<Long, Long> gap : gaps) {
+ clusterAdmin.addStreamingJob(streamingBuildRequest.getStreaming(), gap.getFirst(), gap.getSecond());
+ }
+ } else {
+ clusterAdmin.addStreamingJob(streamingBuildRequest.getStreaming(), streamingBuildRequest.getStart(), streamingBuildRequest.getEnd());
+ }
+ }
+
}