Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/03 11:08:12 UTC

[16/26] incubator-kylin git commit: fix

fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c038cb4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c038cb4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c038cb4b

Branch: refs/heads/streaming
Commit: c038cb4bada510a17c9bb36a32ddbdb7ff947ac9
Parents: 9c3a606
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Mar 2 11:41:56 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Mar 2 11:41:56 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/streaming/kafka/Consumer.java  |  3 +++
 .../apache/kylin/streaming/kafka/Requester.java | 27 +++++++++++++++++++-
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c038cb4b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
index eaee2a1..c825d4b 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
@@ -34,6 +34,7 @@
 
 package org.apache.kylin.streaming.kafka;
 
+import kafka.api.OffsetRequest;
 import kafka.cluster.Broker;
 import kafka.javaapi.FetchResponse;
 import kafka.javaapi.PartitionMetadata;
@@ -93,6 +94,8 @@ public class Consumer implements Runnable {
                 logger.warn("cannot find lead broker");
                 continue;
             }
+            final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, consumerConfig);
+            offset.set(lastOffset);
             final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, consumerConfig);
             if (fetchResponse.errorCode(topic, partitionId) != 0) {
                 logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));

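[Editor's note, not part of the commit] The added lines in Consumer.run() seed the fetch offset from the lead broker before fetching: they ask the new Requester.getLastOffset helper (introduced in the Requester.java change below) for the partition's earliest retained offset and store it in the consumer's offset counter. As a point of reference only, the same helper can also be asked for the latest offset, which is the usual way to choose between replaying the log and tailing new messages. In the sketch below, "readFromBeginning" is a hypothetical flag, not something present in this commit:

    // Sketch: choose a starting offset with the helper added in this commit.
    // OffsetRequest.EarliestTime() -> first offset still retained on the broker
    // OffsetRequest.LatestTime()   -> offset of the next message to be produced
    long startOffset = readFromBeginning
            ? Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, consumerConfig)
            : Requester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, consumerConfig);
    offset.set(startOffset);
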
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c038cb4b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
index 1f14a47..f4cdd8e 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
@@ -37,7 +37,9 @@ package org.apache.kylin.streaming.kafka;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
 import kafka.cluster.Broker;
+import kafka.common.TopicAndPartition;
 import kafka.javaapi.*;
 import kafka.javaapi.consumer.SimpleConsumer;
 import org.slf4j.Logger;
@@ -45,7 +47,9 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Created by qianzhou on 2/15/15.
@@ -114,8 +118,29 @@ public final class Requester {
         SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
         kafka.api.FetchRequest req = new FetchRequestBuilder()
                 .clientId(clientName)
-                .addFetch(topic, partitionId, offset, consumerConfig.getTimeout()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
+                .addFetch(topic, partitionId, offset, consumerConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                 .build();
         return consumer.fetch(req);
     }
+
+    public static long getLastOffset(String topic, int partitionId,
+                                     long whichTime, Broker broker, ConsumerConfig consumerConfig) {
+        String clientName = "client_" + topic + "_" + partitionId;
+        SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
+        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
+        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+        OffsetResponse response = consumer.getOffsetsBefore(request);
+
+        if (response.hasError()) {
+            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
+            return 0;
+        }
+        long[] offsets = response.offsets(topic, partitionId);
+        return offsets[0];
+    }
+
+
 }
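
[Editor's note, not part of the commit] Two things change in Requester.java: fetchResponse now passes consumerConfig.getMaxReadCount() as the fetch size instead of the timeout, and a new getLastOffset helper is added. getLastOffset follows the Kafka 0.8 SimpleConsumer pattern: it builds a kafka.javaapi.OffsetRequest carrying a single PartitionOffsetRequestInfo(whichTime, 1) entry, calls getOffsetsBefore, and returns the first offset from the response; on error it prints the error code and returns 0. A minimal usage sketch follows; the topic name, partition number, and the leadBroker/consumerConfig variables are illustrative assumptions, obtained the same way Consumer.run() obtains them, not values taken from this commit:

    // Sketch: query the earliest and latest offsets of one partition and
    // report the backlog between them.
    long earliest = Requester.getLastOffset("events", 0,
            kafka.api.OffsetRequest.EarliestTime(), leadBroker, consumerConfig);
    long latest = Requester.getLastOffset("events", 0,
            kafka.api.OffsetRequest.LatestTime(), leadBroker, consumerConfig);
    System.out.println("partition 0 backlog: " + (latest - earliest) + " messages");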