You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/02/14 03:53:25 UTC
[09/15] kylin git commit: KYLIN-1311 fix small bug
KYLIN-1311 fix small bug
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bbfe8ae8
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bbfe8ae8
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bbfe8ae8
Branch: refs/heads/helix-201602
Commit: bbfe8ae84769ecae7de314fb6a8d3c700b654ee9
Parents: 1aaa267
Author: shaofengshi <sh...@apache.org>
Authored: Fri Jan 15 17:57:26 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Feb 6 13:33:06 2016 +0800
----------------------------------------------------------------------
.../engine/streaming/StreamingManager.java | 11 +++++-----
.../rest/controller/StreamingController.java | 13 ++++++------
.../helix/LeaderStandbyStateModelFactory.java | 21 +++++++++++++++++---
3 files changed, 30 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/bbfe8ae8/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index af04a11..798fc3f 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -137,6 +137,12 @@ public class StreamingManager {
return streamingMap.get(name);
}
+ public StreamingConfig getStreamingConfigByCube(String cubeName) {
+ String streamingConfig = cubeName + "_streaming";
+ return getStreamingConfig(streamingConfig);
+ }
+
+
public List<StreamingConfig> listAllStreaming() {
return new ArrayList<>(streamingMap.values());
}
@@ -168,11 +174,6 @@ public class StreamingManager {
streamingMap.remove(streamingConfig.getName());
}
- public StreamingConfig getConfig(String name) {
- name = name.toUpperCase();
- return streamingMap.get(name);
- }
-
public void removeStreamingLocal(String streamingName) {
streamingMap.removeLocal(streamingName);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/bbfe8ae8/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index 57831d5..fb806d1 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -236,13 +236,11 @@ public class StreamingController extends BasicController {
* @return
* @throws IOException
*/
- @RequestMapping(value = "/{streamingName}/build", method = {RequestMethod.PUT})
+ @RequestMapping(value = "/{cubeName}/build", method = {RequestMethod.PUT})
@ResponseBody
- public StreamingBuildRequest buildStream(@PathVariable String streamingName, @RequestBody StreamingBuildRequest streamingBuildRequest) {
- streamingBuildRequest.setStreaming(streamingName);
- StreamingConfig streamingConfig = streamingService.getStreamingManager().getConfig(streamingName);
- Preconditions.checkNotNull(streamingConfig, "Stream config '" + streamingName + "' is not found.");
- String cubeName = streamingConfig.getCubeName();
+ public StreamingBuildRequest buildStream(@PathVariable String cubeName, @RequestBody StreamingBuildRequest streamingBuildRequest) {
+ StreamingConfig streamingConfig = streamingService.getStreamingManager().getStreamingConfigByCube(cubeName);
+ Preconditions.checkNotNull(streamingConfig, "Stream config for '" + cubeName + "' is not found.");
List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null);
Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found.");
CubeInstance cube = cubes.get(0);
@@ -257,7 +255,8 @@ public class StreamingController extends BasicController {
}
}
- streamingService.buildStream(streamingName, streamingBuildRequest);
+ streamingBuildRequest.setStreaming(streamingConfig.getName());
+ streamingService.buildStream(cubeName, streamingBuildRequest);
streamingBuildRequest.setMessage("Build request is submitted successfully.");
streamingBuildRequest.setSuccessful(true);
return streamingBuildRequest;
http://git-wip-us.apache.org/repos/asf/kylin/blob/bbfe8ae8/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
index df23ea0..8614e8c 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
@@ -10,6 +10,10 @@ import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.lock.MockJobLock;
@@ -48,7 +52,7 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()");
try {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
DefaultScheduler scheduler = DefaultScheduler.createInstance();
scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
while (!scheduler.hasStarted()) {
@@ -89,11 +93,22 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
String resourceName = message.getResourceId().stringify();
Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX));
- long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_")) + 1);
+ long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_") + 1));
String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_"));
- long start = Long.parseLong(temp.substring(temp.lastIndexOf("_")) + 1);
+ long start = Long.parseLong(temp.substring(temp.lastIndexOf("_") + 1));
String streamingConfig = temp.substring(0, temp.lastIndexOf("_"));
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
+ final String cubeName = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingConfig).getCubeName();
+ final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ for (CubeSegment segment : cube.getSegments()) {
+ if (segment.getDateRangeStart() <= start && segment.getDateRangeEnd() >= end) {
+ logger.info("Segment " + segment.getName() + " already exist, no need rebuild.");
+ return;
+ }
+ }
+
KylinConfigBase.getKylinHome();
String segmentId = start + "_" + end;
String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingConfig + " " + segmentId + " -oneoff true -start " + start + " -end " + end + " -streaming " + streamingConfig;