You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/07/27 05:37:22 UTC

incubator-kylin git commit: KYLIN-895, KYLIN-774 back port cube “auto_merge” and “retention” feature to 0.7

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.7-staging e3d42d3ca -> fc88b5062


KYLIN-895, KYLIN-774 back port cube “auto_merge” and “retention” feature to 0.7

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/fc88b506
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/fc88b506
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/fc88b506

Branch: refs/heads/0.7-staging
Commit: fc88b5062737283140716c1133fc570d774aca0d
Parents: e3d42d3
Author: shaofengshi <sh...@apache.org>
Authored: Mon Jul 27 11:16:45 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jul 27 11:16:45 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/cube/CubeInstance.java     | 31 ++++++++
 .../java/org/apache/kylin/cube/CubeManager.java | 66 ++++++++++++++++-
 .../metadata/cube/kylin_sales_cube.json         |  2 +
 .../apache/kylin/rest/service/CacheService.java |  5 ++
 .../apache/kylin/rest/service/CubeService.java  | 75 ++++++++++++++++++++
 5 files changed, 176 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fc88b506/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 88f110f..9179df6 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -82,6 +82,12 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
     @JsonProperty("create_time_utc")
     private long createTimeUTC;
 
+    @JsonProperty("auto_merge_time_ranges")
+    private long[] autoMergeTimeRanges;
+
+    @JsonProperty("retention_range")
+    private long retentionRange = 0;
+    
     private String projectName;
     
     public List<CubeSegment> getBuildingSegments() {
@@ -348,6 +354,31 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
         this.projectName = projectName;
     }
 
+    public long[] getAutoMergeTimeRanges() {
+        return autoMergeTimeRanges;
+    }
+
+    public void setAutoMergeTimeRanges(long[] autoMergeTimeRanges) {
+        this.autoMergeTimeRanges = autoMergeTimeRanges;
+    }
+
+    public boolean needAutoMerge() {
+        if (!this.getDescriptor().getModel().getPartitionDesc().isPartitioned())
+            return false;
+
+        if (this.getDescriptor().hasHolisticCountDistinctMeasures())
+            return false;
+
+        return autoMergeTimeRanges != null && autoMergeTimeRanges.length > 0;
+    }
+
+    public long getRetentionRange() {
+        return retentionRange;
+    }
+
+    public void setRetentionRange(long retentionRange) {
+        this.retentionRange = retentionRange;
+    }
 
     @Override
     public long getDateRangeStart() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fc88b506/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index d702153..075e6ac 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -545,9 +545,6 @@ public class CubeManager implements IRealizationProvider {
 
         // check first segment start time
         CubeSegment firstSeg = tobe.get(0);
-        if (firstSeg.getDateRangeStart() != partDesc.getPartitionDateStart()) {
-            throw new IllegalStateException("For " + cube + ", the first segment, " + firstSeg + ", must start at " + partDesc.getPartitionDateStart());
-        }
         firstSeg.validate();
 
         for (int i = 0, j = 1; j < tobe.size(); ) {
@@ -649,6 +646,69 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
+    public CubeSegment autoMergeCubeSegments(CubeInstance cube) throws IOException {
+        if (!cube.needAutoMerge()) {
+            logger.debug("Cube " + cube.getName() + " doesn't need auto merge");
+            return null;
+        }
+
+        if (cube.getBuildingSegments().size() > 0) {
+            logger.debug("Cube " + cube.getName() + " has bulding segment, will not trigger merge at this moment");
+            return null;
+        }
+
+        List<CubeSegment> readySegments = Lists.newArrayList(cube.getSegment(SegmentStatusEnum.READY));
+
+        if (readySegments.size() == 0) {
+            logger.debug("Cube " + cube.getName() + " has no ready segment to merge");
+            return null;
+        }
+
+        long[] timeRanges = cube.getAutoMergeTimeRanges();
+        Arrays.sort(timeRanges);
+
+        CubeSegment newSeg = null;
+        for (int i = timeRanges.length - 1; i >= 0; i--) {
+            long toMergeRange = timeRanges[i];
+            long currentRange = 0;
+            long lastEndTime = 0;
+            List<CubeSegment> toMergeSegments = Lists.newArrayList();
+            for (CubeSegment segment : readySegments) {
+                long thisSegmentRange = segment.getDateRangeEnd() - segment.getDateRangeStart();
+
+                if (thisSegmentRange >= toMergeRange) {
+                    // this segment and its previous segments will not be merged
+                    toMergeSegments.clear();
+                    currentRange = 0;
+                    lastEndTime = segment.getDateRangeEnd();
+                    continue;
+                }
+
+                if (segment.getDateRangeStart() != lastEndTime && toMergeSegments.isEmpty() == false) {
+                    // gap exists, give up the small segments before the gap;
+                    toMergeSegments.clear();
+                    currentRange = 0;
+                }
+
+                currentRange += thisSegmentRange;
+                if (currentRange < toMergeRange) {
+                    toMergeSegments.add(segment);
+                    lastEndTime = segment.getDateRangeEnd();
+                } else {
+                    // merge
+                    toMergeSegments.add(segment);
+
+                    newSeg = newSegment(cube, toMergeSegments.get(0).getDateRangeStart(), segment.getDateRangeEnd());
+                    // only one merge job is created here
+                    return newSeg;
+                }
+            }
+
+        }
+
+        return null;
+    }
+
     private MetadataManager getMetadataManager() {
         return MetadataManager.getInstance(config);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fc88b506/examples/sample_cube/metadata/cube/kylin_sales_cube.json
----------------------------------------------------------------------
diff --git a/examples/sample_cube/metadata/cube/kylin_sales_cube.json b/examples/sample_cube/metadata/cube/kylin_sales_cube.json
index d6ab149..ef1fdf6 100644
--- a/examples/sample_cube/metadata/cube/kylin_sales_cube.json
+++ b/examples/sample_cube/metadata/cube/kylin_sales_cube.json
@@ -6,5 +6,7 @@
   "version" : null,
   "descriptor" : "kylin_sales_cube_desc",
   "segments" : [ ],
+  "auto_merge_time_ranges" : [604800000, 2419200000],
+  "retention_range": 63072000000,
   "create_time" : null
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fc88b506/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 3aa3385..c7a5d86 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -30,6 +30,7 @@ import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.RealizationRegistry;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.storage.hybrid.HybridManager;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
@@ -41,12 +42,16 @@ import java.util.List;
 @Component("cacheService")
 public class CacheService extends BasicService {
 
+    @Autowired
+    private CubeService cubeService;
+    
     public void rebuildCache(Broadcaster.TYPE cacheType, String cacheKey) {
         final String log = "rebuild cache type: " + cacheType + " name:" + cacheKey;
         try {
             switch (cacheType) {
             case CUBE:
                 getCubeManager().loadCubeCache(cacheKey);
+                cubeService.updateOnNewSegmentReady(cacheKey);
                 getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
                 cleanProjectCacheByRealization(RealizationType.CUBE, cacheKey);
                 break;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fc88b506/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 2c7c628..44a1f89 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.rest.service;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,6 +34,8 @@ import org.apache.kylin.cube.cuboid.CuboidCLI;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.job.common.HadoopShellExecutable;
 import org.apache.kylin.job.cube.CubingJob;
+import org.apache.kylin.job.cube.CubingJobBuilder;
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -559,5 +562,77 @@ public class CubeService extends BasicService {
     }
 
 
+    public void updateOnNewSegmentReady(String cubeName) {
+        logger.debug("on updateOnNewSegmentReady: " + cubeName);
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        String serverMode = kylinConfig.getServerMode();
+        logger.debug("server mode: " + serverMode);
+        if (Constant.SERVER_MODE_JOB.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase())) {
+            keepCubeRetention(cubeName);
+            mergeCubeSegment(cubeName);
+        }
+
+    }
+
+    private void keepCubeRetention(String cubeName) {
+        CubeInstance cube = getCubeManager().getCube(cubeName);
+        if (cube.getRetentionRange() > 0) {
+            synchronized (CubeService.class) {
+                cube = getCubeManager().getCube(cubeName);
+                List<CubeSegment> readySegs = cube.getSegment(SegmentStatusEnum.READY);
+                long currentRange = 0;
+                int position = readySegs.size() - 1;
+                while (position >= 0) {
+                    currentRange += (readySegs.get(position).getDateRangeEnd() - readySegs.get(position).getDateRangeStart());
+                    if (currentRange >= cube.getRetentionRange()) {
+                        break;
+                    }
+
+                    position--;
+                }
+
+                List<CubeSegment> toRemoveSegs = Lists.newArrayList();
+                for (int i = 0; i < position; i++) {
+                    toRemoveSegs.add(readySegs.get(i));
+                }
+
+                if (toRemoveSegs.size() > 0) {
+                    cube.getSegments().removeAll(toRemoveSegs);
+                    try {
+                        this.getCubeManager().updateCube(cube);
+                    } catch (IOException e) {
+                        logger.error("Failed to remove old segment from cube " + cubeName, e);
+                    }
+
+                }
+            }
+        }
+    }
+
+    private void mergeCubeSegment(String cubeName) {
+        CubeInstance cube = getCubeManager().getCube(cubeName);
+        if (cube.needAutoMerge()) {
+            synchronized (CubeService.class) {
+                try {
+                    cube = getCubeManager().getCube(cubeName);
+                    CubeSegment newSeg = getCubeManager().autoMergeCubeSegments(cube);
+                    if (newSeg != null) {
+                        newSeg = getCubeManager().mergeSegments(cube, newSeg.getDateRangeStart(), newSeg.getDateRangeEnd(), true);
+                        logger.debug("Will submit merge job on " + newSeg);
+                        CubingJobBuilder builder = new CubingJobBuilder(new JobEngineConfig(getConfig()));
+                        builder.setSubmitter("SYSTEM");
+                        CubingJob job = builder.mergeJob(newSeg);
+                        getExecutableManager().addJob(job);
+                    } else {
+                        logger.debug("Not ready for merge on cube " + cubeName);
+                    }
+
+                } catch (IOException e) {
+                    logger.error("Failed to auto merge cube " + cubeName, e);
+                }
+            }
+        }
+
+    }
 
 }