You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/03 11:08:12 UTC
[16/26] incubator-kylin git commit: fix
fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c038cb4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c038cb4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c038cb4b
Branch: refs/heads/streaming
Commit: c038cb4bada510a17c9bb36a32ddbdb7ff947ac9
Parents: 9c3a606
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Mar 2 11:41:56 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Mar 2 11:41:56 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/streaming/kafka/Consumer.java | 3 +++
.../apache/kylin/streaming/kafka/Requester.java | 27 +++++++++++++++++++-
2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c038cb4b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
index eaee2a1..c825d4b 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Consumer.java
@@ -34,6 +34,7 @@
package org.apache.kylin.streaming.kafka;
+import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
@@ -93,6 +94,8 @@ public class Consumer implements Runnable {
logger.warn("cannot find lead broker");
continue;
}
+ final long lastOffset = Requester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, consumerConfig);
+ offset.set(lastOffset);
final FetchResponse fetchResponse = Requester.fetchResponse(topic, partitionId, offset.get(), leadBroker, consumerConfig);
if (fetchResponse.errorCode(topic, partitionId) != 0) {
logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c038cb4b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
index 1f14a47..f4cdd8e 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/Requester.java
@@ -37,7 +37,9 @@ package org.apache.kylin.streaming.kafka;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
+import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import org.slf4j.Logger;
@@ -45,7 +47,9 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Created by qianzhou on 2/15/15.
@@ -114,8 +118,29 @@ public final class Requester {
SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
kafka.api.FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
- .addFetch(topic, partitionId, offset, consumerConfig.getTimeout()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
+ .addFetch(topic, partitionId, offset, consumerConfig.getMaxReadCount()) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
.build();
return consumer.fetch(req);
}
+
+ public static long getLastOffset(String topic, int partitionId,
+ long whichTime, Broker broker, ConsumerConfig consumerConfig) {
+ String clientName = "client_" + topic + "_" + partitionId;
+ SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), consumerConfig.getTimeout(), consumerConfig.getBufferSize(), clientName);
+ TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
+ Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+ requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+ kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+ requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+ OffsetResponse response = consumer.getOffsetsBefore(request);
+
+ if (response.hasError()) {
+ System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
+ return 0;
+ }
+ long[] offsets = response.offsets(topic, partitionId);
+ return offsets[0];
+ }
+
+
}