Posted to dev@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/06/01 19:09:04 UTC
[jira] [Commented] (KAFKA-5345) Some socket connections not closed after restart of Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-5345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16033507#comment-16033507 ]
ASF GitHub Bot commented on KAFKA-5345:
---------------------------------------
GitHub user rajinisivaram opened a pull request:
https://github.com/apache/kafka/pull/3195
KAFKA-5345: Close KafkaClient when streams client is closed
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rajinisivaram/kafka KAFKA-5345
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/3195.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3195
----
commit fbdc0d0774542664f5e1a49d92539f8c5a378028
Author: Rajini Sivaram <ra...@googlemail.com>
Date: 2017-06-01T19:01:29Z
KAFKA-5345: Close KafkaClient when streams client is closed
----
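The commit title above describes closing the underlying client when the streams client is closed. A minimal sketch of that ownership pattern is shown here, assuming nothing about the actual patch: `UnderlyingClient`, `FakeNetworkClient` and `OwningClient` are hypothetical stand-ins invented for illustration, not the classes touched by the pull request.

```java
import java.io.Closeable;
import java.io.IOException;

class CloseSketch {
    public static void main(String[] args) throws IOException {
        FakeNetworkClient network = new FakeNetworkClient();
        // try-with-resources calls OwningClient.close(), which closes the delegate.
        try (OwningClient client = new OwningClient(network)) {
            // A one-off task (for example a compatibility check) would run here.
        }
        System.out.println("underlying closed: " + network.isClosed());
    }
}

// Hypothetical stand-in for a network client that holds sockets; not a Kafka interface.
interface UnderlyingClient extends Closeable {
    boolean isClosed();
}

// Fake implementation that only records whether close() was called.
final class FakeNetworkClient implements UnderlyingClient {
    private boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }

    @Override
    public boolean isClosed() {
        return closed;
    }
}

// Hypothetical wrapper that owns the underlying client and must release it on close.
final class OwningClient implements Closeable {
    private final UnderlyingClient delegate;

    OwningClient(UnderlyingClient delegate) {
        this.delegate = delegate;
    }

    @Override
    public void close() throws IOException {
        // Without this call the delegate's sockets would outlive the wrapper.
        delegate.close();
    }
}
```

The point of the sketch is only that whichever object creates a socket-holding client is also responsible for closing it; the real change may be structured differently.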
> Some socket connections not closed after restart of Kafka Streams
> -----------------------------------------------------------------
>
> Key: KAFKA-5345
> URL: https://issues.apache.org/jira/browse/KAFKA-5345
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0, 0.10.2.1
> Environment: macOS 10.12.5 and Ubuntu 14.04
> Reporter: Jeroen van Wilgenburg
> Assignee: Rajini Sivaram
>
> We ran into a problem that resulted in a "Too many open files" exception because some sockets are not closed after a restart.
> This problem only occurs with versions {{0.10.2.1}} and {{0.10.2.0}}.
> {{0.10.1.1}} and {{0.10.1.0}} both work as expected.
> I used the same version for the server and client.
> I used https://github.com/kohsuke/file-leak-detector to display the open file descriptors. The culprit was:
> {noformat}
> #146 socket channel by thread:pool-2-thread-1 on Mon May 29 11:20:47 CEST 2017
>     at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:108)
>     at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
>     at java.nio.channels.SocketChannel.open(SocketChannel.java:145)
>     at org.apache.kafka.common.network.Selector.connect(Selector.java:168)
>     at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629)
>     at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:186)
>     at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.ensureOneNodeIsReady(StreamsKafkaClient.java:195)
>     at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.getAnyReadyBrokerId(StreamsKafkaClient.java:233)
>     at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.checkBrokerCompatibility(StreamsKafkaClient.java:300)
>     at org.apache.kafka.streams.KafkaStreams.checkBrokerVersionCompatibility(KafkaStreams.java:401)
>     at org.apache.kafka.streams.KafkaStreams.start(KafkaStreams.java:425)
> {noformat}
>
>
> I narrowed the problem down to the reproducible example below (the only dependency is
> {{org.apache.kafka:kafka-streams:jar:0.10.2.1}}).
> *IMPORTANT*: You have to run this code in the IntelliJ IDEA debugger with a special breakpoint to see it fail.
> See the comments on the socketChannels variable for how to add this breakpoint.
> When you run this code you will see the number of open SocketChannels increase (only on version 0.10.2.x).
>
> {code:title=App.java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import java.nio.channels.SocketChannel;
> import java.nio.channels.spi.AbstractInterruptibleChannel;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Properties;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
> import java.util.stream.Collectors;
> public class App {
>     private static KafkaStreams streams;
>     private static String brokerList;
>     // Fill socketChannels with entries on line 'Socket socket = socketChannel.socket();' (line number 170 on 0.10.2.1)
>     // of org.apache.kafka.common.network.Selector: Add breakpoint, right click on breakpoint.
>     // - Uncheck 'Suspend'
>     // - Check 'Evaluate and log' and fill text field with (without quotes) 'App.socketChannels.add(socketChannel)'
>     private static final List<SocketChannel> socketChannels = new ArrayList<>();
>     public static void main(String[] args) {
>         brokerList = args[0];
>         init();
>         ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
>         Runnable command = () -> {
>             streams.close();
>             System.out.println("Open socketChannels: " + socketChannels.stream()
>                     .filter(AbstractInterruptibleChannel::isOpen)
>                     .collect(Collectors.toList()).size());
>             init();
>         };
>         scheduledThreadPool.scheduleWithFixedDelay(command, 10000L, 2000, TimeUnit.MILLISECONDS);
>     }
>     private static void init() {
>         Properties streamsConfiguration = new Properties();
>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "JeroenApp");
>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
>         StreamsConfig config = new StreamsConfig(streamsConfiguration);
>         KStreamBuilder builder = new KStreamBuilder();
>         KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), "HarrieTopic");
>         stream.foreach((key, value) -> System.out.println(value));
>         streams = new KafkaStreams(builder, config);
>         streams.start();
>     }
> }
> {code}
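The reproduction above depends on a debugger breakpoint to collect the channels. As a rough alternative, a growing descriptor count can be observed from inside the JVM. The sketch below is not part of the original report; the class name `FdCount` is made up for illustration, and it assumes a HotSpot-style JVM on a Unix-like system where `com.sun.management.UnixOperatingSystemMXBean` is available. The count includes every open descriptor (files, pipes, sockets), not only Kafka sockets.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

class FdCount {
    // Returns the number of open file descriptors of this process,
    // or -1 when the JVM does not expose the Unix-specific MXBean.
    static long openFileDescriptors() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os)
                    .getOpenFileDescriptorCount();
        }
        return -1L;
    }

    public static void main(String[] args) {
        System.out.println("Open file descriptors: " + openFileDescriptors());
    }
}
```

Printing `FdCount.openFileDescriptors()` after each `streams.close()` in the scheduled task would show whether the total keeps climbing across restarts.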
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)