You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by mk...@apache.org on 2016/09/08 16:51:28 UTC

crunch git commit: CRUNCH-617: Support defensively handling null when partition leader cannot be found.

Repository: crunch
Updated Branches:
  refs/heads/master 5d237b366 -> e8d2a69b6


CRUNCH-617: Support defensively handling null when partition leader cannot be found.

Signed-off-by: Micah Whitacre <mk...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/e8d2a69b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/e8d2a69b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/e8d2a69b

Branch: refs/heads/master
Commit: e8d2a69b6df297f02dfe45053d0a72f6f32cd524
Parents: 5d237b3
Author: Micah Whitacre <mk...@gmail.com>
Authored: Tue Sep 6 15:55:56 2016 -0500
Committer: Micah Whitacre <mk...@gmail.com>
Committed: Thu Sep 8 11:08:07 2016 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/crunch/kafka/KafkaUtils.java      | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/e8d2a69b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
index aeea6fb..9065bee 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
@@ -30,12 +30,14 @@ import kafka.javaapi.TopicMetadataRequest;
 import kafka.javaapi.TopicMetadataResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import org.apache.commons.lang.StringUtils;
+import org.apache.crunch.CrunchRuntimeException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -190,6 +192,10 @@ public class KafkaUtils {
             new HashMap<>();
 
         BrokerEndPoint brokerEndPoint = partition.leader();
+        if(brokerEndPoint == null){
+          throw new CrunchRuntimeException("Unable to find leader for topic:"+metadata.topic()
+            +" partition:"+partition.partitionId());
+        }
         Broker leader = new Broker(0, brokerEndPoint.host(), brokerEndPoint.port(), SecurityProtocol.PLAINTEXT);
 
         if (brokerRequests.containsKey(leader))