You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/22 08:47:06 UTC

incubator-kylin git commit: KYLIN-774 Automatically merge cube segments

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 0a4570459 -> 20f78fb22


KYLIN-774 Automatically merge cube segments


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/20f78fb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/20f78fb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/20f78fb2

Branch: refs/heads/0.8.0
Commit: 20f78fb222844e1fc60fa6d1937012d85d7006cc
Parents: 0a45704
Author: shaofengshi <sh...@apache.org>
Authored: Fri May 22 14:43:21 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri May 22 14:45:56 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/cube/CubeInstance.java     | 17 +++++
 .../java/org/apache/kylin/cube/CubeManager.java | 66 ++++++++++++++++----
 .../org/apache/kylin/cube/CubeManagerTest.java  |  9 +++
 .../metadata/cube/kylin_sales_cube.json         |  3 +-
 .../metadata/model_desc/kylin_sales_model.json  |  2 +-
 ...st_kylin_cube_with_slr_ready_2_segments.json |  3 +-
 .../apache/kylin/rest/service/CacheService.java | 48 ++++++++++++++
 .../apache/kylin/rest/service/JobService.java   |  4 +-
 8 files changed, 136 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/20f78fb2/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index c397f98..d744bdf 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -80,6 +80,9 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
     @JsonProperty("create_time_utc")
     private long createTimeUTC;
 
+    @JsonProperty("auto_merge_time_ranges")
+    private long[] autoMergeTimeRanges;
+
     private String projectName;
 
     private static final int COST_WEIGHT_DIMENSION = 1;
@@ -401,4 +404,18 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
     public List<TblColRef> getAllDimensions() {
         return Lists.newArrayList(getDescriptor().listDimensionColumnsIncludingDerived());
     }
+
+    public long[] getAutoMergeTimeRanges() {
+        return autoMergeTimeRanges;
+    }
+
+    /** Tells whether this cube is eligible for automatic segment merging. */
+    public boolean needAutoMerge() {
+        boolean partitioned = this.getDescriptor().getModel().getPartitionDesc().isPartitioned();
+        if (!partitioned || this.getDescriptor().hasHolisticCountDistinctMeasures()) {
+            return false; // unpartitioned models and holistic count-distinct measures rule out auto merge
+        }
+        // eligible only when at least one merge range is configured
+        return autoMergeTimeRanges != null && autoMergeTimeRanges.length > 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/20f78fb2/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 581c769..76a6c6d 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -22,13 +22,13 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.Dictionary;
@@ -366,16 +366,6 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
-    public void updateSegmentOnJobDiscard(CubeInstance cubeInstance, String segmentName) throws IOException {
-        for (int i = 0; i < cubeInstance.getSegments().size(); i++) {
-            CubeSegment segment = cubeInstance.getSegments().get(i);
-            if (segment.getName().equals(segmentName) && segment.getStatus() != SegmentStatusEnum.READY) {
-                cubeInstance.getSegments().remove(segment);
-            }
-        }
-        updateCube(cubeInstance);
-    }
-
     /**
      * After cube update, reload cube related cache
      *
@@ -436,6 +426,60 @@ public class CubeManager implements IRealizationProvider {
         removeCubeCache(droppedCube);
     }
 
+    /**
+     * Proposes the next auto-merge for a cube: configured ranges are tried from the largest to
+     * the smallest, and READY segments are accumulated in list order until their summed length
+     * reaches the range. A segment already as long as the range resets the accumulation.
+     *
+     * @param cube the cube to inspect
+     * @return a new segment covering the segments to merge, or null if no merge applies now
+     */
+    public CubeSegment autoMergeCubeSegments(CubeInstance cube) throws IOException {
+        if (!cube.needAutoMerge()) {
+            logger.debug("Cube " + cube.getName() + " doesn't need auto merge");
+            return null;
+        }
+
+        if (cube.getBuildingSegments().size() > 0) {
+            logger.debug("Cube " + cube.getName() + " has building segment, will not trigger merge at this moment");
+            return null;
+        }
+
+        List<CubeSegment> readySegments = Lists.newArrayList(cube.getSegment(SegmentStatusEnum.READY));
+        if (readySegments.size() == 0) {
+            logger.debug("Cube " + cube.getName() + " has no ready segment to merge");
+            return null;
+        }
+
+        // Sort a private copy so that the cube's configured array is never reordered by this check.
+        long[] timeRanges = cube.getAutoMergeTimeRanges().clone();
+        Arrays.sort(timeRanges);
+
+        for (int i = timeRanges.length - 1; i >= 0; i--) {
+            long toMergeRange = timeRanges[i];
+            long currentRange = 0;
+            List<CubeSegment> toMergeSegments = Lists.newArrayList();
+            for (CubeSegment segment : readySegments) {
+                long thisSegmentRange = segment.getDateRangeEnd() - segment.getDateRangeStart();
+                if (thisSegmentRange >= toMergeRange) {
+                    // this segment and the segments before it will not be merged at this range
+                    toMergeSegments.clear();
+                    currentRange = 0;
+                    continue;
+                }
+
+                toMergeSegments.add(segment);
+                currentRange += thisSegmentRange;
+                if (currentRange >= toMergeRange) {
+                    // only one merge is proposed per call
+                    return newSegment(cube, toMergeSegments.get(0).getDateRangeStart(), segment.getDateRangeEnd());
+                }
+            }
+        }
+
+        return null;
+    }
+
     private CubeSegment newSegment(CubeInstance cubeInstance, long startDate, long endDate) {
         if (startDate >= endDate)
             throw new IllegalArgumentException("New segment range invalid, start date must be earlier than end date, " + startDate + " < " + endDate);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/20f78fb2/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index 5c4d824..8970295 100644
--- a/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -83,6 +83,15 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
         assertNull(CubeManager.getInstance(getTestConfig()).getCube("a_whole_new_cube"));
     }
 
+    @Test
+    public void testAutoMerge() throws Exception {
+        CubeManager cubeManager = CubeManager.getInstance(getTestConfig());
+        CubeInstance cube = cubeManager.getCube("test_kylin_cube_with_slr_ready_2_segments");
+        CubeSegment newSeg = cubeManager.autoMergeCubeSegments(cube);
+
+        assertNotNull(newSeg);
+    }
+
     public CubeDescManager getCubeDescManager() {
         return CubeDescManager.getInstance(getTestConfig());
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/20f78fb2/examples/sample_cube/metadata/cube/kylin_sales_cube.json
----------------------------------------------------------------------
diff --git a/examples/sample_cube/metadata/cube/kylin_sales_cube.json b/examples/sample_cube/metadata/cube/kylin_sales_cube.json
index d6ab149..8eabd60 100644
--- a/examples/sample_cube/metadata/cube/kylin_sales_cube.json
+++ b/examples/sample_cube/metadata/cube/kylin_sales_cube.json
@@ -6,5 +6,6 @@
   "version" : null,
   "descriptor" : "kylin_sales_cube_desc",
   "segments" : [ ],
-  "create_time" : null
+  "create_time" : null,
+  "auto_merge_time_ranges" : [604800000, 2419200000]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/20f78fb2/examples/sample_cube/metadata/model_desc/kylin_sales_model.json
----------------------------------------------------------------------
diff --git a/examples/sample_cube/metadata/model_desc/kylin_sales_model.json b/examples/sample_cube/metadata/model_desc/kylin_sales_model.json
index 50eb822..586e73a 100644
--- a/examples/sample_cube/metadata/model_desc/kylin_sales_model.json
+++ b/examples/sample_cube/metadata/model_desc/kylin_sales_model.json
@@ -39,7 +39,7 @@
   "filter_condition" : null,
   "partition_desc" : {
     "partition_date_column" : "DEFAULT.KYLIN_SALES.PART_DT",
-    "partition_date_start" : 0,
+    "partition_date_start" : 1325376000000,
     "partition_type" : "APPEND"
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/20f78fb2/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_ready_2_segments.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_ready_2_segments.json b/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_ready_2_segments.json
index 0700449..f4fb4a9 100644
--- a/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_ready_2_segments.json
+++ b/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_ready_2_segments.json
@@ -69,5 +69,6 @@
   } ],
   "status" : "READY",
   "create_time" : null,
-  "notify_list" : null
+  "notify_list" : null,
+  "auto_merge_time_ranges" : [2595000000]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/20f78fb2/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 913ae35..6ea563f 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -18,15 +18,27 @@
 
 package org.apache.kylin.rest.service;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.invertedindex.IIDescManager;
 import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.job.cube.CubingJob;
+import org.apache.kylin.job.cube.CubingJobBuilder;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.controller.QueryController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.cache.annotation.CacheEvict;
 import org.springframework.cache.annotation.Caching;
 import org.springframework.stereotype.Component;
@@ -39,6 +51,11 @@ import java.util.List;
 @Component("cacheService")
 public class CacheService extends BasicService {
 
+    private static final Logger logger = LoggerFactory.getLogger(CacheService.class);
+
+    @Autowired
+    private JobService jobService;
+
     @Caching(evict = { @CacheEvict(value = QueryController.SUCCESS_QUERY_CACHE, allEntries = true), @CacheEvict(value = QueryController.EXCEPTION_QUERY_CACHE, allEntries = true) })
     public void rebuildCache(Broadcaster.TYPE cacheType, String cacheKey) {
         final String log = "rebuild cache type: " + cacheType + " name:" + cacheKey;
@@ -47,6 +64,7 @@ public class CacheService extends BasicService {
                 case CUBE:
                     getCubeManager().loadCubeCache(cacheKey);
                     cleanProjectCacheByRealization(RealizationType.CUBE, cacheKey);
+                    mergeCubeOnNewSegmentReady(cacheKey);
                     break;
                 case CUBE_DESC:
                     getCubeDescManager().reloadCubeDesc(cacheKey);
@@ -128,4 +146,34 @@ public class CacheService extends BasicService {
             throw new RuntimeException("error " + log, e);
         }
     }
+
+    /** On a job engine node, checks the given cube for segments due for auto merge and submits one merge job. */
+    private void mergeCubeOnNewSegmentReady(String cubeName) {
+        logger.debug("on mergeCubeOnNewSegmentReady: " + cubeName);
+        String serverMode = KylinConfig.getInstanceFromEnv().getServerMode();
+        logger.debug("server mode: " + serverMode);
+        if (!Constant.SERVER_MODE_JOB.equals(serverMode.toLowerCase()) && !Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase())) {
+            return; // only nodes running in job or all mode schedule merges
+        }
+        logger.debug("This is the job engine node, will check whether auto merge is needed on cube " + cubeName);
+        CubeInstance cube = getCubeManager().getCube(cubeName);
+        if (cube == null) {
+            logger.debug("Cube " + cubeName + " not found, skip auto merge"); // getCube yields null for an unknown name
+            return;
+        }
+        try {
+            CubeSegment newSeg = getCubeManager().autoMergeCubeSegments(cube);
+            if (newSeg == null) {
+                logger.debug("Not ready for merge on cube " + cubeName);
+                return;
+            }
+            logger.debug("Will submit merge job on " + newSeg);
+            CubingJobBuilder builder = new CubingJobBuilder(new JobEngineConfig(getConfig()));
+            builder.setSubmitter("SYSTEM");
+            newSeg = getCubeManager().mergeSegments(cube, newSeg.getDateRangeStart(), newSeg.getDateRangeEnd());
+            getExecutableManager().addJob(builder.mergeJob(newSeg));
+        } catch (IOException e) {
+            logger.error("Failed to auto merge cube " + cubeName, e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/20f78fb2/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index 000ba60..e370a27 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -57,7 +57,7 @@ import java.util.*;
 public class JobService extends BasicService {
 
     @SuppressWarnings("unused")
-    private static final Logger logger = LoggerFactory.getLogger(CubeService.class);
+    private static final Logger logger = LoggerFactory.getLogger(JobService.class);
 
     public List<JobInstance> listAllJobs(final String cubeName, final String projectName, final List<JobStatusEnum> statusList, final Integer limitValue, final Integer offsetValue) throws IOException, JobException {
         Integer limit = (null == limitValue) ? 30 : limitValue;
@@ -262,7 +262,7 @@ public class JobService extends BasicService {
         final String segmentId = jobInstance.getRelatedSegment();
         CubeInstance cubeInstance = getCubeManager().getCube(jobInstance.getRelatedCube());
         final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
-        if (segment.getStatus() == SegmentStatusEnum.NEW) {
+        if (segment != null && segment.getStatus() == SegmentStatusEnum.NEW) {
             cubeInstance.getSegments().remove(segment);
             getCubeManager().updateCube(cubeInstance);
         }