Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/08 09:56:26 UTC

kylin git commit: KYLIN-2067 add API to check and fill segment holes

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2067 [created] 72425d4ec


KYLIN-2067 add API to check and fill segment holes

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/72425d4e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/72425d4e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/72425d4e

Branch: refs/heads/KYLIN-2067
Commit: 72425d4ec89ac81dbd6ca2007542753de32a562d
Parents: 4c636b8
Author: shaofengshi <sh...@apache.org>
Authored: Sat Oct 8 17:45:13 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Oct 8 17:45:13 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java | 139 +++++++++++++++++--
 .../java/org/apache/kylin/cube/CubeSegment.java |  24 ++++
 .../org/apache/kylin/cube/CubeManagerTest.java  |  49 +++++--
 .../org/apache/kylin/cube/CubeSegmentsTest.java |  12 +-
 .../streaming/cube/StreamingCubeBuilder.java    |   2 +-
 .../kylin/provision/BuildCubeWithEngine.java    |   2 +-
 .../kylin/provision/BuildCubeWithStream.java    |   7 +-
 .../kylin/rest/controller/CubeController.java   | 130 +++++++++--------
 .../kylin/rest/request/JobBuildRequest2.java    |  44 ++++--
 .../apache/kylin/rest/service/JobService.java   |   4 +-
 .../rest/controller/CubeControllerTest.java     |  29 +++-
 .../kylin/rest/service/CacheServiceTest.java    |   2 +-
 .../kylin/source/kafka/MergeOffsetStep.java     |  33 ++---
 .../kylin/source/kafka/SeekOffsetStep.java      |  13 +-
 .../source/kafka/hadoop/KafkaFlatTableJob.java  |   5 +-
 .../source/kafka/util/KafkaOffsetMapping.java   |  97 -------------
 16 files changed, 352 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
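
For readers who want to try the new API: a minimal usage sketch of the two REST
endpoints this patch adds (GET lists a cube's segment holes, PUT submits one
build job per hole). Host, port, credentials and cube name are placeholders,
and the /kylin/api/cubes prefix assumes the default mounting of CubeController;
adjust for your deployment.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class SegmentHoleClient {
        public static void main(String[] args) throws Exception {
            String url = "http://localhost:7070/kylin/api/cubes/test_cube/hole"; // placeholders
            String auth = java.util.Base64.getEncoder()
                    .encodeToString("ADMIN:KYLIN".getBytes(StandardCharsets.UTF_8));

            // GET: list the holes, returned as a JSON array of segments
            HttpURLConnection get = (HttpURLConnection) new URL(url).openConnection();
            get.setRequestProperty("Authorization", "Basic " + auth);
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(get.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null)
                    System.out.println(line);
            }

            // PUT: submit a build job for each detected hole
            HttpURLConnection put = (HttpURLConnection) new URL(url).openConnection();
            put.setRequestMethod("PUT");
            put.setRequestProperty("Authorization", "Basic " + auth);
            System.out.println("fill holes -> HTTP " + put.getResponseCode());
        }
    }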


http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index d243f4d..269b324 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -34,6 +34,7 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -433,26 +434,45 @@ public class CubeManager implements IRealizationProvider {
 
     // append a full build segment
     public CubeSegment appendSegment(CubeInstance cube) throws IOException {
-        return appendSegment(cube, 0, 0, 0, 0);
+        return appendSegment(cube, 0, 0, 0, 0, null, null);
     }
 
-    public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException {
+    public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate) throws IOException {
+        return appendSegment(cube, startDate, endDate, 0, 0, null, null);
+    }
 
+    public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) throws IOException {
         checkBuildingSegment(cube);
 
+        if (sourcePartitionOffsetStart == null) {
+            sourcePartitionOffsetStart = Maps.newHashMap();
+        }
+        if (sourcePartitionOffsetEnd == null) {
+            sourcePartitionOffsetEnd = Maps.newHashMap();
+        }
+
+        boolean isOffsetsOn = endOffset != 0;
+        if (isOffsetsOn) {
+            checkSourceOffsets(startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
+        }
+
         if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned()) {
             // try figure out a reasonable start if missing
             if (startDate == 0 && startOffset == 0) {
-                boolean isOffsetsOn = endOffset != 0;
-                if (isOffsetsOn) {
-                    startOffset = calculateStartOffsetForAppendSegment(cube);
-                    if (startOffset == Long.MAX_VALUE) {
-                        throw new IllegalStateException("There is already one pending for building segment, please submit request later.");
+                final CubeSegment last = getLatestSegment(cube);
+                if (last != null) {
+                    if (isOffsetsOn) {
+                        if (last.getSourceOffsetEnd() == Long.MAX_VALUE) {
+                            throw new IllegalStateException("There is already a pending segment under building; please submit the request later.");
+                        }
+                        startOffset = last.getSourceOffsetEnd();
+                        sourcePartitionOffsetStart = last.getSourcePartitionOffsetEnd();
+                    } else {
+                        startDate = last.getDateRangeEnd();
                     }
-                } else {
-                    startDate = calculateStartDateForAppendSegment(cube);
                 }
             }
+
         } else {
             startDate = 0;
             endDate = Long.MAX_VALUE;
@@ -461,6 +481,8 @@ public class CubeManager implements IRealizationProvider {
         }
 
         CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset);
+        newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart);
+        newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd);
         validateNewSegments(cube, newSegment);
 
         CubeUpdate cubeBuilder = new CubeUpdate(cube);
@@ -491,9 +513,8 @@ public class CubeManager implements IRealizationProvider {
                 throw new IllegalArgumentException("For streaming cube, only one segment can be refreshed at one time");
             }
 
-            Map<String, String> partitionInfo = Maps.newHashMap();
-            partitionInfo.putAll(toRefreshSeg.getAdditionalInfo());
-            newSegment.setAdditionalInfo(partitionInfo);
+            newSegment.setSourcePartitionOffsetStart(toRefreshSeg.getSourcePartitionOffsetStart());
+            newSegment.setSourcePartitionOffsetEnd(toRefreshSeg.getSourcePartitionOffsetEnd());
         }
 
         CubeUpdate cubeBuilder = new CubeUpdate(cube);
@@ -548,6 +569,8 @@ public class CubeManager implements IRealizationProvider {
             newSegment.setDateRangeEnd(maxDateRangeEnd(mergingSegments));
             newSegment.setSourceOffsetStart(first.getSourceOffsetStart());
             newSegment.setSourceOffsetEnd(last.getSourceOffsetEnd());
+            newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
+            newSegment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
         } else {
             newSegment.setDateRangeStart(first.getSourceOffsetStart());
             newSegment.setDateRangeEnd(last.getSourceOffsetEnd());
@@ -601,20 +624,29 @@ public class CubeManager implements IRealizationProvider {
             return Pair.newPair(result.getFirst(), result.getLast());
     }
 
-    private long minDateRangeStart(List<CubeSegment> mergingSegments) {
+    public static long minDateRangeStart(List<CubeSegment> mergingSegments) {
         long min = Long.MAX_VALUE;
         for (CubeSegment seg : mergingSegments)
             min = Math.min(min, seg.getDateRangeStart());
         return min;
     }
 
-    private long maxDateRangeEnd(List<CubeSegment> mergingSegments) {
+    public static long maxDateRangeEnd(List<CubeSegment> mergingSegments) {
         long max = Long.MIN_VALUE;
         for (CubeSegment seg : mergingSegments)
             max = Math.max(max, seg.getDateRangeEnd());
         return max;
     }
 
+    private CubeSegment getLatestSegment(CubeInstance cube) {
+        List<CubeSegment> existing = cube.getSegments();
+        if (existing.isEmpty()) {
+            return null;
+        } else {
+            return existing.get(existing.size() - 1);
+        }
+    }
+
     private long calculateStartOffsetForAppendSegment(CubeInstance cube) {
         List<CubeSegment> existing = cube.getSegments();
         if (existing.isEmpty()) {
@@ -640,6 +672,45 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
+    private void checkSourceOffsets(long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) {
+        if (endOffset <= 0)
+            return;
+
+        if (startOffset >= endOffset) {
+            throw new IllegalArgumentException("'startOffset' need be smaller than 'endOffset'");
+        }
+
+        if (startOffset > 0) {
+            if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.size() == 0) {
+                throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset");
+            }
+
+            long totalOffset = 0;
+            for (Long v : sourcePartitionOffsetStart.values()) {
+                totalOffset += v;
+            }
+
+            if (totalOffset != startOffset) {
+                throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
+            }
+        }
+
+        if (endOffset > 0 && endOffset != Long.MAX_VALUE) {
+            if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.size() == 0) {
+                throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset");
+            }
+
+            long totalOffset = 0;
+            for (Long v : sourcePartitionOffsetEnd.values()) {
+                totalOffset += v;
+            }
+
+            if (totalOffset != endOffset) {
+                throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
+            }
+        }
+    }
+
     private void checkCubeIsPartitioned(CubeInstance cube) {
         if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == false) {
             throw new IllegalStateException("there is no partition date column specified, only full build is supported");
@@ -1006,4 +1077,44 @@ public class CubeManager implements IRealizationProvider {
         }
         return factDictCols;
     }
+
+    /**
+     * Calculate the holes (gaps) between the segments of a cube.
+     * @param cubeName name of the cube to inspect
+     * @return the list of hole segments, empty if there is none
+     */
+    public List<CubeSegment> calculateHoles(String cubeName) {
+        List<CubeSegment> holes = Lists.newArrayList();
+        final CubeInstance cube = getCube(cubeName);
+        Preconditions.checkNotNull(cube);
+        final List<CubeSegment> segments = cube.getSegments();
+        logger.info("totally " + segments.size() + " cubeSegments");
+        if (segments.size() == 0) {
+            return holes;
+        }
+
+        Collections.sort(segments);
+        boolean isOffsetOn = segments.get(0).isSourceOffsetsOn();
+        for (int i = 0; i < segments.size() - 1; ++i) {
+            CubeSegment first = segments.get(i);
+            CubeSegment second = segments.get(i + 1);
+            if (first.getSourceOffsetEnd() == second.getSourceOffsetStart()) {
+                continue;
+            } else if (first.getSourceOffsetEnd() < second.getSourceOffsetStart()) {
+                CubeSegment hole = new CubeSegment();
+                if (isOffsetOn) {
+                    hole.setSourceOffsetStart(first.getSourceOffsetEnd());
+                    hole.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetEnd());
+                    hole.setSourceOffsetEnd(second.getSourceOffsetStart());
+                    hole.setSourcePartitionOffsetEnd(second.getSourcePartitionOffsetStart());
+                } else {
+                    hole.setDateRangeStart(first.getDateRangeEnd());
+                    hole.setDateRangeEnd(second.getDateRangeStart());
+                }
+                hole.setName(CubeSegment.makeSegmentName(hole.getDateRangeStart(), hole.getDateRangeEnd(), hole.getSourceOffsetStart(), hole.getSourceOffsetEnd()));
+                holes.add(hole);
+            }
+        }
+        return holes;
+    }
 }
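
To illustrate what calculateHoles returns, here is a sketch (mirroring the unit
tests further down) against a partitioned cube; the cube name is illustrative,
and the segments use date ranges rather than offsets:

    CubeManager mgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
    CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty"); // any partitioned cube

    // leave a gap over [1000, 2000)
    mgr.appendSegment(cube, 0, 1000).setStatus(SegmentStatusEnum.READY);
    mgr.appendSegment(cube, 2000, 3000).setStatus(SegmentStatusEnum.READY);

    List<CubeSegment> holes = mgr.calculateHoles(cube.getName());
    // exactly one hole is expected, covering the missing range
    assert holes.size() == 1;
    assert holes.get(0).getDateRangeStart() == 1000;
    assert holes.get(0).getDateRangeEnd() == 2000;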

http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index d5de47f..fdf1fb0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -103,6 +103,14 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen
     @JsonProperty("rowkey_stats")
     private List<Object[]> rowkeyStats = Lists.newArrayList();
 
+    @JsonProperty("source_partition_offset_start")
+    @JsonInclude(JsonInclude.Include.NON_EMPTY)
+    private Map<Integer, Long> sourcePartitionOffsetStart = Maps.newHashMap();
+
+    @JsonProperty("source_partition_offset_end")
+    @JsonInclude(JsonInclude.Include.NON_EMPTY)
+    private Map<Integer, Long> sourcePartitionOffsetEnd = Maps.newHashMap();
+
     @JsonProperty("additionalInfo")
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     private Map<String, String> additionalInfo = new LinkedHashMap<String, String>();
@@ -549,4 +557,20 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen
     public void setAdditionalInfo(Map<String, String> additionalInfo) {
         this.additionalInfo = additionalInfo;
     }
+
+    public Map<Integer, Long> getSourcePartitionOffsetEnd() {
+        return sourcePartitionOffsetEnd;
+    }
+
+    public void setSourcePartitionOffsetEnd(Map<Integer, Long> sourcePartitionOffsetEnd) {
+        this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd;
+    }
+
+    public Map<Integer, Long> getSourcePartitionOffsetStart() {
+        return sourcePartitionOffsetStart;
+    }
+
+    public void setSourcePartitionOffsetStart(Map<Integer, Long> sourcePartitionOffsetStart) {
+        this.sourcePartitionOffsetStart = sourcePartitionOffsetStart;
+    }
 }
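
The new maps complement the existing scalar offsets: per the checkSourceOffsets
validation in CubeManager above, the per-partition values are expected to sum to
the segment's total source offset. A sketch with made-up numbers for a
three-partition Kafka topic, given some offset-based CubeSegment 'segment':

    Map<Integer, Long> partitionEnd = Maps.newHashMap();
    partitionEnd.put(0, 400L);
    partitionEnd.put(1, 350L);
    partitionEnd.put(2, 250L);

    long totalEnd = 0;
    for (Long offset : partitionEnd.values())
        totalEnd += offset; // 1000

    segment.setSourceOffsetEnd(totalEnd); // scalar and map stay consistent
    segment.setSourcePartitionOffsetEnd(partitionEnd);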

http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index e63fe99..bb90d29 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -24,8 +24,10 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableSet;
 
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.JsonUtil;
@@ -106,10 +108,10 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
         assertEquals(0, cube.getSegments().size());
 
         // append first
-        CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0);
+        CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0, null, null);
         seg1.setStatus(SegmentStatusEnum.READY);
 
-        CubeSegment seg2 = mgr.appendSegment(cube, 0, 2000, 0, 0);
+        CubeSegment seg2 = mgr.appendSegment(cube, 0, 2000, 0, 0, null, null);
         seg2.setStatus(SegmentStatusEnum.READY);
 
         CubeUpdate cubeBuilder = new CubeUpdate(cube);
@@ -133,11 +135,21 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
         // no segment at first
         assertEquals(0, cube.getSegments().size());
 
        Map<Integer, Long> m1 = Maps.newHashMap();
        m1.put(1, 1000L);
        Map<Integer, Long> m2 = Maps.newHashMap();
        m2.put(1, 2000L);
        Map<Integer, Long> m3 = Maps.newHashMap();
        m3.put(1, 3000L);
        Map<Integer, Long> m4 = Maps.newHashMap();
        m4.put(1, 4000L);
+
         // append first
-        CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000);
+        CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000, null, m1);
         seg1.setStatus(SegmentStatusEnum.READY);
 
-        CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000);
+
+        CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000, m1, m2);
         seg2.setStatus(SegmentStatusEnum.READY);
 
 
@@ -145,12 +157,13 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
         seg3.setStatus(SegmentStatusEnum.NEW);
 
 
-        CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 2000, 3000);
+        CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 2000, 3000, m2, m3);
         seg4.setStatus(SegmentStatusEnum.NEW);
         seg4.setLastBuildJobID("test");
         seg4.setStorageLocationIdentifier("test");
 
-        CubeSegment seg5 = mgr.appendSegment(cube, 0, 0, 3000, 4000);
+
+        CubeSegment seg5 = mgr.appendSegment(cube, 0, 0, 3000, 4000, m3, m4);
         seg5.setStatus(SegmentStatusEnum.READY);
 
         CubeUpdate cubeBuilder = new CubeUpdate(cube);
@@ -179,18 +192,26 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
 
         // no segment at first
         assertEquals(0, cube.getSegments().size());
        Map<Integer, Long> m1 = Maps.newHashMap();
        m1.put(1, 1000L);
        Map<Integer, Long> m2 = Maps.newHashMap();
        m2.put(1, 2000L);
        Map<Integer, Long> m3 = Maps.newHashMap();
        m3.put(1, 3000L);
        Map<Integer, Long> m4 = Maps.newHashMap();
        m4.put(1, 4000L);
 
         // append first
-        CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000);
+        CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000, null, m1);
         seg1.setStatus(SegmentStatusEnum.READY);
 
-        CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000);
+        CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000, m1, m2);
         seg2.setStatus(SegmentStatusEnum.READY);
 
-        CubeSegment seg3 = mgr.appendSegment(cube, 0, 0, 2000, 3000);
+        CubeSegment seg3 = mgr.appendSegment(cube, 0, 0, 2000, 3000, m2, m3);
         seg3.setStatus(SegmentStatusEnum.READY);
 
-        CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 3000, 4000);
+        CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 3000, 4000, m3, m4);
         seg4.setStatus(SegmentStatusEnum.READY);
 
 
@@ -246,10 +267,10 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
         assertEquals(0, cube.getSegments().size());
 
         // append first
-        CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0);
+        CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000);
         seg1.setStatus(SegmentStatusEnum.READY);
 
-        CubeSegment seg3 = mgr.appendSegment(cube, 2000, 4000, 0, 0);
+        CubeSegment seg3 = mgr.appendSegment(cube, 2000, 4000);
         seg3.setStatus(SegmentStatusEnum.READY);
 
         assertEquals(2, cube.getSegments().size());
@@ -260,7 +281,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
 
         // append a new seg which will be merged
 
-        CubeSegment seg4 = mgr.appendSegment(cube, 4000, 8000, 0, 0);
+        CubeSegment seg4 = mgr.appendSegment(cube, 4000, 8000);
         seg4.setStatus(SegmentStatusEnum.READY);
 
         assertEquals(3, cube.getSegments().size());
@@ -272,7 +293,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
 
         // fill the gap
 
-        CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000, 0, 0);
+        CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000);
         seg2.setStatus(SegmentStatusEnum.READY);
 
         assertEquals(4, cube.getSegments().size());

http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
index f70aef9..828a3a9 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
@@ -106,11 +106,11 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase {
         assertEquals(0, cube.getSegments().size());
 
         // append first
-        CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0);
+        CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000);
         seg1.setStatus(SegmentStatusEnum.READY);
 
         // append second
-        CubeSegment seg2 = mgr.appendSegment(cube, 0, 2000, 0, 0);
+        CubeSegment seg2 = mgr.appendSegment(cube, 0, 2000);
 
         assertEquals(2, cube.getSegments().size());
         assertEquals(1000, seg2.getDateRangeStart());
@@ -164,25 +164,25 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase {
         assertEquals(0, cube.getSegments().size());
 
         // append the first
-        CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0);
+        CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000);
         seg1.setStatus(SegmentStatusEnum.READY);
         assertEquals(1, cube.getSegments().size());
 
         // append the third
-        CubeSegment seg3 = mgr.appendSegment(cube, 2000, 3000, 0, 0);
+        CubeSegment seg3 = mgr.appendSegment(cube, 2000, 3000);
         seg3.setStatus(SegmentStatusEnum.READY);
         assertEquals(2, cube.getSegments().size());
 
         // reject overlap
         try {
-            mgr.appendSegment(cube, 1000, 2500, 0, 0);
+            mgr.appendSegment(cube, 1000, 2500);
             fail();
         } catch (IllegalStateException ex) {
             // good
         }
 
         // append the second
-        CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000, 0, 0);
+        CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000);
         seg2.setStatus(SegmentStatusEnum.READY);
         assertEquals(3, cube.getSegments().size());
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index 07a4cfb..350a5f8 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -104,7 +104,7 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
         CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
         try {
-            CubeSegment segment = cubeManager.appendSegment(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), 0, 0);
+            CubeSegment segment = cubeManager.appendSegment(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond());
             segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
             segment.setInputRecords(streamingBatch.getMessages().size());
             segment.setLastBuildTime(System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 971b293..f6c8801 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -404,7 +404,7 @@ public class BuildCubeWithEngine {
     }
 
     private String buildSegment(String cubeName, long startDate, long endDate) throws Exception {
-        CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, endDate, 0, 0);
+        CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, endDate);
         DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());

http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index f8805a6..c30abc0 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Lists;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.util.ToolRunner;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
@@ -205,7 +204,7 @@ public class BuildCubeWithStream {
         for (int i = 0; i < futures.size(); i++) {
             ExecutableState result = futures.get(i).get(20, TimeUnit.MINUTES);
             logger.info("Checking building task " + i + " whose state is " + result);
-            Assert.assertTrue(result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED );
+            Assert.assertTrue(result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED);
             if (result == ExecutableState.SUCCEED)
                 succeedBuild++;
         }
@@ -214,7 +213,6 @@ public class BuildCubeWithStream {
         List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments(SegmentStatusEnum.READY);
         Assert.assertTrue(segments.size() == succeedBuild);
 
-
         if (fastBuildMode == false) {
             //empty build
             ExecutableState result = buildSegment(cubeName, 0, Long.MAX_VALUE);
@@ -238,7 +236,6 @@ public class BuildCubeWithStream {
         logger.info("Build is done");
     }
 
-
     private ExecutableState mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception {
         CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
         DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST");
@@ -256,7 +253,7 @@ public class BuildCubeWithStream {
     }
 
     protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
-        CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
+        CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, null, null);
         DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());

http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 42b117c..be242c3 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.cube.CubeInstance;
@@ -36,7 +37,6 @@ import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dimension.DimensionEncodingFactory;
 import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -54,9 +54,6 @@ import org.apache.kylin.rest.response.GeneralResponse;
 import org.apache.kylin.rest.response.HBaseResponse;
 import org.apache.kylin.rest.service.CubeService;
 import org.apache.kylin.rest.service.JobService;
-import org.apache.kylin.rest.service.KafkaConfigService;
-import org.apache.kylin.rest.service.StreamingService;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,12 +83,6 @@ public class CubeController extends BasicController {
     private static final Logger logger = LoggerFactory.getLogger(CubeController.class);
 
     @Autowired
-    private StreamingService streamingService;
-
-    @Autowired
-    private KafkaConfigService kafkaConfigService;
-
-    @Autowired
     private CubeService cubeService;
 
     @Autowired
@@ -272,7 +263,7 @@ public class CubeController extends BasicController {
     @RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT })
     @ResponseBody
     public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) {
-        return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment());
+        return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, null, null, req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment());
     }
 
     /** Build/Rebuild a cube segment by source offset */
@@ -286,16 +277,16 @@ public class CubeController extends BasicController {
     @RequestMapping(value = "/{cubeName}/rebuild2", method = { RequestMethod.PUT })
     @ResponseBody
     public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest2 req) {
-        return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), req.isForce());
+        return buildInternal(cubeName, 0, 0, req.getSourceOffsetStart(), req.getSourceOffsetEnd(), req.getSourcePartitionOffsetStart(), req.getSourcePartitionOffsetEnd(), req.getBuildType(), req.isForce());
     }
 
     private JobInstance buildInternal(String cubeName, long startTime, long endTime, //
-            long startOffset, long endOffset, String buildType, boolean force) {
+            long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, String buildType, boolean force) {
         try {
             String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
             CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
             return jobService.submitJob(cube, startTime, endTime, startOffset, endOffset, //
-                    CubeBuildTypeEnum.valueOf(buildType), force, submitter);
+                    sourcePartitionOffsetStart, sourcePartitionOffsetEnd, CubeBuildTypeEnum.valueOf(buildType), force, submitter);
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage(), e);
             throw new InternalErrorException(e.getLocalizedMessage());
@@ -542,6 +533,73 @@ public class CubeController extends BasicController {
         return hbase;
     }
 
+    /**
+     * Get the segment holes (gaps) of a cube.
+     *
+     * @param cubeName name of the cube to inspect
+     * @return the list of hole segments, empty if there is none
+     */
+    @RequestMapping(value = "/{cubeName}/hole", method = { RequestMethod.GET })
+    @ResponseBody
+    public List<CubeSegment> getHoles(@PathVariable String cubeName) {
+        return cubeService.getCubeManager().calculateHoles(cubeName);
+    }
+
+    /**
+     * Submit build jobs to fill the segment holes of a cube.
+     *
+     * @param cubeName name of the cube whose holes to fill
+     * @return the list of submitted build jobs, one per hole
+     */
+    @RequestMapping(value = "/{cubeName}/hole", method = { RequestMethod.PUT })
+    @ResponseBody
+    public List<JobInstance> fillHoles(@PathVariable String cubeName) {
+        List<JobInstance> jobs = Lists.newArrayList();
+        List<CubeSegment> holes = cubeService.getCubeManager().calculateHoles(cubeName);
+
+        if (holes.size() == 0) {
+            logger.info("No hole detected for cube '" + cubeName + "'");
+            return jobs;
+        }
+
+        boolean isOffsetOn = holes.get(0).isSourceOffsetsOn();
+        for (CubeSegment hole : holes) {
+            if (isOffsetOn) {
+                JobBuildRequest2 request = new JobBuildRequest2();
+                request.setBuildType(CubeBuildTypeEnum.BUILD.toString());
+                request.setSourceOffsetStart(hole.getSourceOffsetStart());
+                request.setSourcePartitionOffsetStart(hole.getSourcePartitionOffsetStart());
+                request.setSourceOffsetEnd(hole.getSourceOffsetEnd());
+                request.setSourcePartitionOffsetEnd(hole.getSourcePartitionOffsetEnd());
+                try {
+                    JobInstance job = build(cubeName, request);
+                    jobs.add(job);
+                } catch (Exception e) {
+                    // it may exceed the max allowed job number
+                    logger.info("Error to submit job for hole '" + hole.toString() + "', skip it now.", e);
+                    continue;
+                }
+            } else {
+                JobBuildRequest request = new JobBuildRequest();
+                request.setBuildType(CubeBuildTypeEnum.BUILD.toString());
+                request.setStartTime(hole.getDateRangeStart());
+                request.setEndTime(hole.getDateRangeEnd());
+
+                try {
+                    JobInstance job = build(cubeName, request);
+                    jobs.add(job);
+                } catch (Exception e) {
+                    // it may exceed the max allowed job number
+                    logger.info("Error to submit job for hole '" + hole.toString() + "', skip it now.", e);
+                    continue;
+                }
+            }
+        }
+
+        return jobs;
+
+    }
+
     private CubeDesc deserializeCubeDesc(CubeRequest cubeRequest) {
         CubeDesc desc = null;
         try {
@@ -560,42 +618,6 @@ public class CubeController extends BasicController {
         return desc;
     }
 
-    private StreamingConfig deserializeStreamingDesc(CubeRequest cubeRequest) {
-        StreamingConfig desc = null;
-        try {
-            logger.debug("Saving StreamingConfig " + cubeRequest.getStreamingData());
-            desc = JsonUtil.readValue(cubeRequest.getStreamingData(), StreamingConfig.class);
-        } catch (JsonParseException e) {
-            logger.error("The StreamingConfig definition is not valid.", e);
-            updateRequest(cubeRequest, false, e.getMessage());
-        } catch (JsonMappingException e) {
-            logger.error("The data StreamingConfig definition is not valid.", e);
-            updateRequest(cubeRequest, false, e.getMessage());
-        } catch (IOException e) {
-            logger.error("Failed to deal with the request.", e);
-            throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
-        }
-        return desc;
-    }
-
-    private KafkaConfig deserializeKafkaDesc(CubeRequest cubeRequest) {
-        KafkaConfig desc = null;
-        try {
-            logger.debug("Saving KafkaConfig " + cubeRequest.getKafkaData());
-            desc = JsonUtil.readValue(cubeRequest.getKafkaData(), KafkaConfig.class);
-        } catch (JsonParseException e) {
-            logger.error("The KafkaConfig definition is not valid.", e);
-            updateRequest(cubeRequest, false, e.getMessage());
-        } catch (JsonMappingException e) {
-            logger.error("The data KafkaConfig definition is not valid.", e);
-            updateRequest(cubeRequest, false, e.getMessage());
-        } catch (IOException e) {
-            logger.error("Failed to deal with the request.", e);
-            throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
-        }
-        return desc;
-    }
-
     private void updateRequest(CubeRequest request, boolean success, String message) {
         request.setCubeDescData("");
         request.setSuccessful(success);
@@ -610,12 +632,4 @@ public class CubeController extends BasicController {
         this.jobService = jobService;
     }
 
-    public void setStreamingService(StreamingService streamingService) {
-        this.streamingService = streamingService;
-    }
-
-    public void setKafkaConfigService(KafkaConfigService kafkaConfigService) {
-        this.kafkaConfigService = kafkaConfigService;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java
index dc3b433..6e9117d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java
@@ -18,30 +18,54 @@
 
 package org.apache.kylin.rest.request;
 
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
 public class JobBuildRequest2 {
 
-    private long startSourceOffset;
+    private long sourceOffsetStart;
+
+    private long sourceOffsetEnd;
 
-    private long endSourceOffset;
+    private Map<Integer, Long> sourcePartitionOffsetStart = Maps.newHashMap();
+
+    private Map<Integer, Long> sourcePartitionOffsetEnd = Maps.newHashMap();
 
     private String buildType;
 
     private boolean force;
 
-    public long getStartSourceOffset() {
-        return startSourceOffset;
+    public long getSourceOffsetStart() {
+        return sourceOffsetStart;
+    }
+
+    public void setSourceOffsetStart(long sourceOffsetStart) {
+        this.sourceOffsetStart = sourceOffsetStart;
+    }
+
+    public long getSourceOffsetEnd() {
+        return sourceOffsetEnd;
+    }
+
+    public void setSourceOffsetEnd(long sourceOffsetEnd) {
+        this.sourceOffsetEnd = sourceOffsetEnd;
+    }
+
+    public Map<Integer, Long> getSourcePartitionOffsetStart() {
+        return sourcePartitionOffsetStart;
     }
 
-    public void setStartSourceOffset(long startSourceOffset) {
-        this.startSourceOffset = startSourceOffset;
+    public void setSourcePartitionOffsetStart(Map<Integer, Long> sourcePartitionOffsetStart) {
+        this.sourcePartitionOffsetStart = sourcePartitionOffsetStart;
     }
 
-    public long getEndSourceOffset() {
-        return endSourceOffset;
+    public Map<Integer, Long> getSourcePartitionOffsetEnd() {
+        return sourcePartitionOffsetEnd;
     }
 
-    public void setEndSourceOffset(long endSourceOffset) {
-        this.endSourceOffset = endSourceOffset;
+    public void setSourcePartitionOffsetEnd(Map<Integer, Long> sourcePartitionOffsetEnd) {
+        this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd;
     }
 
     public String getBuildType() {
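
With the renamed fields, a rebuild-by-offset request that passes the
checkSourceOffsets validation can be assembled like this (a sketch; partition
ids and offsets are made up, and the per-partition end offsets must sum to
sourceOffsetEnd):

    JobBuildRequest2 req = new JobBuildRequest2();
    req.setBuildType("BUILD");
    req.setSourceOffsetStart(0L); // a 0 start needs no per-partition start map
    req.setSourceOffsetEnd(1000L);

    Map<Integer, Long> end = Maps.newHashMap();
    end.put(0, 600L);
    end.put(1, 400L); // 600 + 400 == sourceOffsetEnd
    req.setSourcePartitionOffsetEnd(end);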

http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 5c704ba..589f67d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -199,7 +199,7 @@ public class JobService extends BasicService {
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
     public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, //
-            CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException, JobException {
+            Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException, JobException {
 
         if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
             throw new BadRequestException("Broken cube " + cube.getName() + " can't be built");
@@ -211,7 +211,7 @@ public class JobService extends BasicService {
         DefaultChainedExecutable job;
 
         if (buildType == CubeBuildTypeEnum.BUILD) {
-            CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset);
+            CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
             job = EngineFactory.createBatchCubingJob(newSeg, submitter);
         } else if (buildType == CubeBuildTypeEnum.MERGE) {
             CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force);

http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java
index 6319899..d192a7d 100644
--- a/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java
@@ -23,7 +23,9 @@ import java.io.StringWriter;
 import java.util.List;
 
 import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.CubeRequest;
 import org.apache.kylin.rest.service.CubeService;
@@ -60,7 +62,6 @@ public class CubeControllerTest extends ServiceTestBase {
         cubeController = new CubeController();
         cubeController.setCubeService(cubeService);
         cubeController.setJobService(jobService);
-        cubeController.setStreamingService(streamingService);
 
         cubeDescController = new CubeDescController();
         cubeDescController.setCubeService(cubeService);
@@ -163,6 +164,32 @@ public class CubeControllerTest extends ServiceTestBase {
         Assert.assertTrue(segNumber == newSegNumber + 1);
     }
 
+
+    @Test
+    public void testGetHoles() throws IOException {
+        String cubeName = "test_kylin_cube_with_slr_ready_3_segments";
+        CubeDesc[] cubes = cubeDescController.getCube(cubeName);
+        Assert.assertNotNull(cubes);
+
+        CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
+        List<CubeSegment> segments = cube.getSegments();
+
+        final long dateEnd = segments.get(segments.size() - 1).getDateRangeEnd();
+
+        final long ONEDAY = 24 * 60 * 60000;
+        cubeService.getCubeManager().appendSegment(cube, dateEnd + ONEDAY, dateEnd + ONEDAY * 2);
+
+        List<CubeSegment> holes = cubeController.getHoles(cubeName);
+
+        Assert.assertTrue(holes.size() == 1);
+
+        CubeSegment hole = holes.get(0);
+
+        Assert.assertTrue(hole.getDateRangeStart() == dateEnd && hole.getDateRangeEnd() == (dateEnd + ONEDAY));
+
+    }
+
+
     @Test
     public void testGetCubes() {
         List<CubeInstance> cubes = cubeController.getCubes(null, null, null, 1, 0);

http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index af9ccc0..a3ce5c5 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -234,7 +234,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
         CubeInstance cube = cubeManager.getCube(cubeName);
         assertEquals(0, cube.getSegments().size());
         assertEquals(0, cubeManagerB.getCube(cubeName).getSegments().size());
-        CubeSegment segment = cubeManager.appendSegment(cube, 0, 1000, 0, 0);
+        CubeSegment segment = cubeManager.appendSegment(cube, 0, 1000);
         //one for cube update
         assertEquals(1, broadcaster.getCounterAndClear());
         waitForCounterAndClear(1);

http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
index a21b980..18c959a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
@@ -18,10 +18,9 @@
 package org.apache.kylin.source.kafka;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
-import com.google.common.collect.Maps;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -34,8 +33,6 @@ import org.apache.kylin.job.execution.ExecuteResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
-
 /**
  */
 public class MergeOffsetStep extends AbstractExecutable {
@@ -52,26 +49,20 @@ public class MergeOffsetStep extends AbstractExecutable {
         final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
 
         List<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
-        Map<Integer, Long> mergedStartOffsets = Maps.newHashMap();
-        Map<Integer, Long> mergedEndOffsets = Maps.newHashMap();
 
-        long dateRangeStart = Long.MAX_VALUE, dateRangeEnd = 0;
-        for (CubeSegment seg: mergingSegs) {
-            Map<Integer, Long> startOffsets = KafkaOffsetMapping.parseOffsetStart(seg);
-            Map<Integer, Long> endOffsets = KafkaOffsetMapping.parseOffsetEnd(seg);
+        Collections.sort(mergingSegs);
 
-            for (Integer partition : startOffsets.keySet()) {
-                long currentStart = mergedStartOffsets.get(partition) != null ? Long.valueOf(mergedStartOffsets.get(partition)) : Long.MAX_VALUE;
-                long currentEnd = mergedEndOffsets.get(partition) != null ? Long.valueOf(mergedEndOffsets.get(partition)) : 0;
-                mergedStartOffsets.put(partition, Math.min(currentStart, startOffsets.get(partition)));
-                mergedEndOffsets.put(partition, Math.max(currentEnd, endOffsets.get(partition)));
-            }
-            dateRangeStart = Math.min(dateRangeStart, seg.getDateRangeStart());
-            dateRangeEnd = Math.max(dateRangeEnd, seg.getDateRangeEnd());
-        }
+        final CubeSegment first = mergingSegs.get(0);
+        final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
+
+        segment.setSourceOffsetStart(first.getSourceOffsetStart());
+        segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
+        segment.setSourceOffsetEnd(last.getSourceOffsetEnd());
+        segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
+
+        long dateRangeStart = CubeManager.minDateRangeStart(mergingSegs);
+        long dateRangeEnd = CubeManager.maxDateRangeEnd(mergingSegs);
 
-        KafkaOffsetMapping.saveOffsetStart(segment, mergedStartOffsets);
-        KafkaOffsetMapping.saveOffsetEnd(segment, mergedEndOffsets);
         segment.setDateRangeStart(dateRangeStart);
         segment.setDateRangeEnd(dateRangeEnd);
 
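The simplification above relies on the merging segments being contiguous and
sorted, so the merged segment can take its boundaries straight from the first
and last members. A worked example with made-up numbers:

    // two contiguous offset-based segments, after Collections.sort:
    //   seg A: sourceOffset [0, 500),    partition end offsets {0: 300, 1: 200}
    //   seg B: sourceOffset [500, 1200), partition end offsets {0: 700, 1: 500}
    // the merged segment covers sourceOffset [0, 1200), taking its start maps
    // from seg A and its end maps from seg B (700 + 500 == 1200, so the
    // scalar/map invariant enforced by checkSourceOffsets still holds)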

http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
index e1282d6..151b912 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
@@ -18,7 +18,6 @@
 package org.apache.kylin.source.kafka;
 
 import org.apache.kylin.source.kafka.util.KafkaClient;
-import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kylin.cube.CubeInstance;
@@ -54,8 +53,8 @@ public class SeekOffsetStep extends AbstractExecutable {
         final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
         final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
 
-        Map<Integer, Long> startOffsets = KafkaOffsetMapping.parseOffsetStart(segment);
-        Map<Integer, Long> endOffsets = KafkaOffsetMapping.parseOffsetEnd(segment);
+        Map<Integer, Long> startOffsets = segment.getSourcePartitionOffsetStart();
+        Map<Integer, Long> endOffsets = segment.getSourcePartitionOffsetEnd();
 
         if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided.");
@@ -70,7 +69,7 @@ public class SeekOffsetStep extends AbstractExecutable {
             if (startOffsets.isEmpty()) {
                 // user didn't specify start offset, use the biggest offset in existing segments as start
                 for (CubeSegment seg : cube.getSegments()) {
-                    Map<Integer, Long> segEndOffset = KafkaOffsetMapping.parseOffsetEnd(seg);
+                    Map<Integer, Long> segEndOffset = seg.getSourcePartitionOffsetEnd();
                     for (PartitionInfo partition : partitionInfos) {
                         int partitionId = partition.partition();
                         if (segEndOffset.containsKey(partitionId)) {
@@ -110,8 +109,10 @@ public class SeekOffsetStep extends AbstractExecutable {
         }
 
         if (totalEndOffset > totalStartOffset) {
-            KafkaOffsetMapping.saveOffsetStart(segment, startOffsets);
-            KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets);
+            segment.setSourceOffsetStart(totalStartOffset);
+            segment.setSourceOffsetEnd(totalEndOffset);
+            segment.setSourcePartitionOffsetStart(startOffsets);
+            segment.setSourcePartitionOffsetEnd(endOffsets);
             segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset));
             CubeUpdate cubeBuilder = new CubeUpdate(cube);
             cubeBuilder.setToUpdateSegs(segment);

http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index 87d2471..e20b20a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.source.kafka.hadoop;
 
 import org.apache.kylin.source.kafka.util.KafkaClient;
-import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -132,8 +131,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
 
     private void setupMapper(CubeSegment cubeSeg) throws IOException {
         // set the segment's offset info to job conf
-        Map<Integer, Long> offsetStart = KafkaOffsetMapping.parseOffsetStart(cubeSeg);
-        Map<Integer, Long> offsetEnd = KafkaOffsetMapping.parseOffsetEnd(cubeSeg);
+        Map<Integer, Long> offsetStart = cubeSeg.getSourcePartitionOffsetStart();
+        Map<Integer, Long> offsetEnd = cubeSeg.getSourcePartitionOffsetEnd();
 
         Integer minPartition = Collections.min(offsetStart.keySet());
         Integer maxPartition = Collections.max(offsetStart.keySet());

http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java
deleted file mode 100644
index b46e57f..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka.util;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.cube.CubeSegment;
-
-import java.util.Map;
-
-/**
- */
-public class KafkaOffsetMapping {
-
-    public static final String OFFSET_START = "kafka.offset.start.";
-    public static final String OFFSET_END = "kafka.offset.end.";
-
-    /**
-     * Get the start offsets for each partition from a segment
-     *
-     * @param segment
-     * @return
-     */
-    public static Map<Integer, Long> parseOffsetStart(CubeSegment segment) {
-        return parseOffset(segment, OFFSET_START);
-    }
-
-    /**
-     * Get the end offsets for each partition from a segment
-     *
-     * @param segment
-     * @return
-     */
-    public static Map<Integer, Long> parseOffsetEnd(CubeSegment segment) {
-        return parseOffset(segment, OFFSET_END);
-    }
-
-    /**
-     * Save the partition start offset to cube segment
-     *
-     * @param segment
-     * @param offsetStart
-     */
-    public static void saveOffsetStart(CubeSegment segment, Map<Integer, Long> offsetStart) {
-        long sourceOffsetStart = 0;
-        for (Integer partition : offsetStart.keySet()) {
-            segment.getAdditionalInfo().put(OFFSET_START + partition, String.valueOf(offsetStart.get(partition)));
-            sourceOffsetStart += offsetStart.get(partition);
-        }
-
-        segment.setSourceOffsetStart(sourceOffsetStart);
-    }
-
-    /**
-     * Save the partition end offset to cube segment
-     *
-     * @param segment
-     * @param offsetEnd
-     */
-    public static void saveOffsetEnd(CubeSegment segment, Map<Integer, Long> offsetEnd) {
-        long sourceOffsetEnd = 0;
-        for (Integer partition : offsetEnd.keySet()) {
-            segment.getAdditionalInfo().put(OFFSET_END + partition, String.valueOf(offsetEnd.get(partition)));
-            sourceOffsetEnd += offsetEnd.get(partition);
-        }
-
-        segment.setSourceOffsetEnd(sourceOffsetEnd);
-    }
-
-    private static Map<Integer, Long> parseOffset(CubeSegment segment, String propertyPrefix) {
-        final Map<Integer, Long> offsetStartMap = Maps.newHashMap();
-        for (String key : segment.getAdditionalInfo().keySet()) {
-            if (key.startsWith(propertyPrefix)) {
-                Integer partition = Integer.valueOf(key.substring(propertyPrefix.length()));
-                Long offset = Long.valueOf(segment.getAdditionalInfo().get(key));
-                offsetStartMap.put(partition, offset);
-            }
-        }
-
-
-        return offsetStartMap;
-    }
-}