You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/09 03:14:11 UTC

kylin git commit: Add more checks

Repository: kylin
Updated Branches:
  refs/heads/v1.6.0-rc1 1ffbe19aa -> 710232e14


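Add more checks: in KafkaSource, verify that each partition's end offset is not smaller than its start offset, and fail with a retry hint if a partition without a recorded start offset (i.e. newly added) is found.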


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/710232e1
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/710232e1
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/710232e1

Branch: refs/heads/v1.6.0-rc1
Commit: 710232e14d461334a42f1a957c291668d661a135
Parents: 1ffbe19
Author: shaofengshi <sh...@apache.org>
Authored: Wed Nov 9 11:14:05 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 9 11:14:05 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/source/kafka/KafkaSource.java   | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/710232e1/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index a919043..b0c8e7f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -109,6 +109,16 @@ public class KafkaSource implements ISource {
             logger.debug("Seek end offsets from topic");
             Map<Integer, Long> latestOffsets = KafkaClient.getCurrentOffsets(cube);
             logger.debug("The end offsets are " + latestOffsets);
+
+            for (Integer partitionId : latestOffsets.keySet()) {
+                if (result.getSourcePartitionOffsetStart().containsKey(partitionId)) {
+                    if (result.getSourcePartitionOffsetStart().get(partitionId) > latestOffsets.get(partitionId)) {
+                        throw new IllegalArgumentException("Partition " + partitionId + " end offset (" + latestOffsets.get(partitionId) + ") is smaller than start offset ( " + result.getSourcePartitionOffsetStart().get(partitionId) + ")");
+                    }
+                } else {
+                    throw new IllegalStateException("New partition added in between, retry.");
+                }
+            }
             result.setSourcePartitionOffsetEnd(latestOffsets);
         }