You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/04/04 19:58:23 UTC
[5/9] nifi git commit: NIFI-1684 This closes #308. fixed ZKClient
connection leak
NIFI-1684 This closes #308. fixed ZKClient connection leak
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/552d8318
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/552d8318
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/552d8318
Branch: refs/heads/support/nifi-0.6.x
Commit: 552d831807efc06b714759db9ecb71cc41142826
Parents: 89567eb
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Mon Mar 28 21:48:37 2016 -0400
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Mon Apr 4 13:52:58 2016 -0400
----------------------------------------------------------------------
.../apache/nifi/processors/kafka/GetKafka.java | 13 +++---
.../nifi/processors/kafka/KafkaUtils.java | 43 ++++++++++++++------
2 files changed, 37 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/552d8318/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index e06befb..7660305 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -256,13 +256,14 @@ public class GetKafka extends AbstractProcessor {
props.setProperty("consumer.timeout.ms", "1");
}
+ int partitionCount = KafkaUtils.retrievePartitionCountForTopic(
+ context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
+
final ConsumerConfig consumerConfig = new ConsumerConfig(props);
consumer = Consumer.createJavaConsumerConnector(consumerConfig);
final Map<String, Integer> topicCountMap = new HashMap<>(1);
- int partitionCount = KafkaUtils.retrievePartitionCountForTopic(context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
-
int concurrentTaskToUse = context.getMaxConcurrentTasks();
if (context.getMaxConcurrentTasks() < partitionCount){
this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
@@ -346,14 +347,14 @@ public class GetKafka extends AbstractProcessor {
try {
f.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- this.consumerStreamsReady.set(false);
+ shutdownConsumer();
f.cancel(true);
Thread.currentThread().interrupt();
getLogger().warn("Interrupted while waiting to get connection", e);
} catch (ExecutionException e) {
throw new IllegalStateException(e);
} catch (TimeoutException e) {
- this.consumerStreamsReady.set(false);
+ shutdownConsumer();
f.cancel(true);
getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while waiting to get connection", e);
}
@@ -374,14 +375,14 @@ public class GetKafka extends AbstractProcessor {
try {
consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- this.consumerStreamsReady.set(false);
+ shutdownConsumer();
consumptionFuture.cancel(true);
Thread.currentThread().interrupt();
getLogger().warn("Interrupted while consuming messages", e);
} catch (ExecutionException e) {
throw new IllegalStateException(e);
} catch (TimeoutException e) {
- this.consumerStreamsReady.set(false);
+ shutdownConsumer();
consumptionFuture.cancel(true);
getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while consuming messages", e);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/552d8318/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
index a725c2b..8ddea61 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
@@ -33,25 +33,42 @@ import scala.collection.JavaConversions;
*/
class KafkaUtils {
+
/**
* Will retrieve the amount of partitions for a given Kafka topic.
*/
static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) {
- ZkClient zkClient = new ZkClient(zookeeperConnectionString);
+ ZkClient zkClient = null;
- zkClient.setZkSerializer(new ZkSerializer() {
- @Override
- public byte[] serialize(Object o) throws ZkMarshallingError {
- return ZKStringSerializer.serialize(o);
- }
+ try {
+ zkClient = new ZkClient(zookeeperConnectionString);
+ zkClient.setZkSerializer(new ZkSerializer() {
+ @Override
+ public byte[] serialize(Object o) throws ZkMarshallingError {
+ return ZKStringSerializer.serialize(o);
+ }
- @Override
- public Object deserialize(byte[] bytes) throws ZkMarshallingError {
- return ZKStringSerializer.deserialize(bytes);
+ @Override
+ public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+ return ZKStringSerializer.deserialize(bytes);
+ }
+ });
+ scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
+ .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
+ if (topicMetadatas != null && topicMetadatas.size() > 0) {
+ return JavaConversions.asJavaSet(topicMetadatas).iterator().next().partitionsMetadata().size();
+ } else {
+ throw new IllegalStateException("Failed to get metadata for topic " + topicName);
}
- });
- scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
- .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
- return topicMetadatas.size();
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to retrieve partitions for topic " + topicName, e);
+ } finally {
+ try {
+ zkClient.close();
+ } catch (Exception e2) {
+ // ignore
+ }
+ }
}
+
}