Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/08 09:56:26 UTC
kylin git commit: KYLIN-2067 add API to check and fill segment holes
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2067 [created] 72425d4ec
KYLIN-2067 add API to check and fill segment holes
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/72425d4e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/72425d4e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/72425d4e
Branch: refs/heads/KYLIN-2067
Commit: 72425d4ec89ac81dbd6ca2007542753de32a562d
Parents: 4c636b8
Author: shaofengshi <sh...@apache.org>
Authored: Sat Oct 8 17:45:13 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Oct 8 17:45:13 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 139 +++++++++++++++++--
.../java/org/apache/kylin/cube/CubeSegment.java | 24 ++++
.../org/apache/kylin/cube/CubeManagerTest.java | 49 +++++--
.../org/apache/kylin/cube/CubeSegmentsTest.java | 12 +-
.../streaming/cube/StreamingCubeBuilder.java | 2 +-
.../kylin/provision/BuildCubeWithEngine.java | 2 +-
.../kylin/provision/BuildCubeWithStream.java | 7 +-
.../kylin/rest/controller/CubeController.java | 130 +++++++++--------
.../kylin/rest/request/JobBuildRequest2.java | 44 ++++--
.../apache/kylin/rest/service/JobService.java | 4 +-
.../rest/controller/CubeControllerTest.java | 29 +++-
.../kylin/rest/service/CacheServiceTest.java | 2 +-
.../kylin/source/kafka/MergeOffsetStep.java | 33 ++---
.../kylin/source/kafka/SeekOffsetStep.java | 13 +-
.../source/kafka/hadoop/KafkaFlatTableJob.java | 5 +-
.../source/kafka/util/KafkaOffsetMapping.java | 97 -------------
16 files changed, 352 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
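For readers who want to try the new check/fill API, the sketch below shows how the two endpoints added in CubeController could be called over HTTP. This is a minimal illustration, not part of the commit: the host/port (localhost:7070), the /kylin/api context path, and the ADMIN:KYLIN credentials are the usual Kylin defaults and are assumptions here; only the GET and PUT mappings on /cubes/{cubeName}/hole come from this change.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class HoleApiExample {
        public static void main(String[] args) throws Exception {
            String cubeName = "my_cube"; // hypothetical cube name
            String base = "http://localhost:7070/kylin/api/cubes/"; // assumed default server location
            String auth = Base64.getEncoder().encodeToString("ADMIN:KYLIN".getBytes(StandardCharsets.UTF_8));

            // GET /cubes/{cubeName}/hole -- list the segment holes, if any
            HttpURLConnection get = (HttpURLConnection) new URL(base + cubeName + "/hole").openConnection();
            get.setRequestProperty("Authorization", "Basic " + auth);
            BufferedReader in = new BufferedReader(new InputStreamReader(get.getInputStream(), StandardCharsets.UTF_8));
            for (String line; (line = in.readLine()) != null; ) {
                System.out.println(line); // JSON array of hole segments
            }
            in.close();

            // PUT /cubes/{cubeName}/hole -- submit build jobs to fill the holes
            HttpURLConnection put = (HttpURLConnection) new URL(base + cubeName + "/hole").openConnection();
            put.setRequestMethod("PUT");
            put.setRequestProperty("Authorization", "Basic " + auth);
            System.out.println("fill holes -> HTTP " + put.getResponseCode());
        }
    }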
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index d243f4d..269b324 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -34,6 +34,7 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
@@ -433,26 +434,45 @@ public class CubeManager implements IRealizationProvider {
// append a full build segment
public CubeSegment appendSegment(CubeInstance cube) throws IOException {
- return appendSegment(cube, 0, 0, 0, 0);
+ return appendSegment(cube, 0, 0, 0, 0, null, null);
}
- public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException {
+ public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate) throws IOException {
+ return appendSegment(cube, startDate, endDate, 0, 0, null, null);
+ }
+ public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) throws IOException {
checkBuildingSegment(cube);
+ if (sourcePartitionOffsetStart == null) {
+ sourcePartitionOffsetStart = Maps.newHashMap();
+ }
+ if (sourcePartitionOffsetEnd == null) {
+ sourcePartitionOffsetEnd = Maps.newHashMap();
+ }
+
+ boolean isOffsetsOn = endOffset != 0;
+ if (isOffsetsOn) {
+ checkSourceOffsets(startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
+ }
+
if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned()) {
// try figure out a reasonable start if missing
if (startDate == 0 && startOffset == 0) {
- boolean isOffsetsOn = endOffset != 0;
- if (isOffsetsOn) {
- startOffset = calculateStartOffsetForAppendSegment(cube);
- if (startOffset == Long.MAX_VALUE) {
- throw new IllegalStateException("There is already one pending for building segment, please submit request later.");
+ final CubeSegment last = getLatestSegment(cube);
+ if (last != null) {
+ if (isOffsetsOn) {
+ if (last.getSourceOffsetEnd() == Long.MAX_VALUE) {
+ throw new IllegalStateException("There is already a pending segment to build; please submit the request later.");
+ }
+ startOffset = last.getSourceOffsetEnd();
+ sourcePartitionOffsetStart = last.getSourcePartitionOffsetEnd();
+ } else {
+ startDate = last.getDateRangeEnd();
}
- } else {
- startDate = calculateStartDateForAppendSegment(cube);
}
}
+
} else {
startDate = 0;
endDate = Long.MAX_VALUE;
@@ -461,6 +481,8 @@ public class CubeManager implements IRealizationProvider {
}
CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset);
+ newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart);
+ newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd);
validateNewSegments(cube, newSegment);
CubeUpdate cubeBuilder = new CubeUpdate(cube);
@@ -491,9 +513,8 @@ public class CubeManager implements IRealizationProvider {
throw new IllegalArgumentException("For streaming cube, only one segment can be refreshed at one time");
}
- Map<String, String> partitionInfo = Maps.newHashMap();
- partitionInfo.putAll(toRefreshSeg.getAdditionalInfo());
- newSegment.setAdditionalInfo(partitionInfo);
+ newSegment.setSourcePartitionOffsetStart(toRefreshSeg.getSourcePartitionOffsetStart());
+ newSegment.setSourcePartitionOffsetEnd(toRefreshSeg.getSourcePartitionOffsetEnd());
}
CubeUpdate cubeBuilder = new CubeUpdate(cube);
@@ -548,6 +569,8 @@ public class CubeManager implements IRealizationProvider {
newSegment.setDateRangeEnd(maxDateRangeEnd(mergingSegments));
newSegment.setSourceOffsetStart(first.getSourceOffsetStart());
newSegment.setSourceOffsetEnd(last.getSourceOffsetEnd());
+ newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
+ newSegment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
} else {
newSegment.setDateRangeStart(first.getSourceOffsetStart());
newSegment.setDateRangeEnd(last.getSourceOffsetEnd());
@@ -601,20 +624,29 @@ public class CubeManager implements IRealizationProvider {
return Pair.newPair(result.getFirst(), result.getLast());
}
- private long minDateRangeStart(List<CubeSegment> mergingSegments) {
+ public static long minDateRangeStart(List<CubeSegment> mergingSegments) {
long min = Long.MAX_VALUE;
for (CubeSegment seg : mergingSegments)
min = Math.min(min, seg.getDateRangeStart());
return min;
}
- private long maxDateRangeEnd(List<CubeSegment> mergingSegments) {
+ public static long maxDateRangeEnd(List<CubeSegment> mergingSegments) {
long max = Long.MIN_VALUE;
for (CubeSegment seg : mergingSegments)
max = Math.max(max, seg.getDateRangeEnd());
return max;
}
+ private CubeSegment getLatestSegment(CubeInstance cube) {
+ List<CubeSegment> existing = cube.getSegments();
+ if (existing.isEmpty()) {
+ return null;
+ } else {
+ return existing.get(existing.size() - 1);
+ }
+ }
+
private long calculateStartOffsetForAppendSegment(CubeInstance cube) {
List<CubeSegment> existing = cube.getSegments();
if (existing.isEmpty()) {
@@ -640,6 +672,45 @@ public class CubeManager implements IRealizationProvider {
}
}
+ private void checkSourceOffsets(long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) {
+ if (endOffset <= 0)
+ return;
+
+ if (startOffset >= endOffset) {
+ throw new IllegalArgumentException("'startOffset' need be smaller than 'endOffset'");
+ }
+
+ if (startOffset > 0) {
+ if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.isEmpty()) {
+ throw new IllegalArgumentException("When 'startOffset' is greater than 0, each partition's start offset must be provided");
+ }
+
+ long totalOffset = 0;
+ for (Long v : sourcePartitionOffsetStart.values()) {
+ totalOffset += v;
+ }
+
+ if (totalOffset != startOffset) {
+ throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
+ }
+ }
+
+ if (endOffset > 0 && endOffset != Long.MAX_VALUE) {
+ if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.isEmpty()) {
+ throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, each partition's end offset must be provided");
+ }
+
+ long totalOffset = 0;
+ for (Long v : sourcePartitionOffsetEnd.values()) {
+ totalOffset += v;
+ }
+
+ if (totalOffset != endOffset) {
+ throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
+ }
+ }
+ }
+
private void checkCubeIsPartitioned(CubeInstance cube) {
if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == false) {
throw new IllegalStateException("there is no partition date column specified, only full build is supported");
@@ -1006,4 +1077,44 @@ public class CubeManager implements IRealizationProvider {
}
return factDictCols;
}
+
+ /**
+ * Calculate the holes (gaps) between the segments of a cube.
+ * @param cubeName name of the cube to check
+ * @return a list of dummy segments, one per hole; empty if there is no hole
+ */
+ public List<CubeSegment> calculateHoles(String cubeName) {
+ List<CubeSegment> holes = Lists.newArrayList();
+ final CubeInstance cube = getCube(cubeName);
+ Preconditions.checkNotNull(cube);
+ final List<CubeSegment> segments = cube.getSegments();
+ logger.info("totally " + segments.size() + " cubeSegments");
+ if (segments.size() == 0) {
+ return holes;
+ }
+
+ Collections.sort(segments);
+ boolean isOffsetOn = segments.get(0).isSourceOffsetsOn();
+ for (int i = 0; i < segments.size() - 1; ++i) {
+ CubeSegment first = segments.get(i);
+ CubeSegment second = segments.get(i + 1);
+ if (first.getSourceOffsetEnd() == second.getSourceOffsetStart()) {
+ continue;
+ } else if (first.getSourceOffsetEnd() < second.getSourceOffsetStart()) {
+ CubeSegment hole = new CubeSegment();
+ if (isOffsetOn) {
+ hole.setSourceOffsetStart(first.getSourceOffsetEnd());
+ hole.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetEnd());
+ hole.setSourceOffsetEnd(second.getSourceOffsetStart());
+ hole.setSourcePartitionOffsetEnd(second.getSourcePartitionOffsetStart());
+ } else {
+ hole.setDateRangeStart(first.getDateRangeEnd());
+ hole.setDateRangeEnd(second.getDateRangeStart());
+ }
+ hole.setName(CubeSegment.makeSegmentName(hole.getDateRangeStart(), hole.getDateRangeEnd(), hole.getSourceOffsetStart(), hole.getSourceOffsetEnd()));
+ holes.add(hole);
+ }
+ }
+ return holes;
+ }
}
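A note on the new appendSegment contract: when building by offsets, checkSourceOffsets() requires each per-partition map to sum exactly to its aggregate startOffset/endOffset. Below is a minimal sketch of a valid call; the CubeManager and streaming CubeInstance are assumed to come from the caller's environment, and the partition ids and offsets are illustrative:

    import java.io.IOException;
    import java.util.Map;
    import com.google.common.collect.Maps;
    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.cube.CubeManager;
    import org.apache.kylin.cube.CubeSegment;

    public class AppendWithOffsetsExample {
        // 'mgr' and 'cube' are assumed to be obtained elsewhere (e.g. CubeManager.getInstance(...)).
        static CubeSegment appendOneSegment(CubeManager mgr, CubeInstance cube) throws IOException {
            Map<Integer, Long> startMap = Maps.newHashMap(); // partition id -> start offset
            startMap.put(0, 400L);
            startMap.put(1, 600L);   // 400 + 600 == 1000 == startOffset
            Map<Integer, Long> endMap = Maps.newHashMap();   // partition id -> end offset
            endMap.put(0, 900L);
            endMap.put(1, 1100L);    // 900 + 1100 == 2000 == endOffset
            // checkSourceOffsets() accepts this call because each map sums to its aggregate offset.
            return mgr.appendSegment(cube, 0, 0, 1000L, 2000L, startMap, endMap);
        }
    }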
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index d5de47f..fdf1fb0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -103,6 +103,14 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen
@JsonProperty("rowkey_stats")
private List<Object[]> rowkeyStats = Lists.newArrayList();
+ @JsonProperty("source_partition_offset_start")
+ @JsonInclude(JsonInclude.Include.NON_EMPTY)
+ private Map<Integer, Long> sourcePartitionOffsetStart = Maps.newHashMap();
+
+ @JsonProperty("source_partition_offset_end")
+ @JsonInclude(JsonInclude.Include.NON_EMPTY)
+ private Map<Integer, Long> sourcePartitionOffsetEnd = Maps.newHashMap();
+
@JsonProperty("additionalInfo")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
private Map<String, String> additionalInfo = new LinkedHashMap<String, String>();
@@ -549,4 +557,20 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen
public void setAdditionalInfo(Map<String, String> additionalInfo) {
this.additionalInfo = additionalInfo;
}
+
+ public Map<Integer, Long> getSourcePartitionOffsetEnd() {
+ return sourcePartitionOffsetEnd;
+ }
+
+ public void setSourcePartitionOffsetEnd(Map<Integer, Long> sourcePartitionOffsetEnd) {
+ this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd;
+ }
+
+ public Map<Integer, Long> getSourcePartitionOffsetStart() {
+ return sourcePartitionOffsetStart;
+ }
+
+ public void setSourcePartitionOffsetStart(Map<Integer, Long> sourcePartitionOffsetStart) {
+ this.sourcePartitionOffsetStart = sourcePartitionOffsetStart;
+ }
}
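For what the new fields look like on disk: with the @JsonInclude(NON_EMPTY) annotations above, the maps only appear in the persisted segment JSON when non-empty. A small sketch of the accessors (values illustrative):

    import java.util.Map;
    import com.google.common.collect.Maps;
    import org.apache.kylin.cube.CubeSegment;

    public class PartitionOffsetFieldsExample {
        public static void main(String[] args) {
            CubeSegment segment = new CubeSegment();
            Map<Integer, Long> ends = Maps.newHashMap();
            ends.put(0, 900L);
            ends.put(1, 1100L);
            segment.setSourcePartitionOffsetEnd(ends);
            // An empty map is omitted from the metadata; a non-empty one is serialized
            // under the JSON key "source_partition_offset_end", e.g. {"0": 900, "1": 1100}.
            System.out.println(segment.getSourcePartitionOffsetEnd());
        }
    }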
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index e63fe99..bb90d29 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -24,8 +24,10 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.List;
+import java.util.Map;
import java.util.NavigableSet;
+import com.google.common.collect.Maps;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.JsonUtil;
@@ -106,10 +108,10 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
assertEquals(0, cube.getSegments().size());
// append first
- CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0);
+ CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0, null, null);
seg1.setStatus(SegmentStatusEnum.READY);
- CubeSegment seg2 = mgr.appendSegment(cube, 0, 2000, 0, 0);
+ CubeSegment seg2 = mgr.appendSegment(cube, 0, 2000, 0, 0, null, null);
seg2.setStatus(SegmentStatusEnum.READY);
CubeUpdate cubeBuilder = new CubeUpdate(cube);
@@ -133,11 +135,21 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
// no segment at first
assertEquals(0, cube.getSegments().size());
+ Map<Integer, Long> m1 = Maps.newHashMap();
+ m1.put(1, 1000L);
+ Map<Integer, Long> m2 = Maps.newHashMap();
+ m2.put(1, 2000L);
+ Map<Integer, Long> m3 = Maps.newHashMap();
+ m3.put(1, 3000L);
+ Map<Integer, Long> m4 = Maps.newHashMap();
+ m4.put(1, 4000L);
+
// append first
- CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000);
+ CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000, null, m1);
seg1.setStatus(SegmentStatusEnum.READY);
- CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000);
+
+ CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000, m1, m2);
seg2.setStatus(SegmentStatusEnum.READY);
@@ -145,12 +157,13 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
seg3.setStatus(SegmentStatusEnum.NEW);
- CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 2000, 3000);
+ CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 2000, 3000, m2, m3);
seg4.setStatus(SegmentStatusEnum.NEW);
seg4.setLastBuildJobID("test");
seg4.setStorageLocationIdentifier("test");
- CubeSegment seg5 = mgr.appendSegment(cube, 0, 0, 3000, 4000);
+
+ CubeSegment seg5 = mgr.appendSegment(cube, 0, 0, 3000, 4000, m3, m4);
seg5.setStatus(SegmentStatusEnum.READY);
CubeUpdate cubeBuilder = new CubeUpdate(cube);
@@ -179,18 +192,26 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
// no segment at first
assertEquals(0, cube.getSegments().size());
+ Map<Integer, Long> m1 = Maps.newHashMap();
+ m1.put(1, 1000L);
+ Map<Integer, Long> m2 = Maps.newHashMap();
+ m2.put(1, 2000L);
+ Map<Integer, Long> m3 = Maps.newHashMap();
+ m3.put(1, 3000L);
+ Map<Integer, Long> m4 = Maps.newHashMap();
+ m4.put(1, 4000L);
// append first
- CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000);
+ CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000, null, m1);
seg1.setStatus(SegmentStatusEnum.READY);
- CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000);
+ CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000, m1, m2);
seg2.setStatus(SegmentStatusEnum.READY);
- CubeSegment seg3 = mgr.appendSegment(cube, 0, 0, 2000, 3000);
+ CubeSegment seg3 = mgr.appendSegment(cube, 0, 0, 2000, 3000, m2, m3);
seg3.setStatus(SegmentStatusEnum.READY);
- CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 3000, 4000);
+ CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 3000, 4000, m3, m4);
seg4.setStatus(SegmentStatusEnum.READY);
@@ -246,10 +267,10 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
assertEquals(0, cube.getSegments().size());
// append first
- CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0);
+ CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000);
seg1.setStatus(SegmentStatusEnum.READY);
- CubeSegment seg3 = mgr.appendSegment(cube, 2000, 4000, 0, 0);
+ CubeSegment seg3 = mgr.appendSegment(cube, 2000, 4000);
seg3.setStatus(SegmentStatusEnum.READY);
assertEquals(2, cube.getSegments().size());
@@ -260,7 +281,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
// append a new seg which will be merged
- CubeSegment seg4 = mgr.appendSegment(cube, 4000, 8000, 0, 0);
+ CubeSegment seg4 = mgr.appendSegment(cube, 4000, 8000);
seg4.setStatus(SegmentStatusEnum.READY);
assertEquals(3, cube.getSegments().size());
@@ -272,7 +293,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
// fill the gap
- CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000, 0, 0);
+ CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000);
seg2.setStatus(SegmentStatusEnum.READY);
assertEquals(4, cube.getSegments().size());
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
index f70aef9..828a3a9 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
@@ -106,11 +106,11 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase {
assertEquals(0, cube.getSegments().size());
// append first
- CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0);
+ CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000);
seg1.setStatus(SegmentStatusEnum.READY);
// append second
- CubeSegment seg2 = mgr.appendSegment(cube, 0, 2000, 0, 0);
+ CubeSegment seg2 = mgr.appendSegment(cube, 0, 2000);
assertEquals(2, cube.getSegments().size());
assertEquals(1000, seg2.getDateRangeStart());
@@ -164,25 +164,25 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase {
assertEquals(0, cube.getSegments().size());
// append the first
- CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0);
+ CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000);
seg1.setStatus(SegmentStatusEnum.READY);
assertEquals(1, cube.getSegments().size());
// append the third
- CubeSegment seg3 = mgr.appendSegment(cube, 2000, 3000, 0, 0);
+ CubeSegment seg3 = mgr.appendSegment(cube, 2000, 3000);
seg3.setStatus(SegmentStatusEnum.READY);
assertEquals(2, cube.getSegments().size());
// reject overlap
try {
- mgr.appendSegment(cube, 1000, 2500, 0, 0);
+ mgr.appendSegment(cube, 1000, 2500);
fail();
} catch (IllegalStateException ex) {
// good
}
// append the second
- CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000, 0, 0);
+ CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000);
seg2.setStatus(SegmentStatusEnum.READY);
assertEquals(3, cube.getSegments().size());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index 07a4cfb..350a5f8 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -104,7 +104,7 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
try {
- CubeSegment segment = cubeManager.appendSegment(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), 0, 0);
+ CubeSegment segment = cubeManager.appendSegment(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond());
segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
segment.setInputRecords(streamingBatch.getMessages().size());
segment.setLastBuildTime(System.currentTimeMillis());
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 971b293..f6c8801 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -404,7 +404,7 @@ public class BuildCubeWithEngine {
}
private String buildSegment(String cubeName, long startDate, long endDate) throws Exception {
- CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, endDate, 0, 0);
+ CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, endDate);
DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index f8805a6..c30abc0 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.util.ToolRunner;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
@@ -205,7 +204,7 @@ public class BuildCubeWithStream {
for (int i = 0; i < futures.size(); i++) {
ExecutableState result = futures.get(i).get(20, TimeUnit.MINUTES);
logger.info("Checking building task " + i + " whose state is " + result);
- Assert.assertTrue(result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED );
+ Assert.assertTrue(result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED);
if (result == ExecutableState.SUCCEED)
succeedBuild++;
}
@@ -214,7 +213,6 @@ public class BuildCubeWithStream {
List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments(SegmentStatusEnum.READY);
Assert.assertTrue(segments.size() == succeedBuild);
-
if (fastBuildMode == false) {
//empty build
ExecutableState result = buildSegment(cubeName, 0, Long.MAX_VALUE);
@@ -238,7 +236,6 @@ public class BuildCubeWithStream {
logger.info("Build is done");
}
-
private ExecutableState mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception {
CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST");
@@ -256,7 +253,7 @@ public class BuildCubeWithStream {
}
protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
- CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
+ CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, null, null);
DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
jobService.addJob(job);
waitForJob(job.getId());
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 42b117c..be242c3 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.cube.CubeInstance;
@@ -36,7 +37,6 @@ import org.apache.kylin.cube.model.CubeBuildTypeEnum;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dimension.DimensionEncodingFactory;
import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -54,9 +54,6 @@ import org.apache.kylin.rest.response.GeneralResponse;
import org.apache.kylin.rest.response.HBaseResponse;
import org.apache.kylin.rest.service.CubeService;
import org.apache.kylin.rest.service.JobService;
-import org.apache.kylin.rest.service.KafkaConfigService;
-import org.apache.kylin.rest.service.StreamingService;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,12 +83,6 @@ public class CubeController extends BasicController {
private static final Logger logger = LoggerFactory.getLogger(CubeController.class);
@Autowired
- private StreamingService streamingService;
-
- @Autowired
- private KafkaConfigService kafkaConfigService;
-
- @Autowired
private CubeService cubeService;
@Autowired
@@ -272,7 +263,7 @@ public class CubeController extends BasicController {
@RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT })
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) {
- return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment());
+ return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, null, null, req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment());
}
/** Build/Rebuild a cube segment by source offset */
@@ -286,16 +277,16 @@ public class CubeController extends BasicController {
@RequestMapping(value = "/{cubeName}/rebuild2", method = { RequestMethod.PUT })
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest2 req) {
- return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), req.isForce());
+ return buildInternal(cubeName, 0, 0, req.getSourceOffsetStart(), req.getSourceOffsetEnd(), req.getSourcePartitionOffsetStart(), req.getSourcePartitionOffsetEnd(), req.getBuildType(), req.isForce());
}
private JobInstance buildInternal(String cubeName, long startTime, long endTime, //
- long startOffset, long endOffset, String buildType, boolean force) {
+ long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, String buildType, boolean force) {
try {
String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
return jobService.submitJob(cube, startTime, endTime, startOffset, endOffset, //
- CubeBuildTypeEnum.valueOf(buildType), force, submitter);
+ sourcePartitionOffsetStart, sourcePartitionOffsetEnd, CubeBuildTypeEnum.valueOf(buildType), force, submitter);
} catch (Exception e) {
logger.error(e.getLocalizedMessage(), e);
throw new InternalErrorException(e.getLocalizedMessage());
@@ -542,6 +533,73 @@ public class CubeController extends BasicController {
return hbase;
}
+ /**
+ * Check the segment holes (gaps) of a cube
+ *
+ * @return the list of hole segments; empty if there is no hole
+ */
+ @RequestMapping(value = "/{cubeName}/hole", method = { RequestMethod.GET })
+ @ResponseBody
+ public List<CubeSegment> getHoles(@PathVariable String cubeName) {
+ return cubeService.getCubeManager().calculateHoles(cubeName);
+ }
+
+ /**
+ * Submit build jobs to fill the segment holes of a cube
+ *
+ * @return the list of submitted build jobs; empty if there is no hole
+ */
+ @RequestMapping(value = "/{cubeName}/hole", method = { RequestMethod.PUT })
+ @ResponseBody
+ public List<JobInstance> fillHoles(@PathVariable String cubeName) {
+ List<JobInstance> jobs = Lists.newArrayList();
+ List<CubeSegment> holes = cubeService.getCubeManager().calculateHoles(cubeName);
+
+ if (holes.size() == 0) {
+ logger.info("No hole detected for cube '" + cubeName + "'");
+ return jobs;
+ }
+
+ boolean isOffsetOn = holes.get(0).isSourceOffsetsOn();
+ for (CubeSegment hole : holes) {
+ if (isOffsetOn) {
+ JobBuildRequest2 request = new JobBuildRequest2();
+ request.setBuildType(CubeBuildTypeEnum.BUILD.toString());
+ request.setSourceOffsetStart(hole.getSourceOffsetStart());
+ request.setSourcePartitionOffsetStart(hole.getSourcePartitionOffsetStart());
+ request.setSourceOffsetEnd(hole.getSourceOffsetEnd());
+ request.setSourcePartitionOffsetEnd(hole.getSourcePartitionOffsetEnd());
+ try {
+ JobInstance job = build(cubeName, request);
+ jobs.add(job);
+ } catch (Exception e) {
+ // it may exceed the max allowed job number
+ logger.info("Error to submit job for hole '" + hole.toString() + "', skip it now.", e);
+ continue;
+ }
+ } else {
+ JobBuildRequest request = new JobBuildRequest();
+ request.setBuildType(CubeBuildTypeEnum.BUILD.toString());
+ request.setStartTime(hole.getDateRangeStart());
+ request.setEndTime(hole.getDateRangeEnd());
+
+ try {
+ JobInstance job = build(cubeName, request);
+ jobs.add(job);
+ } catch (Exception e) {
+ // it may exceed the max allowed job number
+ logger.info("Error to submit job for hole '" + hole.toString() + "', skip it now.", e);
+ continue;
+ }
+ }
+ }
+
+ return jobs;
+
+ }
+
private CubeDesc deserializeCubeDesc(CubeRequest cubeRequest) {
CubeDesc desc = null;
try {
@@ -560,42 +618,6 @@ public class CubeController extends BasicController {
return desc;
}
- private StreamingConfig deserializeStreamingDesc(CubeRequest cubeRequest) {
- StreamingConfig desc = null;
- try {
- logger.debug("Saving StreamingConfig " + cubeRequest.getStreamingData());
- desc = JsonUtil.readValue(cubeRequest.getStreamingData(), StreamingConfig.class);
- } catch (JsonParseException e) {
- logger.error("The StreamingConfig definition is not valid.", e);
- updateRequest(cubeRequest, false, e.getMessage());
- } catch (JsonMappingException e) {
- logger.error("The data StreamingConfig definition is not valid.", e);
- updateRequest(cubeRequest, false, e.getMessage());
- } catch (IOException e) {
- logger.error("Failed to deal with the request.", e);
- throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
- }
- return desc;
- }
-
- private KafkaConfig deserializeKafkaDesc(CubeRequest cubeRequest) {
- KafkaConfig desc = null;
- try {
- logger.debug("Saving KafkaConfig " + cubeRequest.getKafkaData());
- desc = JsonUtil.readValue(cubeRequest.getKafkaData(), KafkaConfig.class);
- } catch (JsonParseException e) {
- logger.error("The KafkaConfig definition is not valid.", e);
- updateRequest(cubeRequest, false, e.getMessage());
- } catch (JsonMappingException e) {
- logger.error("The data KafkaConfig definition is not valid.", e);
- updateRequest(cubeRequest, false, e.getMessage());
- } catch (IOException e) {
- logger.error("Failed to deal with the request.", e);
- throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
- }
- return desc;
- }
-
private void updateRequest(CubeRequest request, boolean success, String message) {
request.setCubeDescData("");
request.setSuccessful(success);
@@ -610,12 +632,4 @@ public class CubeController extends BasicController {
this.jobService = jobService;
}
- public void setStreamingService(StreamingService streamingService) {
- this.streamingService = streamingService;
- }
-
- public void setKafkaConfigService(KafkaConfigService kafkaConfigService) {
- this.kafkaConfigService = kafkaConfigService;
- }
-
}
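Client side, the renamed JobBuildRequest2 fields line up with the buildInternal() parameters above. Here is a sketch of assembling an offset-based build request; the partition ids and offsets are illustrative, and the same sum rule enforced by CubeManager.checkSourceOffsets() applies:

    import java.util.Map;
    import com.google.common.collect.Maps;
    import org.apache.kylin.cube.model.CubeBuildTypeEnum;
    import org.apache.kylin.rest.request.JobBuildRequest2;

    public class Rebuild2RequestExample {
        static JobBuildRequest2 buildRequest() {
            JobBuildRequest2 req = new JobBuildRequest2();
            req.setBuildType(CubeBuildTypeEnum.BUILD.toString());
            req.setSourceOffsetStart(1000L);
            req.setSourceOffsetEnd(2000L);
            Map<Integer, Long> startMap = Maps.newHashMap();
            startMap.put(0, 400L);
            startMap.put(1, 600L);   // must sum to sourceOffsetStart
            req.setSourcePartitionOffsetStart(startMap);
            Map<Integer, Long> endMap = Maps.newHashMap();
            endMap.put(0, 900L);
            endMap.put(1, 1100L);    // must sum to sourceOffsetEnd
            req.setSourcePartitionOffsetEnd(endMap);
            return req;              // serialized as JSON and PUT to /cubes/{cubeName}/rebuild2
        }
    }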
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java
index dc3b433..6e9117d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java
@@ -18,30 +18,54 @@
package org.apache.kylin.rest.request;
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
public class JobBuildRequest2 {
- private long startSourceOffset;
+ private long sourceOffsetStart;
+
+ private long sourceOffsetEnd;
- private long endSourceOffset;
+ private Map<Integer, Long> sourcePartitionOffsetStart = Maps.newHashMap();
+
+ private Map<Integer, Long> sourcePartitionOffsetEnd = Maps.newHashMap();
private String buildType;
private boolean force;
- public long getStartSourceOffset() {
- return startSourceOffset;
+ public long getSourceOffsetStart() {
+ return sourceOffsetStart;
+ }
+
+ public void setSourceOffsetStart(long sourceOffsetStart) {
+ this.sourceOffsetStart = sourceOffsetStart;
+ }
+
+ public long getSourceOffsetEnd() {
+ return sourceOffsetEnd;
+ }
+
+ public void setSourceOffsetEnd(long sourceOffsetEnd) {
+ this.sourceOffsetEnd = sourceOffsetEnd;
+ }
+
+ public Map<Integer, Long> getSourcePartitionOffsetStart() {
+ return sourcePartitionOffsetStart;
}
- public void setStartSourceOffset(long startSourceOffset) {
- this.startSourceOffset = startSourceOffset;
+ public void setSourcePartitionOffsetStart(Map<Integer, Long> sourcePartitionOffsetStart) {
+ this.sourcePartitionOffsetStart = sourcePartitionOffsetStart;
}
- public long getEndSourceOffset() {
- return endSourceOffset;
+ public Map<Integer, Long> getSourcePartitionOffsetEnd() {
+ return sourcePartitionOffsetEnd;
}
- public void setEndSourceOffset(long endSourceOffset) {
- this.endSourceOffset = endSourceOffset;
+ public void setSourcePartitionOffsetEnd(Map<Integer, Long> sourcePartitionOffsetEnd) {
+ this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd;
}
public String getBuildType() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 5c704ba..589f67d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -199,7 +199,7 @@ public class JobService extends BasicService {
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, //
- CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException, JobException {
+ Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException, JobException {
if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
throw new BadRequestException("Broken cube " + cube.getName() + " can't be built");
@@ -211,7 +211,7 @@ public class JobService extends BasicService {
DefaultChainedExecutable job;
if (buildType == CubeBuildTypeEnum.BUILD) {
- CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset);
+ CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
job = EngineFactory.createBatchCubingJob(newSeg, submitter);
} else if (buildType == CubeBuildTypeEnum.MERGE) {
CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force);
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java
index 6319899..d192a7d 100644
--- a/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java
@@ -23,7 +23,9 @@ import java.io.StringWriter;
import java.util.List;
import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.request.CubeRequest;
import org.apache.kylin.rest.service.CubeService;
@@ -60,7 +62,6 @@ public class CubeControllerTest extends ServiceTestBase {
cubeController = new CubeController();
cubeController.setCubeService(cubeService);
cubeController.setJobService(jobService);
- cubeController.setStreamingService(streamingService);
cubeDescController = new CubeDescController();
cubeDescController.setCubeService(cubeService);
@@ -163,6 +164,32 @@ public class CubeControllerTest extends ServiceTestBase {
Assert.assertTrue(segNumber == newSegNumber + 1);
}
+
+ @Test
+ public void testGetHoles() throws IOException {
+ String cubeName = "test_kylin_cube_with_slr_ready_3_segments";
+ CubeDesc[] cubes = cubeDescController.getCube(cubeName);
+ Assert.assertNotNull(cubes);
+
+ CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
+ List<CubeSegment> segments = cube.getSegments();
+
+ final long dateEnd = segments.get(segments.size() - 1).getDateRangeEnd();
+
+ final long ONEDAY = 24 * 60 * 60000;
+ cubeService.getCubeManager().appendSegment(cube, dateEnd + ONEDAY, dateEnd + ONEDAY * 2);
+
+ List<CubeSegment> holes = cubeController.getHoles(cubeName);
+
+ Assert.assertEquals(1, holes.size());
+
+ CubeSegment hole = holes.get(0);
+
+ Assert.assertEquals(dateEnd, hole.getDateRangeStart());
+ Assert.assertEquals(dateEnd + ONEDAY, hole.getDateRangeEnd());
+
+ }
+
+
@Test
public void testGetCubes() {
List<CubeInstance> cubes = cubeController.getCubes(null, null, null, 1, 0);
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index af9ccc0..a3ce5c5 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -234,7 +234,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
CubeInstance cube = cubeManager.getCube(cubeName);
assertEquals(0, cube.getSegments().size());
assertEquals(0, cubeManagerB.getCube(cubeName).getSegments().size());
- CubeSegment segment = cubeManager.appendSegment(cube, 0, 1000, 0, 0);
+ CubeSegment segment = cubeManager.appendSegment(cube, 0, 1000);
//one for cube update
assertEquals(1, broadcaster.getCounterAndClear());
waitForCounterAndClear(1);
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
index a21b980..18c959a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
@@ -18,10 +18,9 @@
package org.apache.kylin.source.kafka;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import com.google.common.collect.Maps;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -34,8 +33,6 @@ import org.apache.kylin.job.execution.ExecuteResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
-
/**
*/
public class MergeOffsetStep extends AbstractExecutable {
@@ -52,26 +49,20 @@ public class MergeOffsetStep extends AbstractExecutable {
final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
List<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
- Map<Integer, Long> mergedStartOffsets = Maps.newHashMap();
- Map<Integer, Long> mergedEndOffsets = Maps.newHashMap();
- long dateRangeStart = Long.MAX_VALUE, dateRangeEnd = 0;
- for (CubeSegment seg: mergingSegs) {
- Map<Integer, Long> startOffsets = KafkaOffsetMapping.parseOffsetStart(seg);
- Map<Integer, Long> endOffsets = KafkaOffsetMapping.parseOffsetEnd(seg);
+ Collections.sort(mergingSegs);
- for (Integer partition : startOffsets.keySet()) {
- long currentStart = mergedStartOffsets.get(partition) != null ? Long.valueOf(mergedStartOffsets.get(partition)) : Long.MAX_VALUE;
- long currentEnd = mergedEndOffsets.get(partition) != null ? Long.valueOf(mergedEndOffsets.get(partition)) : 0;
- mergedStartOffsets.put(partition, Math.min(currentStart, startOffsets.get(partition)));
- mergedEndOffsets.put(partition, Math.max(currentEnd, endOffsets.get(partition)));
- }
- dateRangeStart = Math.min(dateRangeStart, seg.getDateRangeStart());
- dateRangeEnd = Math.max(dateRangeEnd, seg.getDateRangeEnd());
- }
+ final CubeSegment first = mergingSegs.get(0);
+ final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
+
+ segment.setSourceOffsetStart(first.getSourceOffsetStart());
+ segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
+ segment.setSourceOffsetEnd(last.getSourceOffsetEnd());
+ segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
+
+ long dateRangeStart = CubeManager.minDateRangeStart(mergingSegs);
+ long dateRangeEnd = CubeManager.maxDateRangeEnd(mergingSegs);
- KafkaOffsetMapping.saveOffsetStart(segment, mergedStartOffsets);
- KafkaOffsetMapping.saveOffsetEnd(segment, mergedEndOffsets);
segment.setDateRangeStart(dateRangeStart);
segment.setDateRangeEnd(dateRangeEnd);
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
index e1282d6..151b912 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
@@ -18,7 +18,6 @@
package org.apache.kylin.source.kafka;
import org.apache.kylin.source.kafka.util.KafkaClient;
-import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kylin.cube.CubeInstance;
@@ -54,8 +53,8 @@ public class SeekOffsetStep extends AbstractExecutable {
final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
- Map<Integer, Long> startOffsets = KafkaOffsetMapping.parseOffsetStart(segment);
- Map<Integer, Long> endOffsets = KafkaOffsetMapping.parseOffsetEnd(segment);
+ Map<Integer, Long> startOffsets = segment.getSourcePartitionOffsetStart();
+ Map<Integer, Long> endOffsets = segment.getSourcePartitionOffsetEnd();
if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) {
return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided.");
@@ -70,7 +69,7 @@ public class SeekOffsetStep extends AbstractExecutable {
if (startOffsets.isEmpty()) {
// user didn't specify start offset, use the biggest offset in existing segments as start
for (CubeSegment seg : cube.getSegments()) {
- Map<Integer, Long> segEndOffset = KafkaOffsetMapping.parseOffsetEnd(seg);
+ Map<Integer, Long> segEndOffset = seg.getSourcePartitionOffsetEnd();
for (PartitionInfo partition : partitionInfos) {
int partitionId = partition.partition();
if (segEndOffset.containsKey(partitionId)) {
@@ -110,8 +109,10 @@ public class SeekOffsetStep extends AbstractExecutable {
}
if (totalEndOffset > totalStartOffset) {
- KafkaOffsetMapping.saveOffsetStart(segment, startOffsets);
- KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets);
+ segment.setSourceOffsetStart(totalStartOffset);
+ segment.setSourceOffsetEnd(totalEndOffset);
+ segment.setSourcePartitionOffsetStart(startOffsets);
+ segment.setSourcePartitionOffsetEnd(endOffsets);
segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset));
CubeUpdate cubeBuilder = new CubeUpdate(cube);
cubeBuilder.setToUpdateSegs(segment);
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index 87d2471..e20b20a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -19,7 +19,6 @@
package org.apache.kylin.source.kafka.hadoop;
import org.apache.kylin.source.kafka.util.KafkaClient;
-import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -132,8 +131,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
private void setupMapper(CubeSegment cubeSeg) throws IOException {
// set the segment's offset info to job conf
- Map<Integer, Long> offsetStart = KafkaOffsetMapping.parseOffsetStart(cubeSeg);
- Map<Integer, Long> offsetEnd = KafkaOffsetMapping.parseOffsetEnd(cubeSeg);
+ Map<Integer, Long> offsetStart = cubeSeg.getSourcePartitionOffsetStart();
+ Map<Integer, Long> offsetEnd = cubeSeg.getSourcePartitionOffsetEnd();
Integer minPartition = Collections.min(offsetStart.keySet());
Integer maxPartition = Collections.max(offsetStart.keySet());
http://git-wip-us.apache.org/repos/asf/kylin/blob/72425d4e/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java
deleted file mode 100644
index b46e57f..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka.util;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.cube.CubeSegment;
-
-import java.util.Map;
-
-/**
- */
-public class KafkaOffsetMapping {
-
- public static final String OFFSET_START = "kafka.offset.start.";
- public static final String OFFSET_END = "kafka.offset.end.";
-
- /**
- * Get the start offsets for each partition from a segment
- *
- * @param segment
- * @return
- */
- public static Map<Integer, Long> parseOffsetStart(CubeSegment segment) {
- return parseOffset(segment, OFFSET_START);
- }
-
- /**
- * Get the end offsets for each partition from a segment
- *
- * @param segment
- * @return
- */
- public static Map<Integer, Long> parseOffsetEnd(CubeSegment segment) {
- return parseOffset(segment, OFFSET_END);
- }
-
- /**
- * Save the partition start offset to cube segment
- *
- * @param segment
- * @param offsetStart
- */
- public static void saveOffsetStart(CubeSegment segment, Map<Integer, Long> offsetStart) {
- long sourceOffsetStart = 0;
- for (Integer partition : offsetStart.keySet()) {
- segment.getAdditionalInfo().put(OFFSET_START + partition, String.valueOf(offsetStart.get(partition)));
- sourceOffsetStart += offsetStart.get(partition);
- }
-
- segment.setSourceOffsetStart(sourceOffsetStart);
- }
-
- /**
- * Save the partition end offset to cube segment
- *
- * @param segment
- * @param offsetEnd
- */
- public static void saveOffsetEnd(CubeSegment segment, Map<Integer, Long> offsetEnd) {
- long sourceOffsetEnd = 0;
- for (Integer partition : offsetEnd.keySet()) {
- segment.getAdditionalInfo().put(OFFSET_END + partition, String.valueOf(offsetEnd.get(partition)));
- sourceOffsetEnd += offsetEnd.get(partition);
- }
-
- segment.setSourceOffsetEnd(sourceOffsetEnd);
- }
-
- private static Map<Integer, Long> parseOffset(CubeSegment segment, String propertyPrefix) {
- final Map<Integer, Long> offsetStartMap = Maps.newHashMap();
- for (String key : segment.getAdditionalInfo().keySet()) {
- if (key.startsWith(propertyPrefix)) {
- Integer partition = Integer.valueOf(key.substring(propertyPrefix.length()));
- Long offset = Long.valueOf(segment.getAdditionalInfo().get(key));
- offsetStartMap.put(partition, offset);
- }
- }
-
-
- return offsetStartMap;
- }
-}
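Segments persisted before this commit carry their Kafka offsets only in additionalInfo, under the 'kafka.offset.start.<partition>' / 'kafka.offset.end.<partition>' keys that the deleted class above parsed. A hypothetical one-off migration helper (not part of this commit) could copy them onto the new typed fields:

    import java.util.Map;
    import com.google.common.collect.Maps;
    import org.apache.kylin.cube.CubeSegment;

    public class OffsetMetadataMigration {
        // Hypothetical helper: move the old additionalInfo offsets onto the new typed fields.
        static void migrate(CubeSegment segment) {
            Map<Integer, Long> starts = Maps.newHashMap();
            Map<Integer, Long> ends = Maps.newHashMap();
            for (Map.Entry<String, String> e : segment.getAdditionalInfo().entrySet()) {
                if (e.getKey().startsWith("kafka.offset.start."))
                    starts.put(Integer.valueOf(e.getKey().substring("kafka.offset.start.".length())), Long.valueOf(e.getValue()));
                else if (e.getKey().startsWith("kafka.offset.end."))
                    ends.put(Integer.valueOf(e.getKey().substring("kafka.offset.end.".length())), Long.valueOf(e.getValue()));
            }
            if (!starts.isEmpty())
                segment.setSourcePartitionOffsetStart(starts);
            if (!ends.isEmpty())
                segment.setSourcePartitionOffsetEnd(ends);
        }
    }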