You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/04/24 07:43:04 UTC
[jira] [Commented] (FLINK-6366) KafkaConsumer is not closed in
FlinkKafkaConsumer09
[ https://issues.apache.org/jira/browse/FLINK-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15980815#comment-15980815 ]
ASF GitHub Bot commented on FLINK-6366:
---------------------------------------
GitHub user fanyon opened a pull request:
https://github.com/apache/flink/pull/3759
[FLINK-6366] close KafkaConsumer in FlinkKafkaConsumer09 after getKaf…
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/fanyon/flink FLINK-6366
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3759.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3759
----
commit cb0624f98290af800f50286d95823a8ec0efcf10
Author: mengji.fy <me...@taobao.com>
Date: 2017-04-24T07:17:54Z
[FLINK-6366] close KafkaConsumer in FlinkKafkaConsumer09 after getKafkaPartitions
----
> KafkaConsumer is not closed in FlinkKafkaConsumer09
> ---------------------------------------------------
>
> Key: FLINK-6366
> URL: https://issues.apache.org/jira/browse/FLINK-6366
> Project: Flink
> Issue Type: Bug
> Reporter: Fang Yong
> Assignee: Fang Yong
>
> In getKafkaPartitions of FlinkKafkaConsumer09, the KafkaConsumer is created as flowers and will not be closed.
> {code:title=FlinkKafkaConsumer09.java|borderStyle=solid}
> protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
> // read the partitions that belong to the listed topics
> final List<KafkaTopicPartition> partitions = new ArrayList<>();
> try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) {
> for (final String topic: topics) {
> // get partitions for each topic
> List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic);
> ...
> }
> }
> ...
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)