You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Jeroen van Wilgenburg (JIRA)" <ji...@apache.org> on 2017/05/29 13:35:04 UTC

[jira] [Created] (KAFKA-5345) Some socket connections not closed after restart of Kafka Streams

Jeroen van Wilgenburg created KAFKA-5345:
--------------------------------------------

             Summary: Some socket connections not closed after restart of Kafka Streams
                 Key: KAFKA-5345
                 URL: https://issues.apache.org/jira/browse/KAFKA-5345
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.2.1, 0.10.2.0
         Environment: MacOs 10.12.5 and Ubuntu 14.04
            Reporter: Jeroen van Wilgenburg


We ran into a problem that resulted in a "Too many open files" exception because some sockets are not closed after a restart.
This problem only occurs with version {{0.10.2.1}} and {{0.10.2.0}}. 
{{0.10.1.1}} and {{0.10.1.0}} both work as expected.
I used the same version for the server and client.

I used https://github.com/kohsuke/file-leak-detector to display the open file descriptors. The culprit was :

{noformat}
#146 socket channel by thread:pool-2-thread-1 on Mon May 29 11:20:47 CEST 2017
	at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:108)
	at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
	at java.nio.channels.SocketChannel.open(SocketChannel.java:145)
	at org.apache.kafka.common.network.Selector.connect(Selector.java:168)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629)
	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:186)
	at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.ensureOneNodeIsReady(StreamsKafkaClient.java:195)
	at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.getAnyReadyBrokerId(StreamsKafkaClient.java:233)
	at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.checkBrokerCompatibility(StreamsKafkaClient.java:300)
	at org.apache.kafka.streams.KafkaStreams.checkBrokerVersionCompatibility(KafkaStreams.java:401)
	at org.apache.kafka.streams.KafkaStreams.start(KafkaStreams.java:425)
{noformat}	
	
	
I could narrow the problem down to a reproducable example below (the only dependency is 
{{org.apache.kafka:kafka-streams:jar:0.10.2.1}}). 
*IMPORTANT*: You have to run this code in the Intellij IDEA debugger with a special breakpoint to see it fail. 
See the comments on the socketChannels variable on how to add this breakpoint. 
When you run this code you will see the number of open SocketChannels increase (only on version 0.10.2.x).
	
{code:title=App.java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractInterruptibleChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class App {

    private static KafkaStreams streams;
    private static String brokerList;

    // Fill socketChannels with entries on line 'Socket socket = socketChannel.socket();' (line number 170  on 0.10.2.1)
    // of org.apache.kafka.common.network.Selector: Add breakpoint, right click on breakpoint.
    // - Uncheck 'Suspend'
    // - Check 'Evaluate and log' and fill text field with (without quotes) 'App.socketChannels.add(socketChannel)'
    private static final List<SocketChannel> socketChannels = new ArrayList<>();

    public static void main(String[] args) {
        brokerList = args[0];
        init();

        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);

        Runnable command = () -> {
            streams.close();

            System.out.println("Open socketChannels: " + socketChannels.stream()
                    .filter(AbstractInterruptibleChannel::isOpen)
                    .collect(Collectors.toList()).size());

            init();
        };
        scheduledThreadPool.scheduleWithFixedDelay(command, 10000L, 2000, TimeUnit.MILLISECONDS);

    }

    private static void init() {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "JeroenApp");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

        StreamsConfig config = new StreamsConfig(streamsConfiguration);

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), "HarrieTopic");
        stream.foreach((key, value) -> System.out.println(value));

        streams = new KafkaStreams(builder, config);
        streams.start();
    }

}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)