You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/07/07 17:01:42 UTC
[4/5] apex-malhar git commit: APEXMALHAR-2133 #resolve #comment
Handle case partitionsFor() returns null
APEXMALHAR-2133 #resolve #comment Handle case partitionsFor() returns null
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/273b2072
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/273b2072
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/273b2072
Branch: refs/heads/master
Commit: 273b20724a796786d52235e6535303f86b35800f
Parents: 32840a2
Author: brightchen <br...@datatorrent.com>
Authored: Tue Jul 5 13:39:06 2016 -0700
Committer: brightchen <br...@datatorrent.com>
Committed: Wed Jul 6 16:50:48 2016 -0700
----------------------------------------------------------------------
.../malhar/kafka/AbstractKafkaPartitioner.java | 60 +++++++++++++-------
1 file changed, 38 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/273b2072/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index c6e47e9..772399d 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -93,39 +93,46 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
Map<String, Map<String, List<PartitionInfo>>> metadata = new HashMap<>();
- for (int i = 0; i < clusters.length; i++) {
- metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>());
- for (String topic : topics) {
- int tryTime = 3;
- while (tryTime-- > 0) {
- try {
- List<PartitionInfo> ptis = metadataRefreshClients.get(i).partitionsFor(topic);
- if (logger.isDebugEnabled()) {
- logger.debug("Partition metadata for topic {} : {}", topic, Joiner.on(';').join(ptis));
+ try {
+ for (int i = 0; i < clusters.length; i++) {
+ metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>());
+ for (String topic : topics) {
+ //try several times if partitionsFor(topic) returns null or throws an exception.
+ //partitionsFor(topic) will return null if the topic is invalid or hasn't completed
+ int tryTime = 10;
+ while (tryTime-- > 0) {
+ try {
+ List<PartitionInfo> ptis = metadataRefreshClients.get(i).partitionsFor(topic);
+ if (ptis != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Partition metadata for topic {} : {}", topic, Joiner.on(';').join(ptis));
+ }
+ metadata.get(clusters[i]).put(topic, ptis);
+ break;
+ }
+
+ logger.warn("Partition metadata for topic {} is null. retrying...", topic);
+
+ } catch (Exception e) {
+ logger.warn("Got Exception when trying get partition info for topic {}.", topic, e);
}
- metadata.get(clusters[i]).put(topic, ptis);
- break;
- } catch (AuthorizationException ae) {
- logger.error("Kafka AuthorizationException.");
- throw new RuntimeException("Kafka AuthorizationException.", ae);
- } catch (Exception e) {
- logger.warn("Got Exception when trying get partition info for topic {}.", topic, e);
+
try {
Thread.sleep(100);
} catch (Exception e1) {
//ignore
}
+ } //end while
+
+ if (tryTime == 0) {
+ throw new RuntimeException("Get partition info for topic completely failed. Please check the log file. topic name: " + topic);
}
}
- if (tryTime == 0) {
- throw new RuntimeException("Get partition info completely failed. Please check the log file");
- }
}
- metadataRefreshClients.get(i).close();
+ } finally {
+ closeClients();
}
- metadataRefreshClients = null;
-
List<Set<AbstractKafkaPartitioner.PartitionMeta>> parts = null;
try {
parts = assign(metadata);
@@ -169,6 +176,15 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
}
}
+ protected void closeClients() // closes every consumer in metadataRefreshClients, then drops the reference to them
+ {
+ for (KafkaConsumer<byte[], byte[]> consume : metadataRefreshClients) { // NOTE(review): iterating a null field throws NullPointerException — confirm this is never called twice
+ consume.close(); // NOTE(review): an exception here would skip the remaining consumers and the null assignment below
+ }
+ metadataRefreshClients = null; // the field is null once this method returns normally
+ }
+
+
@Override
public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map)
{