You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/09 02:26:03 UTC
kylin git commit: more logs
Repository: kylin
Updated Branches:
refs/heads/v1.6.0-rc1 0425228df -> 39b7dad41
more logs
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/39b7dad4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/39b7dad4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/39b7dad4
Branch: refs/heads/v1.6.0-rc1
Commit: 39b7dad41e2764def1385f3f4b5303ca7f940b10
Parents: 0425228
Author: shaofengshi <sh...@apache.org>
Authored: Wed Nov 9 10:25:58 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 9 10:25:58 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/source/SourcePartition.java | 12 ++++++--
.../apache/kylin/source/kafka/KafkaSource.java | 29 ++++++++++----------
2 files changed, 24 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/39b7dad4/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
index 8ba749d..161977a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
@@ -18,8 +18,11 @@
package org.apache.kylin.source;
+import java.util.HashMap;
import java.util.Map;
+import com.google.common.base.Objects;
+
/**
*/
public class SourcePartition {
@@ -90,14 +93,19 @@ public class SourcePartition {
this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd;
}
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this).add("startDate", startDate).add("endDate", endDate).add("startOffset", startOffset).add("endOffset", endOffset).add("sourcePartitionOffsetStart", sourcePartitionOffsetStart.toString()).add("sourcePartitionOffsetEnd", sourcePartitionOffsetEnd.toString()).toString();
+ }
+
public static SourcePartition getCopyOf(SourcePartition origin) {
SourcePartition copy = new SourcePartition();
copy.setStartDate(origin.getStartDate());
copy.setEndDate(origin.getEndDate());
copy.setStartOffset(origin.getStartOffset());
copy.setEndOffset(origin.getEndOffset());
- copy.setSourcePartitionOffsetStart(origin.getSourcePartitionOffsetStart());
- copy.setSourcePartitionOffsetEnd(origin.getSourcePartitionOffsetEnd());
+ copy.setSourcePartitionOffsetStart(new HashMap<>(origin.getSourcePartitionOffsetStart()));
+ copy.setSourcePartitionOffsetEnd(new HashMap<>(origin.getSourcePartitionOffsetEnd()));
return copy;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/39b7dad4/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index b0434f0..a919043 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -35,11 +35,11 @@ import org.apache.kylin.source.ReadableTable;
import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.source.kafka.util.KafkaClient;
-
-import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
+
//used by reflection
public class KafkaSource implements ISource {
@@ -94,23 +94,22 @@ public class KafkaSource implements ISource {
final String topic = kafakaConfig.getTopic();
try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
- if (partitionInfos.size() > result.getSourcePartitionOffsetStart().size()) {
- // has new partition added
- logger.debug("has new partition added");
- for (PartitionInfo partitionInfo : partitionInfos) {
- if (result.getSourcePartitionOffsetStart().containsKey(partitionInfo.partition()) == false) {
- long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfo.partition());
- logger.debug("new partition " + partitionInfo.partition() + " starts from " + earliest);
- result.getSourcePartitionOffsetStart().put(partitionInfo.partition(), earliest);
- }
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ if (result.getSourcePartitionOffsetStart().containsKey(partitionInfo.partition()) == false) {
+ // has new partition added
+ logger.debug("has new partition added");
+ long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfo.partition());
+ logger.debug("new partition " + partitionInfo.partition() + " starts from " + earliest);
+ result.getSourcePartitionOffsetStart().put(partitionInfo.partition(), earliest);
}
- } else {
- logger.debug("no new partition");
}
}
if (result.getEndOffset() == Long.MAX_VALUE) {
- result.setSourcePartitionOffsetEnd(KafkaClient.getCurrentOffsets(cube));
+ logger.debug("Seek end offsets from topic");
+ Map<Integer, Long> latestOffsets = KafkaClient.getCurrentOffsets(cube);
+ logger.debug("The end offsets are " + latestOffsets);
+ result.setSourcePartitionOffsetEnd(latestOffsets);
}
long totalStartOffset = 0, totalEndOffset = 0;
@@ -132,7 +131,7 @@ public class KafkaSource implements ISource {
result.setStartOffset(totalStartOffset);
result.setEndOffset(totalEndOffset);
- logger.debug("KafkaSource.parsePartitionBeforeBuild, return " + result);
+ logger.debug("parsePartitionBeforeBuild() return: " + result);
return result;
}