You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/02/23 11:42:20 UTC
kylin git commit: KYLIN-1435: Relax the checking for
PartitionMetadata and logger the error code
Repository: kylin
Updated Branches:
refs/heads/devstreaming bdada03f7 -> 387f7de6c
KYLIN-1435: Relax the checking for PartitionMetadata and logger the error code
Signed-off-by: honma <ho...@ebay.com>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/387f7de6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/387f7de6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/387f7de6
Branch: refs/heads/devstreaming
Commit: 387f7de6c9e75764c4886137dc0002c462a4170e
Parents: bdada03
Author: yangzhong <ya...@ebay.com>
Authored: Tue Feb 23 18:31:37 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Feb 23 18:41:52 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/source/kafka/KafkaStreamingInput.java | 5 ++++-
.../java/org/apache/kylin/source/kafka/util/KafkaUtils.java | 5 ++++-
.../src/main/java/org/apache/kylin/streaming/KafkaConsumer.java | 5 ++++-
.../src/main/java/org/apache/kylin/streaming/StreamingUtil.java | 5 ++++-
4 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/387f7de6/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index 393b8e7..581fe8f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -133,7 +133,10 @@ public class KafkaStreamingInput implements IStreamingInput {
private Broker getLeadBroker() {
final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, replicaBrokers, kafkaClusterConfig);
- if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+ if (partitionMetadata != null) {
+ if (partitionMetadata.errorCode() != 0){
+ logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode());
+ }
replicaBrokers = partitionMetadata.replicas();
return partitionMetadata.leader();
} else {
http://git-wip-us.apache.org/repos/asf/kylin/blob/387f7de6/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
index 2833ea4..fc3b7f6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
@@ -33,7 +33,10 @@ public final class KafkaUtils {
public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
- if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+ if (partitionMetadata != null) {
+ if (partitionMetadata.errorCode() != 0){
+ logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode());
+ }
return partitionMetadata.leader();
} else {
return null;
http://git-wip-us.apache.org/repos/asf/kylin/blob/387f7de6/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index 6d693c3..869e555 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -92,7 +92,10 @@ public class KafkaConsumer implements Runnable {
private Broker getLeadBroker() {
final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(topic, partitionId, replicaBrokers, streamingConfig);
- if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+ if (partitionMetadata != null) {
+ if (partitionMetadata.errorCode() != 0){
+ logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode());
+ }
replicaBrokers = partitionMetadata.replicas();
return partitionMetadata.leader();
} else {
http://git-wip-us.apache.org/repos/asf/kylin/blob/387f7de6/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 1d6b95c..4cf837e 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -30,7 +30,10 @@ public final class StreamingUtil {
public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
- if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+ if (partitionMetadata != null) {
+ if (partitionMetadata.errorCode() != 0){
+ logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode());
+ }
return partitionMetadata.leader();
} else {
return null;