You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/02/23 11:42:20 UTC

kylin git commit: KYLIN-1435: Relax the checking for PartitionMetadata and log the error code

Repository: kylin
Updated Branches:
  refs/heads/devstreaming bdada03f7 -> 387f7de6c


KYLIN-1435: Relax the checking for PartitionMetadata and log the error code

Signed-off-by: honma <ho...@ebay.com>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/387f7de6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/387f7de6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/387f7de6

Branch: refs/heads/devstreaming
Commit: 387f7de6c9e75764c4886137dc0002c462a4170e
Parents: bdada03
Author: yangzhong <ya...@ebay.com>
Authored: Tue Feb 23 18:31:37 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Feb 23 18:41:52 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/source/kafka/KafkaStreamingInput.java | 5 ++++-
 .../java/org/apache/kylin/source/kafka/util/KafkaUtils.java     | 5 ++++-
 .../src/main/java/org/apache/kylin/streaming/KafkaConsumer.java | 5 ++++-
 .../src/main/java/org/apache/kylin/streaming/StreamingUtil.java | 5 ++++-
 4 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/387f7de6/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index 393b8e7..581fe8f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -133,7 +133,10 @@ public class KafkaStreamingInput implements IStreamingInput {
 
         private Broker getLeadBroker() {
             final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, replicaBrokers, kafkaClusterConfig);
-            if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+            if (partitionMetadata != null) {
+                if (partitionMetadata.errorCode() != 0){
+                    logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode());
+                }
                 replicaBrokers = partitionMetadata.replicas();
                 return partitionMetadata.leader();
             } else {

http://git-wip-us.apache.org/repos/asf/kylin/blob/387f7de6/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
index 2833ea4..fc3b7f6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
@@ -33,7 +33,10 @@ public final class KafkaUtils {
 
     public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
         final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
-        if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+        if (partitionMetadata != null) {
+            if (partitionMetadata.errorCode() != 0){
+                logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode());
+            }
             return partitionMetadata.leader();
         } else {
             return null;

http://git-wip-us.apache.org/repos/asf/kylin/blob/387f7de6/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index 6d693c3..869e555 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -92,7 +92,10 @@ public class KafkaConsumer implements Runnable {
 
     private Broker getLeadBroker() {
         final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(topic, partitionId, replicaBrokers, streamingConfig);
-        if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+        if (partitionMetadata != null) {
+            if (partitionMetadata.errorCode() != 0){
+                logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode());
+            }
             replicaBrokers = partitionMetadata.replicas();
             return partitionMetadata.leader();
         } else {

http://git-wip-us.apache.org/repos/asf/kylin/blob/387f7de6/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 1d6b95c..4cf837e 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -30,7 +30,10 @@ public final class StreamingUtil {
 
     public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
         final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
-        if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+        if (partitionMetadata != null) {
+            if (partitionMetadata.errorCode() != 0){
+                logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode());
+            }
             return partitionMetadata.leader();
         } else {
             return null;