You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/07/07 17:01:42 UTC

[4/5] apex-malhar git commit: APEXMALHAR-2133 #resolve #comment Handle case partitionsFor() returns null

APEXMALHAR-2133 #resolve #comment Handle case partitionsFor() returns null


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/273b2072
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/273b2072
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/273b2072

Branch: refs/heads/master
Commit: 273b20724a796786d52235e6535303f86b35800f
Parents: 32840a2
Author: brightchen <br...@datatorrent.com>
Authored: Tue Jul 5 13:39:06 2016 -0700
Committer: brightchen <br...@datatorrent.com>
Committed: Wed Jul 6 16:50:48 2016 -0700

----------------------------------------------------------------------
 .../malhar/kafka/AbstractKafkaPartitioner.java  | 60 +++++++++++++-------
 1 file changed, 38 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/273b2072/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index c6e47e9..772399d 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -93,39 +93,46 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
 
     Map<String, Map<String, List<PartitionInfo>>> metadata = new HashMap<>();
 
-    for (int i = 0; i < clusters.length; i++) {
-      metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>());
-      for (String topic : topics) {
-        int tryTime = 3;
-        while (tryTime-- > 0) {
-          try {
-            List<PartitionInfo> ptis = metadataRefreshClients.get(i).partitionsFor(topic);
-            if (logger.isDebugEnabled()) {
-              logger.debug("Partition metadata for topic {} : {}", topic, Joiner.on(';').join(ptis));
+    try {
+      for (int i = 0; i < clusters.length; i++) {
+        metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>());
+        for (String topic : topics) {
+          //try several times if partitionsFor(topic) returns null or throws an exception.
+          //partitionsFor(topic) will return null if the topic is invalid or hasn't completed
+          int tryTime = 10;
+          while (tryTime-- > 0) {
+            try {
+              List<PartitionInfo> ptis = metadataRefreshClients.get(i).partitionsFor(topic);
+              if (ptis != null) {
+                if (logger.isDebugEnabled()) {
+                  logger.debug("Partition metadata for topic {} : {}", topic, Joiner.on(';').join(ptis));
+                }
+                metadata.get(clusters[i]).put(topic, ptis);
+                break;
+              }
+              
+              logger.warn("Partition metadata for topic {} is null. retrying...", topic);
+              
+            } catch (Exception e) {
+              logger.warn("Got Exception when trying get partition info for topic {}.", topic, e);
             }
-            metadata.get(clusters[i]).put(topic, ptis);
-            break;
-          } catch (AuthorizationException ae) {
-            logger.error("Kafka AuthorizationException.");
-            throw new RuntimeException("Kafka AuthorizationException.", ae);
-          } catch (Exception e) {
-            logger.warn("Got Exception when trying get partition info for topic {}.", topic, e);
+  
             try {
               Thread.sleep(100);
             } catch (Exception e1) {
               //ignore
             }
+          } //end while
+          
+          if (tryTime == 0) {
+            throw new RuntimeException("Get partition info for topic completely failed. Please check the log file. topic name: " + topic);
           }
         }
-        if (tryTime == 0) {
-          throw new RuntimeException("Get partition info completely failed. Please check the log file");
-        }
       }
-      metadataRefreshClients.get(i).close();
+    } finally {
+      closeClients();
     }
 
-    metadataRefreshClients = null;
-
     List<Set<AbstractKafkaPartitioner.PartitionMeta>> parts = null;
     try {
       parts = assign(metadata);
@@ -169,6 +176,15 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
     }
   }
 
+  protected void closeClients()
+  {
+    for (KafkaConsumer<byte[], byte[]> consume : metadataRefreshClients) {
+      consume.close();
+    }
+    metadataRefreshClients = null;
+  }
+  
+  
   @Override
   public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map)
   {