You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/07 04:29:55 UTC
[38/50] [abbrv] kylin git commit: KYLIN-2245 slim Segments in
CubeManager
KYLIN-2245 slim Segments in CubeManager
Signed-off-by: Yang Li <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0a441c3f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0a441c3f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0a441c3f
Branch: refs/heads/master-hbase1.x
Commit: 0a441c3fa30aee0a06cfc6301f7fbfa412103179
Parents: 59a30f6
Author: Cheng Wang <ch...@kyligence.io>
Authored: Mon Dec 5 16:17:31 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Dec 5 20:20:30 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/cube/CubeInstance.java | 14 ++
.../java/org/apache/kylin/cube/CubeManager.java | 165 +--------------
.../apache/kylin/metadata/model/ISegment.java | 5 +-
.../apache/kylin/metadata/model/Segments.java | 206 ++++++++++++++++++-
4 files changed, 221 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/0a441c3f/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 8b12c2e..ecbb437 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -18,6 +18,7 @@
package org.apache.kylin.cube;
+import java.io.IOException;
import java.util.List;
import java.util.Set;
@@ -25,6 +26,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.DataModelDesc;
@@ -361,6 +363,18 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
return this.getDescriptor().getAutoMergeTimeRanges() != null && this.getDescriptor().getAutoMergeTimeRanges().length > 0;
}
+ public Pair<Long, Long> autoMergeCubeSegments() throws IOException { // delegates to the segment list with this cube's auto-merge flag, name and configured time ranges
+ return segments.autoMergeCubeSegments(needAutoMerge(), getName(), getDescriptor().getAutoMergeTimeRanges()); // NOTE(review): the time-range array is handed over as-is and the callee sorts it in place
+ }
+
+ public Segments calculateToBeSegments(CubeSegment newSegment) { // raw return type: callers get no compile-time element-type checking
+ return segments.calculateToBeSegments(newSegment, getModel().getPartitionDesc().isPartitioned()); // the partitioned flag controls whether segment ranges are validated
+ }
+
+ public Pair<CubeSegment, CubeSegment> findMergeOffsetsByDateRange(Segments<CubeSegment> segs, long startDate, long endDate, long skipSegDateRangeCap) {
+ return this.segments.findMergeOffsetsByDateRange(segs, startDate, endDate, skipSegDateRangeCap); // the search runs over segs; this cube's own segment list is only the receiver of the call
+ }
+
public CubeSegment getLastSegment() {
List<CubeSegment> existing = getSegments();
if (existing.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/0a441c3f/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 4ba29af..296a4e7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -27,7 +27,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -520,7 +519,7 @@ public class CubeManager implements IRealizationProvider {
if (isOffsetsOn) {
// offset cube, merge by date range?
if (startOffset == endOffset) {
- Pair<CubeSegment, CubeSegment> pair = findMergeOffsetsByDateRange(cube.getSegments(SegmentStatusEnum.READY), startDate, endDate, Long.MAX_VALUE);
+ Pair<CubeSegment, CubeSegment> pair = cube.findMergeOffsetsByDateRange(cube.getSegments(SegmentStatusEnum.READY), startDate, endDate, Long.MAX_VALUE);
if (pair == null)
throw new IllegalArgumentException("Find no segments to merge by date range " + startDate + "-" + endDate + " for cube " + cube);
startOffset = pair.getFirst().getSourceOffsetStart();
@@ -580,32 +579,6 @@ public class CubeManager implements IRealizationProvider {
return newSegment;
}
- private Pair<CubeSegment, CubeSegment> findMergeOffsetsByDateRange(List<CubeSegment> segments, long startDate, long endDate, long skipSegDateRangeCap) {
- // must be offset cube
- LinkedList<CubeSegment> result = Lists.newLinkedList();
- for (CubeSegment seg : segments) {
-
- // include if date range overlaps
- if (startDate < seg.getDateRangeEnd() && seg.getDateRangeStart() < endDate) {
-
- // reject too big segment
- if (seg.getDateRangeEnd() - seg.getDateRangeStart() > skipSegDateRangeCap)
- break;
-
- // reject holes
- if (result.size() > 0 && result.getLast().getSourceOffsetEnd() != seg.getSourceOffsetStart())
- break;
-
- result.add(seg);
- }
- }
-
- if (result.size() <= 1)
- return null;
- else
- return Pair.newPair(result.getFirst(), result.getLast());
- }
-
public static long minDateRangeStart(List<CubeSegment> mergingSegments) {
long min = Long.MAX_VALUE;
for (CubeSegment seg : mergingSegments)
@@ -708,50 +681,7 @@ public class CubeManager implements IRealizationProvider {
}
public Pair<Long, Long> autoMergeCubeSegments(CubeInstance cube) throws IOException {
- if (!cube.needAutoMerge()) {
- logger.debug("Cube " + cube.getName() + " doesn't need auto merge");
- return null;
- }
-
- List<CubeSegment> buildingSegs = cube.getBuildingSegments();
- if (buildingSegs.size() > 0) {
- logger.debug("Cube " + cube.getName() + " has " + buildingSegs.size() + " building segments");
- }
-
- List<CubeSegment> readySegs = cube.getSegments(SegmentStatusEnum.READY);
-
- List<CubeSegment> mergingSegs = Lists.newArrayList();
- if (buildingSegs.size() > 0) {
-
- for (CubeSegment building : buildingSegs) {
- // exclude those under-merging segs
- for (CubeSegment ready : readySegs) {
- if (ready.getSourceOffsetStart() >= building.getSourceOffsetStart() && ready.getSourceOffsetEnd() <= building.getSourceOffsetEnd()) {
- mergingSegs.add(ready);
- }
- }
- }
- }
-
- // exclude those already under merging segments
- readySegs.removeAll(mergingSegs);
-
- long[] timeRanges = cube.getDescriptor().getAutoMergeTimeRanges();
- Arrays.sort(timeRanges);
-
- for (int i = timeRanges.length - 1; i >= 0; i--) {
- long toMergeRange = timeRanges[i];
-
- for (int s = 0; s < readySegs.size(); s++) {
- CubeSegment seg = readySegs.get(s);
- Pair<CubeSegment, CubeSegment> p = findMergeOffsetsByDateRange(readySegs.subList(s, readySegs.size()), //
- seg.getDateRangeStart(), seg.getDateRangeStart() + toMergeRange, toMergeRange);
- if (p != null && p.getSecond().getDateRangeEnd() - p.getFirst().getDateRangeStart() >= toMergeRange)
- return Pair.newPair(p.getFirst().getSourceOffsetStart(), p.getSecond().getSourceOffsetEnd());
- }
- }
-
- return null;
+ return cube.autoMergeCubeSegments(); // merge-range selection now lives behind CubeInstance, which forwards to its Segments list
}
public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
@@ -765,7 +695,7 @@ public class CubeManager implements IRealizationProvider {
logger.warn("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY");
}
- List<CubeSegment> tobe = calculateToBeSegments(cube, newSegment);
+ List<CubeSegment> tobe = cube.calculateToBeSegments(newSegment);
if (tobe.contains(newSegment) == false)
throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe);
@@ -786,104 +716,17 @@ public class CubeManager implements IRealizationProvider {
}
public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
- List<CubeSegment> tobe = calculateToBeSegments(cube, newSegments);
+ List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments); // CubeInstance returns a raw Segments here, so this assignment is an unchecked conversion
List<CubeSegment> newList = Arrays.asList(newSegments);
if (tobe.containsAll(newList) == false) {
throw new IllegalStateException("For cube " + cube + ", the new segments " + newList + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe);
}
}
- /**
- * Smartly figure out the TOBE segments once all new segments are built.
- * - Ensures no gap, no overlap
- * - Favors new segments over the old
- * - Favors big segments over the small
- */
- private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment newSegments) {
-
- List<CubeSegment> tobe = Lists.newArrayList(cube.getSegments());
- if (newSegments != null && !tobe.contains(newSegments)) {
- tobe.add(newSegments);
- }
- if (tobe.size() == 0)
- return tobe;
-
- // sort by source offset
- Collections.sort(tobe);
-
- CubeSegment firstSeg = tobe.get(0);
- firstSeg.validate();
-
- for (int i = 0, j = 1; j < tobe.size();) {
- CubeSegment is = tobe.get(i);
- CubeSegment js = tobe.get(j);
- js.validate();
-
- // check i is either ready or new
- if (!isNew(is) && !isReady(is)) {
- tobe.remove(i);
- continue;
- }
-
- // check j is either ready or new
- if (!isNew(js) && !isReady(js)) {
- tobe.remove(j);
- continue;
- }
-
- if (is.getSourceOffsetStart() == js.getSourceOffsetStart()) {
- // if i, j competes
- if (isReady(is) && isReady(js) || isNew(is) && isNew(js)) {
- // if both new or ready, favor the bigger segment
- if (is.getSourceOffsetEnd() <= js.getSourceOffsetEnd()) {
- tobe.remove(i);
- } else {
- tobe.remove(j);
- }
- continue;
- } else {
- // otherwise, favor the new segment
- if (isNew(is) && is.equals(newSegments)) {
- tobe.remove(j);
- continue;
- } else if (js.equals(newSegments)) {
- tobe.remove(i);
- continue;
- }
- }
- }
-
- // if i, j in sequence
- if (is.getSourceOffsetEnd() <= js.getSourceOffsetStart()) {
- i++;
- j++;
- continue;
- }
-
- // js can be covered by is
- if (is.equals(newSegments)) {
- // seems j not fitting
- tobe.remove(j);
- continue;
- } else {
- i++;
- j++;
- continue;
- }
-
- }
-
- return tobe;
- }
-
private boolean isReady(CubeSegment seg) {
return seg.getStatus() == SegmentStatusEnum.READY;
}
- private boolean isNew(CubeSegment seg) {
- return seg.getStatus() == SegmentStatusEnum.NEW || seg.getStatus() == SegmentStatusEnum.READY_PENDING;
- }
-
private void loadAllCubeInstance() throws IOException {
ResourceStore store = getStore();
List<String> paths = store.collectResourceRecursively(ResourceStore.CUBE_RESOURCE_ROOT, ".json");
http://git-wip-us.apache.org/repos/asf/kylin/blob/0a441c3f/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
index e3fcdcb..9d26927 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
@@ -18,7 +18,7 @@
package org.apache.kylin.metadata.model;
-public interface ISegment{
+public interface ISegment {
public String getName();
@@ -29,11 +29,12 @@ public interface ISegment{
public long getSourceOffsetStart();
public long getSourceOffsetEnd();
-
+
public DataModelDesc getModel();
public SegmentStatusEnum getStatus();
public long getLastBuildTime();
+ public boolean isSourceOffsetsOn(); // consulted by Segments.validate to choose between the date-range check and the source-offset check
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0a441c3f/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
index f0a58cb..bc115cc 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
@@ -18,16 +18,25 @@
package org.apache.kylin.metadata.model;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.kylin.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class Segments<T extends ISegment> extends ArrayList<T> {
-
+
private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(Segments.class);
+
public static boolean sourceOffsetContains(ISegment a, ISegment b) {
return a.getSourceOffsetStart() <= b.getSourceOffsetStart() && b.getSourceOffsetEnd() <= a.getSourceOffsetEnd();
}
-
+
public static boolean sourceOffsetOverlaps(ISegment a, ISegment b) {
return a.getSourceOffsetStart() < b.getSourceOffsetEnd() && b.getSourceOffsetStart() < a.getSourceOffsetEnd();
}
@@ -44,7 +53,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> {
Segments<T> readySegs = getSegments(SegmentStatusEnum.READY);
long startTime = Long.MAX_VALUE;
- for (T seg : readySegs) {
+ for (ISegment seg : readySegs) {
startTime = Math.min(startTime, seg.getDateRangeStart());
}
@@ -55,7 +64,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> {
Segments<T> readySegs = getSegments(SegmentStatusEnum.READY);
long endTime = Long.MIN_VALUE;
- for (T seg : readySegs) {
+ for (ISegment seg : readySegs) {
endTime = Math.max(endTime, seg.getDateRangeEnd());
}
@@ -87,7 +96,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> {
return latest;
}
- public Segments getSegments(SegmentStatusEnum status) {
+ public Segments<T> getSegments(SegmentStatusEnum status) {
Segments<T> result = new Segments<>();
for (T segment : this) {
@@ -107,7 +116,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> {
return null;
}
- public Segments getBuildingSegments() {
+ public Segments<T> getBuildingSegments() {
Segments<T> buildingSegments = new Segments();
if (null != this) {
for (T segment : this) {
@@ -142,9 +151,194 @@ public class Segments<T extends ISegment> extends ArrayList<T> {
return result;
}
+ public Pair<Long, Long> autoMergeCubeSegments(boolean needAutoMerge, String cubeName, long[] timeRanges) throws IOException { // returns (sourceOffsetStart, sourceOffsetEnd) of the next merge, or null when nothing qualifies
+ if (!needAutoMerge) {
+ logger.debug("Cube " + cubeName + " doesn't need auto merge");
+ return null;
+ }
+
+ int buildingSize = getBuildingSegments().size();
+ if (buildingSize > 0) {
+ logger.debug("Cube " + cubeName + " has " + buildingSize + " building segments");
+ }
+
+ Segments<T> readySegs = getSegments(SegmentStatusEnum.READY);
+
+ Segments mergingSegs = new Segments(); // raw type; collects the READY segments that get excluded below
+ if (buildingSize > 0) {
+
+ for (ISegment building : getBuildingSegments()) { // second call to getBuildingSegments(); the first one only supplied the size
+ // exclude those under-merging segs
+ for (ISegment ready : readySegs) {
+ if (ready.getSourceOffsetStart() >= building.getSourceOffsetStart() && ready.getSourceOffsetEnd() <= building.getSourceOffsetEnd()) { // ready lies entirely inside building's source-offset range
+ mergingSegs.add(ready);
+ }
+ }
+ }
+ }
+
+ // exclude those already under merging segments
+ readySegs.removeAll(mergingSegs);
+
+ Arrays.sort(timeRanges); // NOTE(review): sorts the caller's array in place
+
+ for (int i = timeRanges.length - 1; i >= 0; i--) { // ascending sort walked backwards, so the largest range is tried first
+ long toMergeRange = timeRanges[i];
+
+ for (int s = 0; s < readySegs.size(); s++) { // try each remaining READY segment as the start of a merge run
+ ISegment seg = readySegs.get(s);
+ Pair<T, T> p = findMergeOffsetsByDateRange(readySegs.getSubList(s, readySegs.size()), //
+ seg.getDateRangeStart(), seg.getDateRangeStart() + toMergeRange, toMergeRange); // toMergeRange doubles as the size cap for a single segment
+ if (p != null && p.getSecond().getDateRangeEnd() - p.getFirst().getDateRangeStart() >= toMergeRange) // accept only when the run's date span covers the full range
+ return Pair.newPair(p.getFirst().getSourceOffsetStart(), p.getSecond().getSourceOffsetEnd());
+ }
+ }
+
+ return null;
+ }
+
+ public Pair<T, T> findMergeOffsetsByDateRange(Segments<T> segments, long startDate, long endDate, long skipSegDateRangeCap) { // scans the segments argument only; elements of this list are never read
+ // must be offset cube
+ Segments result = new Segments(); // raw type; accumulates the run of segments selected for merging
+ for (ISegment seg : segments) {
+
+ // include if date range overlaps
+ if (startDate < seg.getDateRangeEnd() && seg.getDateRangeStart() < endDate) { // non-overlapping segments are skipped without ending the scan
+
+ // reject too big segment
+ if (seg.getDateRangeEnd() - seg.getDateRangeStart() > skipSegDateRangeCap)
+ break; // ends the whole scan, not just this segment
+
+ // reject holes
+ if (result.size() > 0 && result.getLast().getSourceOffsetEnd() != seg.getSourceOffsetStart())
+ break; // the run must be contiguous in source offsets
+
+ result.add(seg);
+ }
+ }
+
+ if (result.size() <= 1) // fewer than two collected segments means there is nothing to merge
+ return null;
+ else
+ return (Pair<T, T>) Pair.newPair(result.getFirst(), result.getLast()); // unchecked cast, needed because result is a raw Segments
+ }
+
+ /**
+ * Smartly figure out the TOBE segments once all new segments are built.
+ * - Ensures no gap, no overlap
+ * - Favors new segments over the old
+ * - Favors big segments over the small
+ */
+ public Segments calculateToBeSegments(ISegment newSegment, boolean isPartitioned) { // returns a raw Segments; isPartitioned only controls validation
+
+ Segments tobe = (Segments) this.clone(); // shallow copy: additions and removals below do not touch this list
+ if (newSegment != null && !tobe.contains(newSegment)) {
+ tobe.add(newSegment);
+ }
+ if (tobe.size() == 0)
+ return tobe;
+
+ // sort by source offset
+ Collections.sort(tobe); // raw-typed call: elements must be mutually Comparable at runtime; the ordering itself is defined by the segment class
+
+ ISegment firstSeg = tobe.getFirst();
+ validate(firstSeg, isPartitioned);
+
+ for (int i = 0, j = 1; j < tobe.size();) { // i and j stay adjacent (j == i + 1); after a removal the same indices are examined again
+ ISegment is = (ISegment) tobe.get(i);
+ ISegment js = (ISegment) tobe.get(j);
+ validate(js, isPartitioned);
+
+ // check i is either ready or new
+ if (!isNew(is) && !isReady(is)) {
+ tobe.remove(i);
+ continue;
+ }
+
+ // check j is either ready or new
+ if (!isNew(js) && !isReady(js)) {
+ tobe.remove(j);
+ continue;
+ }
+
+ if (is.getSourceOffsetStart() == js.getSourceOffsetStart()) {
+ // if i, j competes
+ if (isReady(is) && isReady(js) || isNew(is) && isNew(js)) {
+ // if both new or ready, favor the bigger segment
+ if (is.getSourceOffsetEnd() <= js.getSourceOffsetEnd()) { // when the end offsets tie, i is the one dropped
+ tobe.remove(i);
+ } else {
+ tobe.remove(j);
+ }
+ continue;
+ } else {
+ // otherwise, favor the new segment
+ if (isNew(is) && is.equals(newSegment)) {
+ tobe.remove(j);
+ continue;
+ } else if (js.equals(newSegment)) {
+ tobe.remove(i);
+ continue;
+ } // neither branch matched: fall through to the sequence/overlap checks below
+ }
+ }
+
+ // if i, j in sequence
+ if (is.getSourceOffsetEnd() <= js.getSourceOffsetStart()) {
+ i++;
+ j++;
+ continue;
+ }
+
+ // js can be covered by is
+ if (is.equals(newSegment)) {
+ // seems j not fitting
+ tobe.remove(j);
+ continue;
+ } else {
+ i++; // overlapping pair where i is not newSegment: both entries are kept
+ j++;
+ continue;
+ }
+
+ }
+
+ return tobe;
+ }
+
+ private void validate(ISegment seg, boolean isPartitioned) { // performs no checks at all when isPartitioned is false
+ if (isPartitioned) {
+ if (!seg.isSourceOffsetsOn() && seg.getDateRangeStart() >= seg.getDateRangeEnd()) // offsets off: the date range must be non-empty
+ throw new IllegalStateException("Invalid segment, dateRangeStart(" + seg.getDateRangeStart() + ") must be smaller than dateRangeEnd(" + seg.getDateRangeEnd() + ") in segment " + seg);
+ if (seg.isSourceOffsetsOn() && seg.getSourceOffsetStart() >= seg.getSourceOffsetEnd()) // offsets on: the source-offset range must be non-empty
+ throw new IllegalStateException("Invalid segment, sourceOffsetStart(" + seg.getSourceOffsetStart() + ") must be smaller than sourceOffsetEnd(" + seg.getSourceOffsetEnd() + ") in segment " + seg);
+ }
+ }
+
+ private boolean isReady(ISegment seg) { // true only when the segment's status is READY
+ return seg.getStatus() == SegmentStatusEnum.READY;
+ }
+
+ private boolean isNew(ISegment seg) { // NEW and READY_PENDING both count as "new"
+ return seg.getStatus() == SegmentStatusEnum.NEW || seg.getStatus() == SegmentStatusEnum.READY_PENDING;
+ }
+
private T getLast() {
assert this.size() != 0;
return this.get(this.size() - 1);
}
+ private T getFirst() { // first element of this list
+ assert this.size() != 0; // only enforced when assertions are enabled (-ea)
+ return this.get(0);
+ }
+
+ private Segments<T> getSubList(int from, int to) { // copies the elements in [from, to) into a new Segments
+ Segments<T> result = new Segments<>();
+ for (T seg : this.subList(from, to)) { // subList is a view of this list; the loop copies out of it
+ result.add(seg);
+ }
+ return result;
+ }
+
}
\ No newline at end of file