You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by mk...@apache.org on 2016/09/08 16:51:28 UTC
crunch git commit: CRUNCH-617: Support defensively handling null when partition leader cannot be found.
Repository: crunch
Updated Branches:
refs/heads/master 5d237b366 -> e8d2a69b6
CRUNCH-617: Support defensively handling null when partition leader cannot be found.
Signed-off-by: Micah Whitacre <mk...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/e8d2a69b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/e8d2a69b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/e8d2a69b
Branch: refs/heads/master
Commit: e8d2a69b6df297f02dfe45053d0a72f6f32cd524
Parents: 5d237b3
Author: Micah Whitacre <mk...@gmail.com>
Authored: Tue Sep 6 15:55:56 2016 -0500
Committer: Micah Whitacre <mk...@gmail.com>
Committed: Thu Sep 8 11:08:07 2016 -0500
----------------------------------------------------------------------
.../src/main/java/org/apache/crunch/kafka/KafkaUtils.java | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/e8d2a69b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
index aeea6fb..9065bee 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
@@ -30,12 +30,14 @@ import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.commons.lang.StringUtils;
+import org.apache.crunch.CrunchRuntimeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -190,6 +192,10 @@ public class KafkaUtils {
new HashMap<>();
BrokerEndPoint brokerEndPoint = partition.leader();
+ if(brokerEndPoint == null){
+ throw new CrunchRuntimeException("Unable to find leader for topic:"+metadata.topic()
+ +" partition:"+partition.partitionId());
+ }
Broker leader = new Broker(0, brokerEndPoint.host(), brokerEndPoint.port(), SecurityProtocol.PLAINTEXT);
if (brokerRequests.containsKey(leader))