You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2016/12/16 03:54:04 UTC

kylin git commit: KYLIN-2131, fix class getName()

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2131 cedcca6e4 -> 08b76cb00


KYLIN-2131, fix class getName()


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/08b76cb0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/08b76cb0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/08b76cb0

Branch: refs/heads/KYLIN-2131
Commit: 08b76cb00a2ca2731c32ea7639ebf4dae39bbf52
Parents: cedcca6
Author: Billy Liu <bi...@apache.org>
Authored: Fri Dec 16 11:53:43 2016 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Fri Dec 16 11:53:43 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/streaming/Kafka10DataLoader.java  | 5 +++--
 .../main/java/org/apache/kylin/source/kafka/KafkaSource.java    | 4 ++--
 .../java/org/apache/kylin/source/kafka/util/KafkaClient.java    | 4 ++--
 3 files changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/08b76cb0/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
index c7a487a..fae81ce 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -79,8 +80,8 @@ public class Kafka10DataLoader extends StreamDataLoader {
         Properties props = new Properties();
         props.put("retry.backoff.ms", "1000");
         props.put("bootstrap.servers", brokers);
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("key.serializer", StringSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
         props.put("acks", "1");
         props.put("buffer.memory", 33554432);
         props.put("retries", 0);

http://git-wip-us.apache.org/repos/asf/kylin/blob/08b76cb0/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 1f3c446..6c1ac1f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -85,8 +85,8 @@ public class KafkaSource implements ISource {
                 logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + cube.getDescriptor().getPartitionOffsetStart());
                 result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart());
             } else {
-                // from the topic's very begining;
-                logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's very beginning.");
+                // from the topic's earliest offset;
+                logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
                 result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/08b76cb0/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index f891467..51339c7 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -56,8 +56,8 @@ public class KafkaClient {
             }
         }
         props.put("bootstrap.servers", brokers);
-        props.put("key.deserializer", StringDeserializer.class.getClass().getCanonicalName());
-        props.put("value.deserializer", StringDeserializer.class.getClass().getCanonicalName());
+        props.put("key.deserializer", StringDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
         props.put("group.id", consumerGroup);
         props.put("enable.auto.commit", "false");
         return props;