You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/09 03:14:11 UTC
kylin git commit: Add more checks
Repository: kylin
Updated Branches:
refs/heads/v1.6.0-rc1 1ffbe19aa -> 710232e14
Add more checks
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/710232e1
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/710232e1
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/710232e1
Branch: refs/heads/v1.6.0-rc1
Commit: 710232e14d461334a42f1a957c291668d661a135
Parents: 1ffbe19
Author: shaofengshi <sh...@apache.org>
Authored: Wed Nov 9 11:14:05 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 9 11:14:05 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/source/kafka/KafkaSource.java | 10 ++++++++++
1 file changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/710232e1/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index a919043..b0c8e7f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -109,6 +109,16 @@ public class KafkaSource implements ISource {
logger.debug("Seek end offsets from topic");
Map<Integer, Long> latestOffsets = KafkaClient.getCurrentOffsets(cube);
logger.debug("The end offsets are " + latestOffsets);
+
+ for (Integer partitionId : latestOffsets.keySet()) {
+ if (result.getSourcePartitionOffsetStart().containsKey(partitionId)) {
+ if (result.getSourcePartitionOffsetStart().get(partitionId) > latestOffsets.get(partitionId)) {
+ throw new IllegalArgumentException("Partition " + partitionId + " end offset (" + latestOffsets.get(partitionId) + ") is smaller than start offset ( " + result.getSourcePartitionOffsetStart().get(partitionId) + ")");
+ }
+ } else {
+ throw new IllegalStateException("New partition added in between, retry.");
+ }
+ }
result.setSourcePartitionOffsetEnd(latestOffsets);
}