You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/02/14 03:53:30 UTC
[14/15] kylin git commit: KYLIN-1311 Stream cubing auto assignment and load balance
KYLIN-1311 Stream cubing auto assignment and load balance
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/688b762d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/688b762d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/688b762d
Branch: refs/heads/helix-201602
Commit: 688b762dc5ee756261bc42576935029fb9180f11
Parents: c615dcf
Author: shaofengshi <sh...@apache.org>
Authored: Sat Feb 6 11:49:59 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Feb 6 13:33:07 2016 +0800
----------------------------------------------------------------------
build/bin/streaming_check.sh | 13 ++-
build/bin/streaming_fillgap.sh | 1 -
build/conf/kylin.properties | 6 +-
.../apache/kylin/common/KylinConfigBase.java | 4 +
.../kylin/engine/streaming/StreamingConfig.java | 33 ++++++
.../engine/streaming/cli/StreamingCLI.java | 2 +-
.../streaming/monitor/StreamingMonitor.java | 11 +-
.../rest/controller/ClusterController.java | 55 +++++++---
.../rest/controller/StreamingController.java | 52 ++++++++-
.../kylin/rest/helix/HelixClusterAdmin.java | 69 +++++++++---
.../helix/StreamCubeBuildTransitionHandler.java | 105 ++++++++++++++-----
.../rest/request/StreamingBuildRequest.java | 13 +--
.../kylin/rest/service/StreamingService.java | 27 +++--
13 files changed, 299 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/build/bin/streaming_check.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_check.sh b/build/bin/streaming_check.sh
index fef0139..4c5431a 100644
--- a/build/bin/streaming_check.sh
+++ b/build/bin/streaming_check.sh
@@ -20,10 +20,9 @@
source /etc/profile
source ~/.bash_profile
-receivers=$1
-host=$2
-tablename=$3
-authorization=$4
-projectname=$5
-cubename=$6
-sh ${KYLIN_HOME}/bin/kylin.sh monitor -receivers ${receivers} -host ${host} -tableName ${tablename} -authorization ${authorization} -cubeName ${cubename} -projectName ${projectname}
\ No newline at end of file
+CUBE_NAME=$1
+AUTHORIZATION=$2
+KYLIN_HOST=$3
+
+cd ${KYLIN_HOME}
+curl --request PUT --header "Authorization: Basic $AUTHORIZATION" --header "Content-Type: application/json" -v ${KYLIN_HOST}/kylin/api/streaming/${CUBE_NAME}/checkgap
http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/build/bin/streaming_fillgap.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_fillgap.sh b/build/bin/streaming_fillgap.sh
index 31c4886..fe8c0b5 100644
--- a/build/bin/streaming_fillgap.sh
+++ b/build/bin/streaming_fillgap.sh
@@ -25,5 +25,4 @@ AUTHORIZATION=$2
KYLIN_HOST=$3
cd ${KYLIN_HOME}
-#sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${streaming} fillgap -streaming ${streaming} -fillGap true -margin ${margin}
curl --request PUT --header "Authorization: Basic $AUTHORIZATION" --header "Content-Type: application/json" -v ${KYLIN_HOST}/kylin/api/streaming/${CUBE_NAME}/fillgap
http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index b7e9b28..75269de 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -2,12 +2,12 @@
# Whether this kylin run as an instance of a cluster
kylin.cluster.enabled=false
-# Comma separated list of zk servers;
-# Optional; if absent, will use HBase zookeeper; set if use a different zk;
+# Comma-separated list of ZooKeeper servers, used for cluster coordination;
+# Optional; if absent, the HBase ZooKeeper is used; set it if you use a different ZooKeeper;
kylin.zookeeper.address=
# REST address of this instance, need be accessible from other instances;
-# optional, default be <hostname>:7070
+# optional, defaults to <hostname_fqdn>:<port>
kylin.rest.address=
# whether run a cluster controller in this instance; a robust cluster need at least 3 controllers.
http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7c127f7..87e4566 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -541,6 +541,10 @@ public class KylinConfigBase implements Serializable {
public String getClusterName() {
return this.getOptional("kylin.cluster.name", getMetadataUrlPrefix());
}
+
+ public int getClusterMaxPartitionPerRegion() {
+ return Integer.parseInt(getOptional("kylin.cluster.max.partition.per.resource", "100"));
+ }
public void setClusterName(String clusterName) {
setProperty("kylin.cluster.name", clusterName);
http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
index f0a7ab1..ee9aed8 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
@@ -39,6 +39,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.List;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
@@ -64,6 +65,14 @@ public class StreamingConfig extends RootPersistentEntity {
@JsonProperty("cubeName")
private String cubeName;
+ @JsonProperty("partitions")
+ private List<String> partitions;
+
+ @JsonProperty("max_gap")
+ private long maxGap = 30 * 60 * 1000l; // 30 minutes
+ @JsonProperty("max_gap_number")
+ private int maxGapNumber = 10; // 10
+
public String getCubeName() {
return cubeName;
}
@@ -96,6 +105,30 @@ public class StreamingConfig extends RootPersistentEntity {
return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + streamingName + ".json";
}
+ public List<String> getPartitions() {
+ return partitions;
+ }
+
+ public void setPartitions(List<String> partitions) {
+ this.partitions = partitions;
+ }
+
+ public long getMaxGap() {
+ return maxGap;
+ }
+
+ public void setMaxGap(long maxGap) {
+ this.maxGap = maxGap;
+ }
+
+ public int getMaxGapNumber() {
+ return maxGapNumber;
+ }
+
+ public void setMaxGapNumber(int maxGapNumber) {
+ this.maxGapNumber = maxGapNumber;
+ }
+
@Override
public StreamingConfig clone() {
try {
http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index 96ad1ad..88f5e18 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -82,7 +82,7 @@ public class StreamingCLI {
}
if (bootstrapConfig.isFillGap()) {
final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(bootstrapConfig.getStreaming());
- final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
+ final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName(), streamingConfig.getMaxGap());
logger.info("all gaps:" + StringUtils.join(gaps, ","));
for (Pair<Long, Long> gap : gaps) {
startOneOffCubeStreaming(bootstrapConfig.getStreaming(), gap.getFirst(), gap.getSecond());
http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
index 9609442..9d2bd45 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
@@ -74,7 +74,7 @@ public class StreamingMonitor {
sendMail(receivers, title, stringBuilder.toString());
}
- public static final List<Pair<Long, Long>> findGaps(String cubeName) {
+ public static final List<Pair<Long, Long>> findGaps(String cubeName, long maxGapAtOnce) {
List<CubeSegment> segments = getSortedReadySegments(cubeName);
List<Pair<Long, Long>> gaps = Lists.newArrayList();
for (int i = 0; i < segments.size() - 1; ++i) {
@@ -83,7 +83,12 @@ public class StreamingMonitor {
if (first.getDateRangeEnd() == second.getDateRangeStart()) {
continue;
} else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
- gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
+ long start = first.getDateRangeEnd();
+ while (start < second.getDateRangeStart()) {
+ long end = Math.min(start + maxGapAtOnce, second.getDateRangeStart());
+ gaps.add(Pair.newPair(start, end));
+ start = end;
+ }
}
}
return gaps;
@@ -119,7 +124,7 @@ public class StreamingMonitor {
logger.info("cube:" + cubeName + " does not exist");
return;
}
- List<Pair<Long, Long>> gaps = findGaps(cubeName);
+ List<Pair<Long, Long>> gaps = findGaps(cubeName, Long.MAX_VALUE);
List<Pair<String, String>> overlaps = Lists.newArrayList();
StringBuilder content = new StringBuilder();
if (!gaps.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java b/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
index 97fff36..86a0398 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
@@ -19,23 +19,26 @@
package org.apache.kylin.rest.controller;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.constant.JobStatusEnum;
-import org.apache.kylin.job.constant.JobTimeFilterEnum;
-import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.helix.HelixClusterAdmin;
-import org.apache.kylin.rest.request.JobListRequest;
-import org.apache.kylin.rest.service.JobService;
+import org.apache.kylin.rest.request.StreamingBuildRequest;
+import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Controller;
-import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
-import java.util.*;
+import java.io.IOException;
+import java.util.Collection;
/**
*
@@ -56,15 +59,37 @@ public class ClusterController extends BasicController implements InitializingBe
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig);
- clusterAdmin.start();
+ if (kylinConfig.isClusterEnabled()) {
+ final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig);
+ clusterAdmin.start();
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- clusterAdmin.stop();
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ clusterAdmin.stop();
+ }
+ }));
+ } else {
+ String serverMode = kylinConfig.getServerMode();
+ if (Constant.SERVER_MODE_JOB.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase())) {
+ logger.info("Initializing Job Engine ....");
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ DefaultScheduler scheduler = DefaultScheduler.createInstance();
+ scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
+ if (!scheduler.hasStarted()) {
+ logger.error("scheduler has not been started");
+ System.exit(1);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }).start();
}
- }));
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index 209c552..e33a1c9 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -24,9 +24,11 @@ import com.fasterxml.jackson.databind.JsonMappingException;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.exception.ForbiddenException;
import org.apache.kylin.rest.exception.InternalErrorException;
@@ -249,7 +251,12 @@ public class StreamingController extends BasicController {
}
streamingBuildRequest.setStreaming(streamingConfig.getName());
- streamingService.buildStream(cube, streamingBuildRequest);
+ try {
+ streamingService.buildStream(cube, streamingBuildRequest);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return streamingBuildRequest;
+ }
streamingBuildRequest.setMessage("Build request is submitted successfully.");
streamingBuildRequest.setSuccessful(true);
return streamingBuildRequest;
@@ -274,13 +281,52 @@ public class StreamingController extends BasicController {
StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest();
streamingBuildRequest.setStreaming(streamingConfig.getName());
- streamingService.fillGap(cube);
- streamingBuildRequest.setMessage("FillGap request is submitted successfully.");
+ List<Pair<Long, Long>> gaps = null;
+ try {
+ gaps = streamingService.fillGap(cube);
+ } catch (IOException e) {
+ logger.error("", e);
+ return streamingBuildRequest;
+ }
+ streamingBuildRequest.setMessage("FillGap request is submitted successfully, gap number: " + gaps.size());
+ streamingBuildRequest.setSuccessful(true);
+ return streamingBuildRequest;
+
+ }
+
+ /**
+ * Check whether any gaps exist in a cube.
+ *
+ * @param cubeName Cube Name
+ * @return a request object whose message lists the gaps found, or "No gap." if there are none
+ * @throws IOException
+ */
+ @RequestMapping(value = "/{cubeName}/checkgap", method = { RequestMethod.PUT })
+ @ResponseBody
+ public StreamingBuildRequest checkGap(@PathVariable String cubeName) {
+ StreamingConfig streamingConfig = streamingService.getStreamingManager().getStreamingConfigByCube(cubeName);
+ Preconditions.checkNotNull(streamingConfig, "Stream config for '" + cubeName + "' is not found.");
+ List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null);
+ Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found.");
+ CubeInstance cube = cubes.get(0);
+
+ List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName(), streamingConfig.getMaxGap());
+ logger.info("all gaps:" + StringUtils.join(gaps, ","));
+
+ StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest();
+ streamingBuildRequest.setStreaming(streamingConfig.getName());
+ if (gaps.size() > 0) {
+ streamingBuildRequest.setMessage(gaps.size() + " gaps in cube: " + StringUtils.join(gaps, ","));
+ } else {
+ streamingBuildRequest.setMessage("No gap.");
+ }
streamingBuildRequest.setSuccessful(true);
return streamingBuildRequest;
}
+
+
public void setStreamingService(StreamingService streamingService) {
this.streamingService = streamingService;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
index 4da9a86..680e371 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
@@ -18,11 +18,9 @@
package org.apache.kylin.rest.helix;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.helix.*;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.HelixControllerMain;
@@ -32,11 +30,11 @@ import org.apache.helix.tools.StateModelConfigGenerator;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.request.StreamingBuildRequest;
import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
-import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +66,7 @@ public class HelixClusterAdmin {
private static final Logger logger = LoggerFactory.getLogger(HelixClusterAdmin.class);
private final String zkAddress;
- private final ZKHelixAdmin admin;
+ private final HelixAdmin admin;
private final String clusterName;
private HelixClusterAdmin(KylinConfig kylinConfig) {
@@ -80,7 +78,7 @@ public class HelixClusterAdmin {
zkAddress = HBaseConnection.getZKConnectString();
logger.info("no 'kylin.zookeeper.address' in kylin.properties, use HBase zookeeper " + zkAddress);
}
-
+
this.clusterName = kylinConfig.getClusterName();
this.admin = new ZKHelixAdmin(zkAddress);
}
@@ -130,24 +128,59 @@ public class HelixClusterAdmin {
}
- public void addStreamingJob(StreamingBuildRequest streamingBuildRequest) {
+ public void addStreamingJob(StreamingBuildRequest streamingBuildRequest) throws IOException {
String resourceName = streamingBuildRequest.toResourceName();
- if (admin.getResourcesInCluster(clusterName).contains(resourceName)) {
- logger.warn("Resource '" + resourceName + "' already exists in cluster, remove and re-add.");
- admin.dropResource(clusterName, resourceName);
+ if (!admin.getResourcesInCluster(clusterName).contains(resourceName)) {
+ logger.info("Resource '" + resourceName + "' is new, add it with 0 partitions in cluster.");
+ admin.addResource(clusterName, resourceName, 0, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.FULL_AUTO.name());
}
- admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.FULL_AUTO.name());
- rebalanceWithTag(resourceName, TAG_STREAM_BUILDER);
+ IdealState idealState = admin.getResourceIdealState(clusterName, resourceName);
+
+ StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingBuildRequest.getStreaming());
+ List<String> partitions = streamingConfig.getPartitions();
+ if (partitions == null) {
+ partitions = Lists.newArrayList();
+ }
+
+ if (partitions.size() != idealState.getNumPartitions() || idealState.getNumPartitions() >= kylinConfig.getClusterMaxPartitionPerRegion()) {
+ if (partitions.size() != idealState.getNumPartitions()) {
+ logger.error("Cluster resource partition number doesn't match with the partitions in StreamingConfig: " + resourceName);
+ } else {
+ logger.error("Partitions number for resource '" + resourceName + " exceeds the up limit: " + kylinConfig.getClusterMaxPartitionPerRegion());
+ }
+ logger.info("Drop and create resource: " + resourceName);
+ cleanResourcePartitions(resourceName);
+ idealState = admin.getResourceIdealState(clusterName, resourceName);
+ streamingConfig.getPartitions().clear();
+ StreamingManager.getInstance(kylinConfig).updateStreamingConfig(streamingConfig);
+ streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingBuildRequest.getStreaming());
+ partitions = Lists.newArrayList();
+ }
+
+ partitions.add(streamingBuildRequest.toPartitionName());
+ streamingConfig.setPartitions(partitions);
+ StreamingManager.getInstance(kylinConfig).updateStreamingConfig(streamingConfig);
+
+ idealState.setNumPartitions(idealState.getNumPartitions() + 1);
+ admin.setResourceIdealState(clusterName, resourceName, idealState);
+ rebalanceWithTag(resourceName, TAG_STREAM_BUILDER);
}
- public void dropStreamingJob(String streamingName, long start, long end) {
- String resourceName = RESOURCE_STREAME_CUBE_PREFIX + streamingName + "_" + start + "_" + end;
- admin.dropResource(clusterName, resourceName);
+
+ private void cleanResourcePartitions(String resourceName) {
+ IdealState is = admin.getResourceIdealState(clusterName, resourceName);
+ is.getRecord().getListFields().clear();
+ is.getRecord().getMapFields().clear();
+ is.setNumPartitions(0);
+ admin.setResourceIdealState(clusterName, resourceName, is);
+
+ logger.info("clean all partitions in resource: " + resourceName);
}
/**
* Start the instance and register the state model factory
+ *
* @param instanceName
* @throws Exception
*/
@@ -161,11 +194,11 @@ public class HelixClusterAdmin {
/**
* Rebalance the resource with the tags
+ *
* @param tags
*/
protected void rebalanceWithTag(String resourceName, String tag) {
- List<String> instances = admin.getInstancesInClusterWithTag(clusterName, tag);
- admin.rebalance(clusterName, resourceName, instances.size(), "", tag);
+ admin.rebalance(clusterName, resourceName, 2, null, tag);
}
/**
@@ -206,6 +239,7 @@ public class HelixClusterAdmin {
/**
* Check whether current kylin instance is in the leader role
+ *
* @return
*/
public boolean isLeaderRole(String resourceName) {
@@ -220,6 +254,7 @@ public class HelixClusterAdmin {
/**
* Add instance to cluster, with a tag list
+ *
* @param instanceName should be unique in format: hostName_port
* @param tags
*/
http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java b/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
index 44d8302..705d8a7 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
@@ -11,6 +11,7 @@ import org.apache.kylin.common.KylinConfigBase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.rest.request.StreamingBuildRequest;
import org.slf4j.Logger;
@@ -43,43 +44,81 @@ public class StreamCubeBuildTransitionHandler extends TransitionHandler {
@Transition(to = "LEADER", from = "STANDBY")
public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
String resourceName = message.getResourceId().stringify();
- StreamingBuildRequest streamingBuildRequest = StreamingBuildRequest.fromResourceName(resourceName);
+ final StreamingBuildRequest streamingBuildRequest = getStreamingBuildRequest(resourceName, message.getPartitionName());
+ if (streamingBuildRequest != null && isSuccessfullyBuilt(streamingBuildRequest) == false) {
+ KylinConfigBase.getKylinHome();
+ String segmentId = streamingBuildRequest.toPartitionName();
+ String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingBuildRequest.getStreaming() + " " + segmentId + " -oneoff true -start " + streamingBuildRequest.getStart() + " -end " + streamingBuildRequest.getEnd() + " -streaming " + streamingBuildRequest.getStreaming();
+ runCMD(cmd);
+ }
+ }
- final String cubeName = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingBuildRequest.getStreaming()).getCubeName();
+ @Transition(to = "STANDBY", from = "LEADER")
+ public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+ String resourceName = message.getResourceId().stringify();
+ logger.info("Partition " + message.getPartitionId() + " becomes as Standby");
+ /*
+ final StreamingBuildRequest streamingBuildRequest = getStreamingBuildRequest(resourceName, message.getPartitionName());
+ if (isSuccessfullyBuilt(streamingBuildRequest) == false) {
+ KylinConfigBase.getKylinHome();
+ String segmentId = streamingBuildRequest.toPartitionName();
+ String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming stop " + streamingBuildRequest.getStreaming() + " " + segmentId;
+ runCMD(cmd);
+ }
+ */
+ }
+
+ private boolean isSuccessfullyBuilt(StreamingBuildRequest streamingBuildRequest) {
+ final StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingBuildRequest.getStreaming());
+ final String cubeName = streamingConfig.getCubeName();
final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
for (CubeSegment segment : cube.getSegments()) {
if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) {
- logger.info("Segment " + segment.getName() + " already exist, no need rebuild.");
- return;
+ logger.info("Segment " + segment.getName() + " already exist.");
+ return true;
}
}
- KylinConfigBase.getKylinHome();
- String segmentId = streamingBuildRequest.getStart() + "_" + streamingBuildRequest.getEnd();
- String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingBuildRequest.getStreaming() + " " + segmentId + " -oneoff true -start " + streamingBuildRequest.getStart() + " -end " + streamingBuildRequest.getEnd() + " -streaming " + streamingBuildRequest.getStreaming();
- logger.info("Executing: " + cmd);
- try {
- String line;
- Process p = Runtime.getRuntime().exec(cmd);
- BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
- while ((line = input.readLine()) != null) {
- logger.info(line);
+ return false;
+ }
+
+ private StreamingBuildRequest getStreamingBuildRequest(String resourceName, String partitionName) {
+ String streamConfigName = resourceName.substring(HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX.length());
+ int partitionId = Integer.parseInt(partitionName.substring(partitionName.lastIndexOf("_") + 1));
+
+ StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamConfigName);
+
+ int retry = 0;
+ while ((streamingConfig.getPartitions() == null || streamingConfig.getPartitions().isEmpty() || partitionId > (streamingConfig.getPartitions().size() - 1) && retry < 10)) {
+ logger.error("No segment information in StreamingConfig '" + streamConfigName + "' for partition " + partitionId);
+ logger.error("Wait for 0.5 second...");
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ logger.error("", e);
}
- input.close();
- } catch (IOException err) {
- logger.error("Error happens during build streaming '" + resourceName + "'", err);
- throw new RuntimeException(err);
+ streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamConfigName);
+ retry++;
}
+ if (retry >= 10) {
+ logger.error("No segment information in StreamingConfig '" + streamConfigName + "' for partition " + partitionId);
+ logger.warn("Abor building...");
+ return null;
+ }
+
+ String startEnd = streamingConfig.getPartitions().get(partitionId);
+ long start = Long.parseLong(startEnd.substring(0, startEnd.indexOf("_")));
+ long end = Long.parseLong(startEnd.substring(startEnd.indexOf("_") + 1));
+ StreamingBuildRequest request = new StreamingBuildRequest();
+ request.setStreaming(streamConfigName);
+ request.setStart(start);
+ request.setEnd(end);
+ return request;
+
}
- @Transition(to = "STANDBY", from = "LEADER")
- public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
- String resourceName = message.getResourceId().stringify();
- StreamingBuildRequest streamingBuildRequest = StreamingBuildRequest.fromResourceName(resourceName);
- KylinConfigBase.getKylinHome();
- String segmentId = streamingBuildRequest.getStart() + "_" + streamingBuildRequest.getEnd();
- String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming stop " + streamingBuildRequest.getStreaming() + " " + segmentId;
+ private void runCMD(String cmd) {
logger.info("Executing: " + cmd);
try {
String line;
@@ -90,9 +129,10 @@ public class StreamCubeBuildTransitionHandler extends TransitionHandler {
}
input.close();
} catch (IOException err) {
- logger.error("Error happens during build streaming '" + resourceName + "'", err);
+ logger.error("Error happens when running '" + cmd + "'", err);
throw new RuntimeException(err);
}
+
}
@Transition(to = "STANDBY", from = "OFFLINE")
@@ -104,4 +144,17 @@ public class StreamCubeBuildTransitionHandler extends TransitionHandler {
public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
}
+
+ @Transition(to = "DROPPED", from = "OFFLINE")
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+ throws Exception {
+ logger.info("Default OFFLINE->DROPPED transition invoked.");
+ }
+
+ @Transition(to = "OFFLINE", from = "DROPPED")
+ public void onBecomeOfflineFromDropped(Message message, NotificationContext context)
+ throws Exception {
+ logger.info("Default DROPPED->OFFLINE transition invoked.");
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
index dcf91fd..201568e 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
@@ -81,16 +81,9 @@ public class StreamingBuildRequest {
}
+ /**
+ * Returns the cluster resource name for this request's streaming config:
+ * {@code RESOURCE_STREAME_CUBE_PREFIX + streaming}. The start/end range is no longer part
+ * of the name, so all build requests of one streaming config map to the same resource.
+ */
 public String toResourceName() {
- return HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX + streaming + "_" + start + "_" + end;
+ return HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX + streaming;
 }
-
- public static StreamingBuildRequest fromResourceName(String resourceName) {
- Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX));
- long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_") + 1));
- String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_"));
- long start = Long.parseLong(temp.substring(temp.lastIndexOf("_") + 1));
- String streamingConfig = temp.substring(0, temp.lastIndexOf("_"));
-
- return new StreamingBuildRequest(streamingConfig, start, end);
+ /**
+ * Returns {@code start + "_" + end}, identifying this request's time range.
+ * NOTE(review): presumably used as the partition name inside the resource returned by
+ * {@link #toResourceName()} — verify against the cluster admin that registers it.
+ */
+ public String toPartitionName() {
+ return start + "_" + end;
 }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index 7c2cc48..6e732d9 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -18,6 +18,8 @@
package org.apache.kylin.rest.service;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
@@ -98,20 +100,33 @@ public class StreamingService extends BasicService {
}
+ /**
+ * Submits a streaming build job for the given request to the cluster.
+ * A submission failure is not propagated: it is logged with its cause, and the request is
+ * marked unsuccessful with a message that includes the cause, so the caller can report it.
+ * NOTE(review): nothing in this body propagates IOException; the {@code throws} clause is
+ * kept only so existing callers keep compiling.
+ *
+ * @param cube the cube being built (used by the {@code @PreAuthorize} permission check)
+ * @param streamingBuildRequest the request to submit; updated in place on failure
+ */
 @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
- public void buildStream(CubeInstance cube, StreamingBuildRequest streamingBuildRequest) {
+ public void buildStream(CubeInstance cube, StreamingBuildRequest streamingBuildRequest) throws IOException {
 HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv());
- clusterAdmin.addStreamingJob(streamingBuildRequest);
+ try {
+ clusterAdmin.addStreamingJob(streamingBuildRequest);
+ } catch (IOException e) {
+ // Log with context instead of an empty message so the failure is traceable.
+ logger.error("Failed to submit streaming job for " + streamingBuildRequest.getStreaming(), e);
+ streamingBuildRequest.setSuccessful(false);
+ streamingBuildRequest.setMessage("Failed to submit job for " + streamingBuildRequest.getStreaming() + ": " + e.getMessage());
+ }
 }
+ /**
+ * Finds gaps in the cube's streaming segments (bounded by the config's max gap) and submits
+ * one streaming build job per gap, at most {@code getMaxGapNumber()} jobs per call.
+ * If a submission fails part-way, the jobs submitted earlier in this call stay submitted.
+ *
+ * @param cube the cube whose streaming gaps should be filled
+ * @return the gaps for which a build job was submitted, in the order returned by findGaps
+ * @throws IOException if submitting a job to the cluster fails
+ * @throws IllegalArgumentException if no streaming config is found for the cube
+ */
 @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
- public void fillGap(CubeInstance cube) {
+ public List<Pair<Long, Long>> fillGap(CubeInstance cube) throws IOException {
 HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv());
 final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfigByCube(cube.getName());
- final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
- logger.info("all gaps:" + org.apache.commons.lang3.StringUtils.join(gaps, ","));
- for (Pair<Long, Long> gap : gaps) {
+ // NOTE(review): assumes getStreamingConfigByCube returns null when no config exists — verify.
+ if (streamingConfig == null) {
+ throw new IllegalArgumentException("No streaming config found for cube " + cube.getName());
+ }
+ final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName(), streamingConfig.getMaxGap());
+ logger.info("all gaps:" + StringUtils.join(gaps, ","));
+
+ // Clamp at 0 so a non-positive limit submits nothing instead of making subList throw.
+ final int maxGapsAtOneTime = Math.max(0, streamingConfig.getMaxGapNumber());
+ final List<Pair<Long, Long>> gapsToFill = gaps.subList(0, Math.min(gaps.size(), maxGapsAtOneTime));
+ final List<Pair<Long, Long>> filledGap = Lists.newArrayListWithCapacity(gapsToFill.size());
+ for (Pair<Long, Long> gap : gapsToFill) {
 StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest(streamingConfig.getName(), gap.getFirst(), gap.getSecond());
 clusterAdmin.addStreamingJob(streamingBuildRequest);
+ filledGap.add(gap);
 }
+
+ return filledGap;
 }
}