You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/09 02:26:03 UTC

kylin git commit: more logs

Repository: kylin
Updated Branches:
  refs/heads/v1.6.0-rc1 0425228df -> 39b7dad41


more logs


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/39b7dad4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/39b7dad4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/39b7dad4

Branch: refs/heads/v1.6.0-rc1
Commit: 39b7dad41e2764def1385f3f4b5303ca7f940b10
Parents: 0425228
Author: shaofengshi <sh...@apache.org>
Authored: Wed Nov 9 10:25:58 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 9 10:25:58 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/source/SourcePartition.java    | 12 ++++++--
 .../apache/kylin/source/kafka/KafkaSource.java  | 29 ++++++++++----------
 2 files changed, 24 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/39b7dad4/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
index 8ba749d..161977a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
@@ -18,8 +18,11 @@
 
 package org.apache.kylin.source;
 
+import java.util.HashMap;
 import java.util.Map;
 
+import com.google.common.base.Objects;
+
 /**
  */
 public class SourcePartition {
@@ -90,14 +93,19 @@ public class SourcePartition {
         this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd;
     }
 
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this).add("startDate", startDate).add("endDate", endDate).add("startOffset", startOffset).add("endOffset", endOffset).add("sourcePartitionOffsetStart", sourcePartitionOffsetStart.toString()).add("sourcePartitionOffsetEnd", sourcePartitionOffsetEnd.toString()).toString();
+    }
+
     public static SourcePartition getCopyOf(SourcePartition origin) {
         SourcePartition copy = new SourcePartition();
         copy.setStartDate(origin.getStartDate());
         copy.setEndDate(origin.getEndDate());
         copy.setStartOffset(origin.getStartOffset());
         copy.setEndOffset(origin.getEndOffset());
-        copy.setSourcePartitionOffsetStart(origin.getSourcePartitionOffsetStart());
-        copy.setSourcePartitionOffsetEnd(origin.getSourcePartitionOffsetEnd());
+        copy.setSourcePartitionOffsetStart(new HashMap<>(origin.getSourcePartitionOffsetStart()));
+        copy.setSourcePartitionOffsetEnd(new HashMap<>(origin.getSourcePartitionOffsetEnd()));
         return copy;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/39b7dad4/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index b0434f0..a919043 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -35,11 +35,11 @@ import org.apache.kylin.source.ReadableTable;
 import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.source.kafka.util.KafkaClient;
-
-import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 //used by reflection
 public class KafkaSource implements ISource {
 
@@ -94,23 +94,22 @@ public class KafkaSource implements ISource {
         final String topic = kafakaConfig.getTopic();
         try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
-            if (partitionInfos.size() > result.getSourcePartitionOffsetStart().size()) {
-                // has new partition added
-                logger.debug("has new partition added");
-                for (PartitionInfo partitionInfo : partitionInfos) {
-                    if (result.getSourcePartitionOffsetStart().containsKey(partitionInfo.partition()) == false) {
-                        long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfo.partition());
-                        logger.debug("new partition " + partitionInfo.partition() + " starts from " + earliest);
-                        result.getSourcePartitionOffsetStart().put(partitionInfo.partition(), earliest);
-                    }
+            for (PartitionInfo partitionInfo : partitionInfos) {
+                if (result.getSourcePartitionOffsetStart().containsKey(partitionInfo.partition()) == false) {
+                    // has new partition added
+                    logger.debug("has new partition added");
+                    long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfo.partition());
+                    logger.debug("new partition " + partitionInfo.partition() + " starts from " + earliest);
+                    result.getSourcePartitionOffsetStart().put(partitionInfo.partition(), earliest);
                 }
-            } else {
-                logger.debug("no new partition");
             }
         }
 
         if (result.getEndOffset() == Long.MAX_VALUE) {
-            result.setSourcePartitionOffsetEnd(KafkaClient.getCurrentOffsets(cube));
+            logger.debug("Seek end offsets from topic");
+            Map<Integer, Long> latestOffsets = KafkaClient.getCurrentOffsets(cube);
+            logger.debug("The end offsets are " + latestOffsets);
+            result.setSourcePartitionOffsetEnd(latestOffsets);
         }
 
         long totalStartOffset = 0, totalEndOffset = 0;
@@ -132,7 +131,7 @@ public class KafkaSource implements ISource {
         result.setStartOffset(totalStartOffset);
         result.setEndOffset(totalEndOffset);
 
-        logger.debug("KafkaSource.parsePartitionBeforeBuild, return " + result);
+        logger.debug("parsePartitionBeforeBuild() return: " + result);
         return result;
     }