You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/02/14 15:44:24 UTC
kylin git commit: KYLIN-1415 Cube parallel merge
Repository: kylin
Updated Branches:
refs/heads/helix-201602 68a31eb77 -> 8aef3cab0
KYLIN-1415 Cube parallel merge
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8aef3cab
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8aef3cab
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8aef3cab
Branch: refs/heads/helix-201602
Commit: 8aef3cab02bc670dbabb7d1418b8e3a8a394f8fc
Parents: 68a31eb
Author: shaofengshi <sh...@apache.org>
Authored: Sun Feb 14 22:11:59 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Feb 14 22:11:59 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 54 ++++++++++++--------
.../apache/kylin/rest/service/CubeService.java | 2 +-
2 files changed, 33 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/8aef3cab/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 84dd30a..aac2701 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -18,13 +18,11 @@
package org.apache.kylin.cube;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.annotation.Nullable;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
@@ -47,21 +45,17 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.apache.kylin.metadata.realization.IRealizationProvider;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.metadata.realization.*;
import org.apache.kylin.source.ReadableTable;
import org.apache.kylin.source.SourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
/**
* @author yangli9
@@ -458,8 +452,14 @@ public class CubeManager implements IRealizationProvider {
}
public CubeSegment mergeSegments(CubeInstance cube, final long startDate, final long endDate, boolean forceMergeEmptySeg) throws IOException {
- checkNoBuildingSegment(cube);
+ return mergeSegments(cube, startDate, endDate, forceMergeEmptySeg, true);
+ }
+
+ public CubeSegment mergeSegments(CubeInstance cube, final long startDate, final long endDate, boolean forceMergeEmptySeg, boolean strictCheck) throws IOException {
checkCubeIsPartitioned(cube);
+
+ if (strictCheck)
+ checkNoBuildingSegment(cube);
Pair<Long, Long> range = alignMergeRange(cube, startDate, endDate);
CubeSegment newSegment = newSegment(cube, range.getFirst(), range.getSecond());
@@ -619,13 +619,23 @@ public class CubeManager implements IRealizationProvider {
return null;
}
- if (cube.getBuildingSegments().size() > 0) {
- logger.debug("Cube " + cube.getName() + " has bulding segment, will not trigger merge at this moment");
+ List<CubeSegment> readySegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY));
+
+ if (readySegments.size() == 0) {
+ logger.debug("Cube " + cube.getName() + " has no ready segment to merge");
return null;
}
+ List<CubeSegment> buildingSegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.NEW));
+ List<CubeSegment> toSkipSegments = Lists.newArrayList();
+ for (CubeSegment building : buildingSegments) {
+ for (CubeSegment ready : readySegments) {
+ if (ready.getDateRangeStart() >= building.getDateRangeStart() && ready.getDateRangeEnd() <= building.getDateRangeEnd()) {
+ toSkipSegments.add(ready);
+ }
+ }
+ }
- List<CubeSegment> readySegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY));
-
+ readySegments.removeAll(toSkipSegments);
if (readySegments.size() == 0) {
logger.debug("Cube " + cube.getName() + " has no ready segment to merge");
return null;
http://git-wip-us.apache.org/repos/asf/kylin/blob/8aef3cab/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 066f0c7..552ab4a 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -633,7 +633,7 @@ public class CubeService extends BasicService {
cube = getCubeManager().getCube(cubeName);
CubeSegment newSeg = getCubeManager().autoMergeCubeSegments(cube);
if (newSeg != null) {
- newSeg = getCubeManager().mergeSegments(cube, newSeg.getDateRangeStart(), newSeg.getDateRangeEnd(), true);
+ newSeg = getCubeManager().mergeSegments(cube, newSeg.getDateRangeStart(), newSeg.getDateRangeEnd(), true, false);
logger.debug("Will submit merge job on " + newSeg);
DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(newSeg, "SYSTEM");
getExecutableManager().addJob(job);