You are viewing a plain-text version of this content; the hyperlink to the canonical version did not survive the conversion to plain text.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/07/27 05:37:22 UTC
incubator-kylin git commit: KYLIN-895, KYLIN-774 back port cube “auto_merge” and “retention” feature to 0.7
Repository: incubator-kylin
Updated Branches:
refs/heads/0.7-staging e3d42d3ca -> fc88b5062
KYLIN-895, KYLIN-774 back port cube “auto_merge” and “retention” feature to 0.7
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/fc88b506
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/fc88b506
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/fc88b506
Branch: refs/heads/0.7-staging
Commit: fc88b5062737283140716c1133fc570d774aca0d
Parents: e3d42d3
Author: shaofengshi <sh...@apache.org>
Authored: Mon Jul 27 11:16:45 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jul 27 11:16:45 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/cube/CubeInstance.java | 31 ++++++++
.../java/org/apache/kylin/cube/CubeManager.java | 66 ++++++++++++++++-
.../metadata/cube/kylin_sales_cube.json | 2 +
.../apache/kylin/rest/service/CacheService.java | 5 ++
.../apache/kylin/rest/service/CubeService.java | 75 ++++++++++++++++++++
5 files changed, 176 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fc88b506/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 88f110f..9179df6 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -82,6 +82,12 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
@JsonProperty("create_time_utc")
private long createTimeUTC;
+ @JsonProperty("auto_merge_time_ranges")
+ private long[] autoMergeTimeRanges;
+
+ @JsonProperty("retention_range")
+ private long retentionRange = 0;
+
private String projectName;
public List<CubeSegment> getBuildingSegments() {
@@ -348,6 +354,31 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
this.projectName = projectName;
}
+ public long[] getAutoMergeTimeRanges() {
+ return autoMergeTimeRanges;
+ }
+
+ public void setAutoMergeTimeRanges(long[] autoMergeTimeRanges) {
+ this.autoMergeTimeRanges = autoMergeTimeRanges;
+ }
+
+ public boolean needAutoMerge() {
+ if (!this.getDescriptor().getModel().getPartitionDesc().isPartitioned())
+ return false;
+
+ if (this.getDescriptor().hasHolisticCountDistinctMeasures())
+ return false;
+
+ return autoMergeTimeRanges != null && autoMergeTimeRanges.length > 0;
+ }
+
+ public long getRetentionRange() {
+ return retentionRange;
+ }
+
+ public void setRetentionRange(long retentionRange) {
+ this.retentionRange = retentionRange;
+ }
@Override
public long getDateRangeStart() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fc88b506/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index d702153..075e6ac 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -545,9 +545,6 @@ public class CubeManager implements IRealizationProvider {
// check first segment start time
CubeSegment firstSeg = tobe.get(0);
- if (firstSeg.getDateRangeStart() != partDesc.getPartitionDateStart()) {
- throw new IllegalStateException("For " + cube + ", the first segment, " + firstSeg + ", must start at " + partDesc.getPartitionDateStart());
- }
firstSeg.validate();
for (int i = 0, j = 1; j < tobe.size(); ) {
@@ -649,6 +646,69 @@ public class CubeManager implements IRealizationProvider {
}
}
+ public CubeSegment autoMergeCubeSegments(CubeInstance cube) throws IOException {
+ if (!cube.needAutoMerge()) {
+ logger.debug("Cube " + cube.getName() + " doesn't need auto merge");
+ return null;
+ }
+
+ if (cube.getBuildingSegments().size() > 0) {
+ logger.debug("Cube " + cube.getName() + " has bulding segment, will not trigger merge at this moment");
+ return null;
+ }
+
+ List<CubeSegment> readySegments = Lists.newArrayList(cube.getSegment(SegmentStatusEnum.READY));
+
+ if (readySegments.size() == 0) {
+ logger.debug("Cube " + cube.getName() + " has no ready segment to merge");
+ return null;
+ }
+
+ long[] timeRanges = cube.getAutoMergeTimeRanges();
+ Arrays.sort(timeRanges);
+
+ CubeSegment newSeg = null;
+ for (int i = timeRanges.length - 1; i >= 0; i--) {
+ long toMergeRange = timeRanges[i];
+ long currentRange = 0;
+ long lastEndTime = 0;
+ List<CubeSegment> toMergeSegments = Lists.newArrayList();
+ for (CubeSegment segment : readySegments) {
+ long thisSegmentRange = segment.getDateRangeEnd() - segment.getDateRangeStart();
+
+ if (thisSegmentRange >= toMergeRange) {
+ // this segment and its previous segments will not be merged
+ toMergeSegments.clear();
+ currentRange = 0;
+ lastEndTime = segment.getDateRangeEnd();
+ continue;
+ }
+
+ if (segment.getDateRangeStart() != lastEndTime && toMergeSegments.isEmpty() == false) {
+ // gap exists, give up the small segments before the gap;
+ toMergeSegments.clear();
+ currentRange = 0;
+ }
+
+ currentRange += thisSegmentRange;
+ if (currentRange < toMergeRange) {
+ toMergeSegments.add(segment);
+ lastEndTime = segment.getDateRangeEnd();
+ } else {
+ // merge
+ toMergeSegments.add(segment);
+
+ newSeg = newSegment(cube, toMergeSegments.get(0).getDateRangeStart(), segment.getDateRangeEnd());
+ // only one merge job be created here
+ return newSeg;
+ }
+ }
+
+ }
+
+ return null;
+ }
+
private MetadataManager getMetadataManager() {
return MetadataManager.getInstance(config);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fc88b506/examples/sample_cube/metadata/cube/kylin_sales_cube.json
----------------------------------------------------------------------
diff --git a/examples/sample_cube/metadata/cube/kylin_sales_cube.json b/examples/sample_cube/metadata/cube/kylin_sales_cube.json
index d6ab149..ef1fdf6 100644
--- a/examples/sample_cube/metadata/cube/kylin_sales_cube.json
+++ b/examples/sample_cube/metadata/cube/kylin_sales_cube.json
@@ -6,5 +6,7 @@
"version" : null,
"descriptor" : "kylin_sales_cube_desc",
"segments" : [ ],
+ "auto_merge_time_ranges" : [604800000, 2419200000],
+ "retention_range": 63072000000,
"create_time" : null
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fc88b506/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 3aa3385..c7a5d86 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -30,6 +30,7 @@ import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.realization.RealizationRegistry;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.storage.hybrid.HybridManager;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
@@ -41,12 +42,16 @@ import java.util.List;
@Component("cacheService")
public class CacheService extends BasicService {
+ @Autowired
+ private CubeService cubeService;
+
public void rebuildCache(Broadcaster.TYPE cacheType, String cacheKey) {
final String log = "rebuild cache type: " + cacheType + " name:" + cacheKey;
try {
switch (cacheType) {
case CUBE:
getCubeManager().loadCubeCache(cacheKey);
+ cubeService.updateOnNewSegmentReady(cacheKey);
getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
cleanProjectCacheByRealization(RealizationType.CUBE, cacheKey);
break;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fc88b506/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 2c7c628..44a1f89 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -18,6 +18,7 @@
package org.apache.kylin.rest.service;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -33,6 +34,8 @@ import org.apache.kylin.cube.cuboid.CuboidCLI;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.job.common.HadoopShellExecutable;
import org.apache.kylin.job.cube.CubingJob;
+import org.apache.kylin.job.cube.CubingJobBuilder;
+import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
@@ -559,5 +562,77 @@ public class CubeService extends BasicService {
}
+ public void updateOnNewSegmentReady(String cubeName) {
+ logger.debug("on updateOnNewSegmentReady: " + cubeName);
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ String serverMode = kylinConfig.getServerMode();
+ logger.debug("server mode: " + serverMode);
+ if (Constant.SERVER_MODE_JOB.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase())) {
+ keepCubeRetention(cubeName);
+ mergeCubeSegment(cubeName);
+ }
+
+ }
+
+ private void keepCubeRetention(String cubeName) {
+ CubeInstance cube = getCubeManager().getCube(cubeName);
+ if (cube.getRetentionRange() > 0) {
+ synchronized (CubeService.class) {
+ cube = getCubeManager().getCube(cubeName);
+ List<CubeSegment> readySegs = cube.getSegment(SegmentStatusEnum.READY);
+ long currentRange = 0;
+ int position = readySegs.size() - 1;
+ while (position >= 0) {
+ currentRange += (readySegs.get(position).getDateRangeEnd() - readySegs.get(position).getDateRangeStart());
+ if (currentRange >= cube.getRetentionRange()) {
+ break;
+ }
+
+ position--;
+ }
+
+ List<CubeSegment> toRemoveSegs = Lists.newArrayList();
+ for (int i = 0; i < position; i++) {
+ toRemoveSegs.add(readySegs.get(i));
+ }
+
+ if (toRemoveSegs.size() > 0) {
+ cube.getSegments().removeAll(toRemoveSegs);
+ try {
+ this.getCubeManager().updateCube(cube);
+ } catch (IOException e) {
+ logger.error("Failed to remove old segment from cube " + cubeName, e);
+ }
+
+ }
+ }
+ }
+ }
+
+ private void mergeCubeSegment(String cubeName) {
+ CubeInstance cube = getCubeManager().getCube(cubeName);
+ if (cube.needAutoMerge()) {
+ synchronized (CubeService.class) {
+ try {
+ cube = getCubeManager().getCube(cubeName);
+ CubeSegment newSeg = getCubeManager().autoMergeCubeSegments(cube);
+ if (newSeg != null) {
+ newSeg = getCubeManager().mergeSegments(cube, newSeg.getDateRangeStart(), newSeg.getDateRangeEnd(), true);
+ logger.debug("Will submit merge job on " + newSeg);
+ CubingJobBuilder builder = new CubingJobBuilder(new JobEngineConfig(getConfig()));
+ builder.setSubmitter("SYSTEM");
+ CubingJob job = builder.mergeJob(newSeg);
+ getExecutableManager().addJob(job);
+ } else {
+ logger.debug("Not ready for merge on cube " + cubeName);
+ }
+
+ } catch (IOException e) {
+ logger.error("Failed to auto merge cube " + cubeName, e);
+ }
+ }
+ }
+
+ }
}