Posted to commits@nifi.apache.org by oz...@apache.org on 2016/04/04 19:58:23 UTC

[5/9] nifi git commit: NIFI-1684 This closes #308. fixed ZKClient connection leak

NIFI-1684 This closes #308. fixed ZKClient connection leak

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/552d8318
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/552d8318
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/552d8318

Branch: refs/heads/support/nifi-0.6.x
Commit: 552d831807efc06b714759db9ecb71cc41142826
Parents: 89567eb
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Mon Mar 28 21:48:37 2016 -0400
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Mon Apr 4 13:52:58 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/GetKafka.java  | 13 +++---
 .../nifi/processors/kafka/KafkaUtils.java       | 43 ++++++++++++++------
 2 files changed, 37 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/552d8318/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index e06befb..7660305 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -256,13 +256,14 @@ public class GetKafka extends AbstractProcessor {
             props.setProperty("consumer.timeout.ms", "1");
         }
 
+        int partitionCount = KafkaUtils.retrievePartitionCountForTopic(
+                context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
+
         final ConsumerConfig consumerConfig = new ConsumerConfig(props);
         consumer = Consumer.createJavaConsumerConnector(consumerConfig);
 
         final Map<String, Integer> topicCountMap = new HashMap<>(1);
 
-        int partitionCount = KafkaUtils.retrievePartitionCountForTopic(context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
-
         int concurrentTaskToUse = context.getMaxConcurrentTasks();
         if (context.getMaxConcurrentTasks() < partitionCount){
             this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
@@ -346,14 +347,14 @@ public class GetKafka extends AbstractProcessor {
                 try {
                     f.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
                 } catch (InterruptedException e) {
-                    this.consumerStreamsReady.set(false);
+                    shutdownConsumer();
                     f.cancel(true);
                     Thread.currentThread().interrupt();
                     getLogger().warn("Interrupted while waiting to get connection", e);
                 } catch (ExecutionException e) {
                     throw new IllegalStateException(e);
                 } catch (TimeoutException e) {
-                    this.consumerStreamsReady.set(false);
+                    shutdownConsumer();
                     f.cancel(true);
                     getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while waiting to get connection", e);
                 }
@@ -374,14 +375,14 @@ public class GetKafka extends AbstractProcessor {
             try {
                 consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
-                this.consumerStreamsReady.set(false);
+                shutdownConsumer();
                 consumptionFuture.cancel(true);
                 Thread.currentThread().interrupt();
                 getLogger().warn("Interrupted while consuming messages", e);
             } catch (ExecutionException e) {
                 throw new IllegalStateException(e);
             } catch (TimeoutException e) {
-                this.consumerStreamsReady.set(false);
+                shutdownConsumer();
                 consumptionFuture.cancel(true);
                 getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while consuming messages", e);
             }
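
The hunks above move the partition-count lookup ahead of consumer creation and, when the wait on a Future is interrupted or times out, call shutdownConsumer() rather than only clearing the consumerStreamsReady flag, so the consumer (and its ZooKeeper connection) is actually released. Below is a minimal sketch of that wait-with-cleanup pattern; the class name and the releaseConsumer hook are illustrative stand-ins for GetKafka.shutdownConsumer(), not code from the commit:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    final class FutureWaitSketch {
        // Wait for an in-flight consumer task; on interrupt or timeout, release
        // the consumer before cancelling the task so nothing stays connected.
        static void awaitWithCleanup(Future<?> future, long timeoutMillis, Runnable releaseConsumer) {
            try {
                future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                releaseConsumer.run();                  // stands in for shutdownConsumer()
                future.cancel(true);
                Thread.currentThread().interrupt();     // restore the interrupt flag
            } catch (ExecutionException e) {
                throw new IllegalStateException(e);
            } catch (TimeoutException e) {
                releaseConsumer.run();
                future.cancel(true);
            }
        }
    }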

http://git-wip-us.apache.org/repos/asf/nifi/blob/552d8318/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
index a725c2b..8ddea61 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
@@ -33,25 +33,42 @@ import scala.collection.JavaConversions;
  */
 class KafkaUtils {
 
+
     /**
      * Will retrieve the amount of partitions for a given Kafka topic.
      */
     static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) {
-        ZkClient zkClient = new ZkClient(zookeeperConnectionString);
+        ZkClient zkClient = null;
 
-        zkClient.setZkSerializer(new ZkSerializer() {
-            @Override
-            public byte[] serialize(Object o) throws ZkMarshallingError {
-                return ZKStringSerializer.serialize(o);
-            }
+        try {
+            zkClient = new ZkClient(zookeeperConnectionString);
+            zkClient.setZkSerializer(new ZkSerializer() {
+                @Override
+                public byte[] serialize(Object o) throws ZkMarshallingError {
+                    return ZKStringSerializer.serialize(o);
+                }
 
-            @Override
-            public Object deserialize(byte[] bytes) throws ZkMarshallingError {
-                return ZKStringSerializer.deserialize(bytes);
+                @Override
+                public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+                    return ZKStringSerializer.deserialize(bytes);
+                }
+            });
+            scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
+                    .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
+            if (topicMetadatas != null && topicMetadatas.size() > 0) {
+                return JavaConversions.asJavaSet(topicMetadatas).iterator().next().partitionsMetadata().size();
+            } else {
+                throw new IllegalStateException("Failed to get metadata for topic " + topicName);
             }
-        });
-        scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
-                .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
-        return topicMetadatas.size();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to retrieve partitions for topic " + topicName, e);
+        } finally {
+            try {
+                zkClient.close();
+            } catch (Exception e2) {
+                // ignore
+            }
+        }
     }
+
 }
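
The KafkaUtils change above is the core of the leak fix: the ZkClient is now created inside a try block and closed in a finally block so the ZooKeeper connection is released on every path, and the method now returns partitionsMetadata().size() for the topic (the old code returned the size of the metadata set) or fails fast with an IllegalStateException when no metadata is found. Below is a minimal sketch of that acquire-in-try, close-in-finally pattern, assuming the same I0Itec ZkClient used above; the withZkClient helper and its Function callback are illustrative, not part of the commit:

    import java.util.function.Function;

    import org.I0Itec.zkclient.ZkClient;

    final class ZkClientCloseSketch {
        // Open the ZkClient inside try and close it in finally, so the ZooKeeper
        // connection is released even when the work against it throws.
        static <T> T withZkClient(String zookeeperConnectionString, Function<ZkClient, T> work) {
            ZkClient zkClient = null;
            try {
                zkClient = new ZkClient(zookeeperConnectionString);
                return work.apply(zkClient);
            } finally {
                if (zkClient != null) {
                    try {
                        zkClient.close();       // best-effort close, as in the finally block above
                    } catch (Exception ignore) {
                        // closing is cleanup only; never mask the original exception
                    }
                }
            }
        }
    }

The null check in the sketch is merely defensive for the case where construction itself fails; the committed code instead relies on the inner catch to swallow the resulting NullPointerException so the original failure still propagates.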