Posted to dev@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/06/01 19:09:04 UTC

[jira] [Commented] (KAFKA-5345) Some socket connections not closed after restart of Kafka Streams

    [ https://issues.apache.org/jira/browse/KAFKA-5345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16033507#comment-16033507 ] 

ASF GitHub Bot commented on KAFKA-5345:
---------------------------------------

GitHub user rajinisivaram opened a pull request:

    https://github.com/apache/kafka/pull/3195

    KAFKA-5345: Close KafkaClient when streams client is closed

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rajinisivaram/kafka KAFKA-5345

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3195.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3195
    
----
commit fbdc0d0774542664f5e1a49d92539f8c5a378028
Author: Rajini Sivaram <ra...@googlemail.com>
Date:   2017-06-01T19:01:29Z

    KAFKA-5345: Close KafkaClient when streams client is closed

----
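
The PR title indicates that the fix closes the internal KafkaClient used by the streams client once KafkaStreams itself is closed. As a rough sketch of that idea only (the names below are illustrative and not taken from the actual diff; see the patch link above for the real change):

    // Illustrative sketch, not the actual patch: whatever network client the
    // streams instance opened for its broker checks must also be closed when
    // the streams client shuts down.
    public void close() {
        // ... existing shutdown of stream threads and state ...
        try {
            streamsKafkaClient.close(); // hypothetical field for the internal client
        } catch (final IOException e) {
            log.warn("Could not close internal Kafka client", e);
        }
    }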


> Some socket connections not closed after restart of Kafka Streams
> -----------------------------------------------------------------
>
>                 Key: KAFKA-5345
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5345
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.10.2.1
>         Environment: MacOs 10.12.5 and Ubuntu 14.04
>            Reporter: Jeroen van Wilgenburg
>            Assignee: Rajini Sivaram
>
> We ran into a problem that resulted in a "Too many open files" exception because some sockets are not closed after a restart.
> This problem only occurs with versions {{0.10.2.1}} and {{0.10.2.0}};
> {{0.10.1.1}} and {{0.10.1.0}} both work as expected.
> I used the same version for the server and the client.
> I used https://github.com/kohsuke/file-leak-detector to display the open file descriptors. The culprit was:
> {noformat}
> #146 socket channel by thread:pool-2-thread-1 on Mon May 29 11:20:47 CEST 2017
> 	at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:108)
> 	at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
> 	at java.nio.channels.SocketChannel.open(SocketChannel.java:145)
> 	at org.apache.kafka.common.network.Selector.connect(Selector.java:168)
> 	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629)
> 	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:186)
> 	at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.ensureOneNodeIsReady(StreamsKafkaClient.java:195)
> 	at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.getAnyReadyBrokerId(StreamsKafkaClient.java:233)
> 	at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.checkBrokerCompatibility(StreamsKafkaClient.java:300)
> 	at org.apache.kafka.streams.KafkaStreams.checkBrokerVersionCompatibility(KafkaStreams.java:401)
> 	at org.apache.kafka.streams.KafkaStreams.start(KafkaStreams.java:425)
> {noformat}
>
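> The trace shows that every {{KafkaStreams.start()}} opens a fresh broker connection through {{StreamsKafkaClient}} for the compatibility check, but closing the streams client never closes that connection, so the descriptor stays open. As a simplified illustration of the pattern only (these names are made up, not Kafka source):
> {code:java}
> import java.io.IOException;
> import java.net.InetSocketAddress;
> import java.nio.channels.SocketChannel;
>
> // Simplified illustration of the leak: start() opens a helper connection
> // that close() never releases, so each restart cycle leaks one descriptor.
> class LeakySketch {
>     private SocketChannel compatibilityCheckChannel; // hypothetical field
>
>     void start(InetSocketAddress broker) throws IOException {
>         compatibilityCheckChannel = SocketChannel.open(broker);
>     }
>
>     void close() {
>         // stream threads would be shut down here, but
>         // compatibilityCheckChannel is never closed
>     }
> }
> {code}
>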
> I could narrow the problem down to the reproducible example below (the only dependency is
> {{org.apache.kafka:kafka-streams:jar:0.10.2.1}}).
> *IMPORTANT*: You have to run this code in the IntelliJ IDEA debugger with a special breakpoint to see it fail.
> See the comments on the socketChannels variable for how to add this breakpoint.
> When you run this code you will see the number of open SocketChannels increase (only on version 0.10.2.x).
>
> {code:title=App.java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import java.nio.channels.SocketChannel;
> import java.nio.channels.spi.AbstractInterruptibleChannel;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Properties;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
> import java.util.stream.Collectors;
> public class App {
>     private static KafkaStreams streams;
>     private static String brokerList;
>     // Fill socketChannels with entries on line 'Socket socket = socketChannel.socket();' (line number 170  on 0.10.2.1)
>     // of org.apache.kafka.common.network.Selector: Add breakpoint, right click on breakpoint.
>     // - Uncheck 'Suspend'
>     // - Check 'Evaluate and log' and fill text field with (without quotes) 'App.socketChannels.add(socketChannel)'
>     private static final List<SocketChannel> socketChannels = new ArrayList<>();
>     public static void main(String[] args) {
>         brokerList = args[0];
>         init();
>         ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
>         Runnable command = () -> {
>             streams.close();
>             System.out.println("Open socketChannels: " + socketChannels.stream()
>                     .filter(AbstractInterruptibleChannel::isOpen)
>                     .collect(Collectors.toList()).size());
>             init();
>         };
>         scheduledThreadPool.scheduleWithFixedDelay(command, 10000L, 2000, TimeUnit.MILLISECONDS);
>     }
>     private static void init() {
>         Properties streamsConfiguration = new Properties();
>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "JeroenApp");
>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
>         StreamsConfig config = new StreamsConfig(streamsConfiguration);
>         KStreamBuilder builder = new KStreamBuilder();
>         KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), "HarrieTopic");
>         stream.foreach((key, value) -> System.out.println(value));
>         streams = new KafkaStreams(builder, config);
>         streams.start();
>     }
> }
> {code}
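>
> As an aside, a way to watch the same growth without the IntelliJ breakpoint trick is to poll the JVM's open file descriptor count (this relies on the {{com.sun.management}} API, so it only works on Unix-like JVMs and counts all descriptors, not just Kafka sockets):
> {code:java}
> import com.sun.management.UnixOperatingSystemMXBean;
> import java.lang.management.ManagementFactory;
>
> public class FdCount {
>     // Prints the total number of file descriptors held by this JVM.
>     // Could be called from the scheduled task above instead of the
>     // socketChannels bookkeeping.
>     public static void print() {
>         UnixOperatingSystemMXBean os =
>                 (UnixOperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
>         System.out.println("Open file descriptors: " + os.getOpenFileDescriptorCount());
>     }
> }
> {code}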



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)