You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/07 04:29:55 UTC

[38/50] [abbrv] kylin git commit: KYLIN-2245 slim Segments in CubeManager

KYLIN-2245 slim Segments in CubeManager

Signed-off-by: Yang Li <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0a441c3f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0a441c3f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0a441c3f

Branch: refs/heads/master-hbase1.x
Commit: 0a441c3fa30aee0a06cfc6301f7fbfa412103179
Parents: 59a30f6
Author: Cheng Wang <ch...@kyligence.io>
Authored: Mon Dec 5 16:17:31 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Dec 5 20:20:30 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/cube/CubeInstance.java     |  14 ++
 .../java/org/apache/kylin/cube/CubeManager.java | 165 +--------------
 .../apache/kylin/metadata/model/ISegment.java   |   5 +-
 .../apache/kylin/metadata/model/Segments.java   | 206 ++++++++++++++++++-
 4 files changed, 221 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0a441c3f/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 8b12c2e..ecbb437 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.cube;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 
@@ -25,6 +26,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataModelDesc;
@@ -361,6 +363,36 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         return this.getDescriptor().getAutoMergeTimeRanges() != null && this.getDescriptor().getAutoMergeTimeRanges().length > 0;
     }
 
+    /**
+     * Finds the next run of READY segments of this cube that is due for auto merge, based on
+     * the auto merge time ranges of the cube descriptor.
+     *
+     * @return source offsets (start, end) to merge, or null if nothing should be merged
+     */
+    public Pair<Long, Long> autoMergeCubeSegments() throws IOException {
+        return segments.autoMergeCubeSegments(needAutoMerge(), getName(), getDescriptor().getAutoMergeTimeRanges());
+    }
+
+    /**
+     * Computes the TOBE segments of this cube once newSegment is built.
+     *
+     * @param newSegment the segment being built
+     * @return the TOBE segments
+     */
+    public Segments calculateToBeSegments(CubeSegment newSegment) {
+        return segments.calculateToBeSegments(newSegment, getModel().getPartitionDesc().isPartitioned());
+    }
+
+    /**
+     * Finds the first and last segment of the contiguous run in segs whose date ranges overlap
+     * the range from startDate to endDate, stopping at any segment longer than skipSegDateRangeCap.
+     *
+     * @return (first, last) segment of the run, or null if fewer than two segments qualify
+     */
+    public Pair<CubeSegment, CubeSegment> findMergeOffsetsByDateRange(Segments<CubeSegment> segs, long startDate, long endDate, long skipSegDateRangeCap) {
+        return this.segments.findMergeOffsetsByDateRange(segs, startDate, endDate, skipSegDateRangeCap);
+    }
+
     public CubeSegment getLastSegment() {
         List<CubeSegment> existing = getSegments();
         if (existing.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a441c3f/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 4ba29af..296a4e7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -27,7 +27,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -520,7 +519,7 @@ public class CubeManager implements IRealizationProvider {
         if (isOffsetsOn) {
             // offset cube, merge by date range?
             if (startOffset == endOffset) {
-                Pair<CubeSegment, CubeSegment> pair = findMergeOffsetsByDateRange(cube.getSegments(SegmentStatusEnum.READY), startDate, endDate, Long.MAX_VALUE);
+                Pair<CubeSegment, CubeSegment> pair = cube.findMergeOffsetsByDateRange(cube.getSegments(SegmentStatusEnum.READY), startDate, endDate, Long.MAX_VALUE);
                 if (pair == null)
                     throw new IllegalArgumentException("Find no segments to merge by date range " + startDate + "-" + endDate + " for cube " + cube);
                 startOffset = pair.getFirst().getSourceOffsetStart();
@@ -580,32 +579,6 @@ public class CubeManager implements IRealizationProvider {
         return newSegment;
     }
 
-    private Pair<CubeSegment, CubeSegment> findMergeOffsetsByDateRange(List<CubeSegment> segments, long startDate, long endDate, long skipSegDateRangeCap) {
-        // must be offset cube
-        LinkedList<CubeSegment> result = Lists.newLinkedList();
-        for (CubeSegment seg : segments) {
-
-            // include if date range overlaps
-            if (startDate < seg.getDateRangeEnd() && seg.getDateRangeStart() < endDate) {
-
-                // reject too big segment
-                if (seg.getDateRangeEnd() - seg.getDateRangeStart() > skipSegDateRangeCap)
-                    break;
-
-                // reject holes
-                if (result.size() > 0 && result.getLast().getSourceOffsetEnd() != seg.getSourceOffsetStart())
-                    break;
-
-                result.add(seg);
-            }
-        }
-
-        if (result.size() <= 1)
-            return null;
-        else
-            return Pair.newPair(result.getFirst(), result.getLast());
-    }
-
     public static long minDateRangeStart(List<CubeSegment> mergingSegments) {
         long min = Long.MAX_VALUE;
         for (CubeSegment seg : mergingSegments)
@@ -708,50 +681,7 @@ public class CubeManager implements IRealizationProvider {
     }
 
     public Pair<Long, Long> autoMergeCubeSegments(CubeInstance cube) throws IOException {
-        if (!cube.needAutoMerge()) {
-            logger.debug("Cube " + cube.getName() + " doesn't need auto merge");
-            return null;
-        }
-
-        List<CubeSegment> buildingSegs = cube.getBuildingSegments();
-        if (buildingSegs.size() > 0) {
-            logger.debug("Cube " + cube.getName() + " has " + buildingSegs.size() + " building segments");
-        }
-
-        List<CubeSegment> readySegs = cube.getSegments(SegmentStatusEnum.READY);
-
-        List<CubeSegment> mergingSegs = Lists.newArrayList();
-        if (buildingSegs.size() > 0) {
-
-            for (CubeSegment building : buildingSegs) {
-                // exclude those under-merging segs
-                for (CubeSegment ready : readySegs) {
-                    if (ready.getSourceOffsetStart() >= building.getSourceOffsetStart() && ready.getSourceOffsetEnd() <= building.getSourceOffsetEnd()) {
-                        mergingSegs.add(ready);
-                    }
-                }
-            }
-        }
-
-        // exclude those already under merging segments
-        readySegs.removeAll(mergingSegs);
-
-        long[] timeRanges = cube.getDescriptor().getAutoMergeTimeRanges();
-        Arrays.sort(timeRanges);
-
-        for (int i = timeRanges.length - 1; i >= 0; i--) {
-            long toMergeRange = timeRanges[i];
-
-            for (int s = 0; s < readySegs.size(); s++) {
-                CubeSegment seg = readySegs.get(s);
-                Pair<CubeSegment, CubeSegment> p = findMergeOffsetsByDateRange(readySegs.subList(s, readySegs.size()), //
-                        seg.getDateRangeStart(), seg.getDateRangeStart() + toMergeRange, toMergeRange);
-                if (p != null && p.getSecond().getDateRangeEnd() - p.getFirst().getDateRangeStart() >= toMergeRange)
-                    return Pair.newPair(p.getFirst().getSourceOffsetStart(), p.getSecond().getSourceOffsetEnd());
-            }
-        }
-
-        return null;
+        return cube.autoMergeCubeSegments();
     }
 
     public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
@@ -765,7 +695,7 @@ public class CubeManager implements IRealizationProvider {
             logger.warn("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY");
         }
 
-        List<CubeSegment> tobe = calculateToBeSegments(cube, newSegment);
+        List<CubeSegment> tobe = cube.calculateToBeSegments(newSegment);
 
         if (tobe.contains(newSegment) == false)
             throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe);
@@ -786,104 +716,17 @@ public class CubeManager implements IRealizationProvider {
     }
 
     public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
-        List<CubeSegment> tobe = calculateToBeSegments(cube, newSegments);
+        List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments);
         List<CubeSegment> newList = Arrays.asList(newSegments);
         if (tobe.containsAll(newList) == false) {
             throw new IllegalStateException("For cube " + cube + ", the new segments " + newList + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe);
         }
     }
 
-    /**
-     * Smartly figure out the TOBE segments once all new segments are built.
-     * - Ensures no gap, no overlap
-     * - Favors new segments over the old
-     * - Favors big segments over the small
-     */
-    private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment newSegments) {
-
-        List<CubeSegment> tobe = Lists.newArrayList(cube.getSegments());
-        if (newSegments != null && !tobe.contains(newSegments)) {
-            tobe.add(newSegments);
-        }
-        if (tobe.size() == 0)
-            return tobe;
-
-        // sort by source offset
-        Collections.sort(tobe);
-
-        CubeSegment firstSeg = tobe.get(0);
-        firstSeg.validate();
-
-        for (int i = 0, j = 1; j < tobe.size();) {
-            CubeSegment is = tobe.get(i);
-            CubeSegment js = tobe.get(j);
-            js.validate();
-
-            // check i is either ready or new
-            if (!isNew(is) && !isReady(is)) {
-                tobe.remove(i);
-                continue;
-            }
-
-            // check j is either ready or new
-            if (!isNew(js) && !isReady(js)) {
-                tobe.remove(j);
-                continue;
-            }
-
-            if (is.getSourceOffsetStart() == js.getSourceOffsetStart()) {
-                // if i, j competes
-                if (isReady(is) && isReady(js) || isNew(is) && isNew(js)) {
-                    // if both new or ready, favor the bigger segment
-                    if (is.getSourceOffsetEnd() <= js.getSourceOffsetEnd()) {
-                        tobe.remove(i);
-                    } else {
-                        tobe.remove(j);
-                    }
-                    continue;
-                } else {
-                    // otherwise, favor the new segment
-                    if (isNew(is) && is.equals(newSegments)) {
-                        tobe.remove(j);
-                        continue;
-                    } else if (js.equals(newSegments)) {
-                        tobe.remove(i);
-                        continue;
-                    }
-                }
-            }
-
-            // if i, j in sequence
-            if (is.getSourceOffsetEnd() <= js.getSourceOffsetStart()) {
-                i++;
-                j++;
-                continue;
-            }
-
-            // js can be covered by is
-            if (is.equals(newSegments)) {
-                // seems j not fitting
-                tobe.remove(j);
-                continue;
-            } else {
-                i++;
-                j++;
-                continue;
-            }
-
-        }
-
-        return tobe;
-    }
-
     private boolean isReady(CubeSegment seg) {
         return seg.getStatus() == SegmentStatusEnum.READY;
     }
 
-    private boolean isNew(CubeSegment seg) {
-        return seg.getStatus() == SegmentStatusEnum.NEW || seg.getStatus() == SegmentStatusEnum.READY_PENDING;
-    }
-
     private void loadAllCubeInstance() throws IOException {
         ResourceStore store = getStore();
         List<String> paths = store.collectResourceRecursively(ResourceStore.CUBE_RESOURCE_ROOT, ".json");

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a441c3f/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
index e3fcdcb..9d26927 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
@@ -18,7 +18,7 @@
 
 package org.apache.kylin.metadata.model;
 
-public interface ISegment{
+public interface ISegment {
 
     public String getName();
 
@@ -29,11 +29,12 @@ public interface ISegment{
     public long getSourceOffsetStart();
 
     public long getSourceOffsetEnd();
-    
+
     public DataModelDesc getModel();
 
     public SegmentStatusEnum getStatus();
 
     public long getLastBuildTime();
 
+    public boolean isSourceOffsetsOn();
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a441c3f/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
index f0a58cb..bc115cc 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
@@ -18,16 +18,25 @@
 
 package org.apache.kylin.metadata.model;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.kylin.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Segments<T extends ISegment> extends ArrayList<T> {
-    
+
     private static final long serialVersionUID = 1L;
 
+    private static final Logger logger = LoggerFactory.getLogger(Segments.class);
+
     public static boolean sourceOffsetContains(ISegment a, ISegment b) {
         return a.getSourceOffsetStart() <= b.getSourceOffsetStart() && b.getSourceOffsetEnd() <= a.getSourceOffsetEnd();
     }
-    
+
     public static boolean sourceOffsetOverlaps(ISegment a, ISegment b) {
         return a.getSourceOffsetStart() < b.getSourceOffsetEnd() && b.getSourceOffsetStart() < a.getSourceOffsetEnd();
     }
@@ -44,7 +53,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> {
         Segments<T> readySegs = getSegments(SegmentStatusEnum.READY);
 
         long startTime = Long.MAX_VALUE;
-        for (T seg : readySegs) {
+        for (ISegment seg : readySegs) {
             startTime = Math.min(startTime, seg.getDateRangeStart());
         }
 
@@ -55,7 +64,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> {
         Segments<T> readySegs = getSegments(SegmentStatusEnum.READY);
 
         long endTime = Long.MIN_VALUE;
-        for (T seg : readySegs) {
+        for (ISegment seg : readySegs) {
             endTime = Math.max(endTime, seg.getDateRangeEnd());
         }
 
@@ -87,7 +96,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> {
         return latest;
     }
 
-    public Segments getSegments(SegmentStatusEnum status) {
+    public Segments<T> getSegments(SegmentStatusEnum status) {
         Segments<T> result = new Segments<>();
 
         for (T segment : this) {
@@ -107,7 +116,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> {
         return null;
     }
 
-    public Segments getBuildingSegments() {
+    public Segments<T> getBuildingSegments() {
         Segments<T> buildingSegments = new Segments();
         if (null != this) {
             for (T segment : this) {
@@ -142,9 +151,247 @@ public class Segments<T extends ISegment> extends ArrayList<T> {
         return result;
     }
 
+    /**
+     * Picks the next run of READY segments that is due for auto merge.
+     * <p>
+     * READY segments whose source offsets are contained in a building segment are considered
+     * already under merging and are skipped. Merge ranges are tried from the largest to the
+     * smallest; for each range every READY segment is tried as a starting point, and the first
+     * contiguous run (see findMergeOffsetsByDateRange) spanning at least that range wins.
+     *
+     * @param needAutoMerge whether auto merge is enabled for the cube; if false nothing is merged
+     * @param cubeName      name of the cube, only used in log messages
+     * @param timeRanges    candidate merge ranges, in the same unit as the segment date ranges;
+     *                      the array itself is not modified
+     * @return source offsets (start, end) of the segments to merge, or null if nothing qualifies
+     * @throws IOException declared for API compatibility; not thrown by this implementation
+     */
+    public Pair<Long, Long> autoMergeCubeSegments(boolean needAutoMerge, String cubeName, long[] timeRanges) throws IOException {
+        if (!needAutoMerge) {
+            logger.debug("Cube " + cubeName + " doesn't need auto merge");
+            return null;
+        }
+        if (timeRanges == null || timeRanges.length == 0) {
+            logger.debug("Cube " + cubeName + " has no auto merge time ranges");
+            return null;
+        }
+
+        Segments<T> buildingSegs = getBuildingSegments();
+        if (buildingSegs.size() > 0) {
+            logger.debug("Cube " + cubeName + " has " + buildingSegs.size() + " building segments");
+        }
+
+        Segments<T> readySegs = getSegments(SegmentStatusEnum.READY);
+
+        // exclude READY segments already under merging, i.e. covered by a building segment
+        Segments<T> mergingSegs = new Segments<>();
+        for (T building : buildingSegs) {
+            for (T ready : readySegs) {
+                if (sourceOffsetContains(building, ready)) {
+                    mergingSegs.add(ready);
+                }
+            }
+        }
+        readySegs.removeAll(mergingSegs);
+
+        // sort a copy, so the caller's array keeps its original order
+        long[] sortedRanges = Arrays.copyOf(timeRanges, timeRanges.length);
+        Arrays.sort(sortedRanges);
+
+        // try the biggest merge range first
+        for (int i = sortedRanges.length - 1; i >= 0; i--) {
+            long toMergeRange = sortedRanges[i];
+
+            for (int s = 0; s < readySegs.size(); s++) {
+                T seg = readySegs.get(s);
+                Pair<T, T> p = findMergeOffsetsByDateRange(readySegs.getSubList(s, readySegs.size()), //
+                        seg.getDateRangeStart(), seg.getDateRangeStart() + toMergeRange, toMergeRange);
+                if (p != null && p.getSecond().getDateRangeEnd() - p.getFirst().getDateRangeStart() >= toMergeRange)
+                    return Pair.newPair(p.getFirst().getSourceOffsetStart(), p.getSecond().getSourceOffsetEnd());
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Scans segments in list order, collecting those whose date range overlaps the range from
+     * startDate to endDate, and returns the first and last collected segment. Meant for cubes
+     * that track source offsets.
+     * <p>
+     * Collection stops at the first overlapping segment whose date range is longer than
+     * skipSegDateRangeCap, or whose source offset start differs from the source offset end of the
+     * previously collected segment (a hole).
+     *
+     * @param segments            candidate segments, in the order to scan them
+     * @param startDate           start of the date range to merge
+     * @param endDate             end of the date range to merge
+     * @param skipSegDateRangeCap longest segment date range that may still be merged
+     * @return (first, last) collected segment, or null if fewer than two were collected
+     */
+    public Pair<T, T> findMergeOffsetsByDateRange(Segments<T> segments, long startDate, long endDate, long skipSegDateRangeCap) {
+        // must be offset cube
+        Segments<T> result = new Segments<>();
+        for (T seg : segments) {
+
+            // include if date range overlaps
+            if (startDate < seg.getDateRangeEnd() && seg.getDateRangeStart() < endDate) {
+
+                // reject too big segment
+                if (seg.getDateRangeEnd() - seg.getDateRangeStart() > skipSegDateRangeCap)
+                    break;
+
+                // reject holes
+                if (result.size() > 0 && result.getLast().getSourceOffsetEnd() != seg.getSourceOffsetStart())
+                    break;
+
+                result.add(seg);
+            }
+        }
+
+        if (result.size() <= 1)
+            return null;
+        else
+            return Pair.newPair(result.getFirst(), result.getLast());
+    }
+
+    /**
+     * Figures out the TOBE segments, i.e. this list as it should look once newSegment is built.
+     * Works on a copy; this list itself is not modified.
+     * <p>
+     * newSegment is added to the copy if absent, the copy is sorted by the segments' natural
+     * ordering, and neighbouring pairs are then reconciled:
+     * <ul>
+     * <li>segments that are neither READY nor NEW/READY_PENDING are dropped;</li>
+     * <li>of two READY (or two NEW/READY_PENDING) segments with the same source offset start, the
+     * shorter one is dropped (the earlier one on a tie);</li>
+     * <li>of a READY and a NEW/READY_PENDING segment with the same start, if one of them is newSegment the other is dropped;</li>
+     * <li>segments sorted after newSegment that overlap it are dropped.</li>
+     * </ul>
+     * Other overlapping neighbours are left in place. The raw sort relies on the concrete segment
+     * type implementing Comparable; otherwise it fails with a ClassCastException.
+     *
+     * @param newSegment    segment expected to become part of the list; may be null
+     * @param isPartitioned whether the model is partitioned; only then are segment ranges validated
+     * @return the TOBE segments
+     * @throws IllegalStateException if a validated segment has an empty or inverted range
+     */
+    public Segments calculateToBeSegments(ISegment newSegment, boolean isPartitioned) {
+
+        Segments tobe = (Segments) this.clone();
+        if (newSegment != null && !tobe.contains(newSegment)) {
+            tobe.add(newSegment);
+        }
+        if (tobe.size() == 0)
+            return tobe;
+
+        // sort by source offset
+        Collections.sort(tobe);
+
+        ISegment firstSeg = tobe.getFirst();
+        validate(firstSeg, isPartitioned);
+
+        for (int i = 0, j = 1; j < tobe.size();) {
+            ISegment is = (ISegment) tobe.get(i);
+            ISegment js = (ISegment) tobe.get(j);
+            validate(js, isPartitioned);
+
+            // check i is either ready or new
+            if (!isNew(is) && !isReady(is)) {
+                tobe.remove(i);
+                continue;
+            }
+
+            // check j is either ready or new
+            if (!isNew(js) && !isReady(js)) {
+                tobe.remove(j);
+                continue;
+            }
+
+            if (is.getSourceOffsetStart() == js.getSourceOffsetStart()) {
+                // if i, j competes
+                if (isReady(is) && isReady(js) || isNew(is) && isNew(js)) {
+                    // if both new or ready, favor the bigger segment
+                    if (is.getSourceOffsetEnd() <= js.getSourceOffsetEnd()) {
+                        tobe.remove(i);
+                    } else {
+                        tobe.remove(j);
+                    }
+                    continue;
+                } else {
+                    // otherwise, favor the new segment
+                    if (isNew(is) && is.equals(newSegment)) {
+                        tobe.remove(j);
+                        continue;
+                    } else if (js.equals(newSegment)) {
+                        tobe.remove(i);
+                        continue;
+                    }
+                }
+            }
+
+            // if i, j in sequence
+            if (is.getSourceOffsetEnd() <= js.getSourceOffsetStart()) {
+                i++;
+                j++;
+                continue;
+            }
+
+            // js can be covered by is
+            if (is.equals(newSegment)) {
+                // seems j not fitting
+                tobe.remove(j);
+                continue;
+            } else {
+                i++;
+                j++;
+                continue;
+            }
+
+        }
+
+        return tobe;
+    }
+
+    /**
+     * Validates the range of a segment when the model is partitioned: segments without source
+     * offsets need {@code dateRangeStart < dateRangeEnd}, segments with source offsets need
+     * {@code sourceOffsetStart < sourceOffsetEnd}. Does nothing when the model is not partitioned.
+     *
+     * @throws IllegalStateException if the checked range is empty or inverted
+     */
+    private void validate(ISegment seg, boolean isPartitioned) {
+        if (isPartitioned) {
+            if (!seg.isSourceOffsetsOn() && seg.getDateRangeStart() >= seg.getDateRangeEnd())
+                throw new IllegalStateException("Invalid segment, dateRangeStart(" + seg.getDateRangeStart() + ") must be smaller than dateRangeEnd(" + seg.getDateRangeEnd() + ") in segment " + seg);
+            if (seg.isSourceOffsetsOn() && seg.getSourceOffsetStart() >= seg.getSourceOffsetEnd())
+                throw new IllegalStateException("Invalid segment, sourceOffsetStart(" + seg.getSourceOffsetStart() + ") must be smaller than sourceOffsetEnd(" + seg.getSourceOffsetEnd() + ") in segment " + seg);
+        }
+    }
+
+    private boolean isReady(ISegment seg) {
+        return seg.getStatus() == SegmentStatusEnum.READY;
+    }
+
+    private boolean isNew(ISegment seg) {
+        return seg.getStatus() == SegmentStatusEnum.NEW || seg.getStatus() == SegmentStatusEnum.READY_PENDING;
+    }
+
     private T getLast() {
         assert this.size() != 0;
         return this.get(this.size() - 1);
     }
 
+    private T getFirst() {
+        assert this.size() != 0;
+        return this.get(0);
+    }
+
+    /** Returns a new Segments list holding the elements from index from (inclusive) to to (exclusive). */
+    private Segments<T> getSubList(int from, int to) {
+        Segments<T> result = new Segments<>();
+        result.addAll(this.subList(from, to));
+        return result;
+    }
+
 }
\ No newline at end of file