Posted to dev@kafka.apache.org by "Gérald Quintana (JIRA)" <ji...@apache.org> on 2016/12/06 11:11:36 UTC

[jira] [Created] (KAFKA-4491) OOME when Java client can not connect to brokers

Gérald Quintana created KAFKA-4491:
--------------------------------------

             Summary: OOME when Java client can not connect to brokers
                 Key: KAFKA-4491
                 URL: https://issues.apache.org/jira/browse/KAFKA-4491
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 0.10.0.1
         Environment: Any (Linux|Windows)
            Reporter: Gérald Quintana


Scenario: the broker cluster switched to the SSL protocol but the clients did not. This should have caused connection failures, but instead the client dies with an OOME.

Sample code to easily reproduce the problem:

{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {
    private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws InterruptedException {
        int threads = 10;
        ExecutorService executorService = Executors.newFixedThreadPool(threads);
        List<PrintConsumer> consumers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            PrintConsumer consumer = new PrintConsumer("testgroup" + i, "testtopic" + i);
            consumers.add(consumer);
            executorService.execute(consumer);
        }
        Thread.sleep(300000L);
        // Stop the consumer loops before shutting down the pool, otherwise
        // shutdown() never completes because the tasks run forever.
        consumers.forEach(PrintConsumer::stop);
        executorService.shutdown();
    }

    private static class PrintConsumer implements Runnable {
        private final String groupId;
        private final String topic;
        private final AtomicBoolean running = new AtomicBoolean(true);

        public PrintConsumer(String groupId, String topic) {
            this.groupId = groupId;
            this.topic = topic;
        }

        @Override
        public void run() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9093,kafka2:9093");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "testclient");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            // The brokers listen with SSL on port 9093; leaving these commented
            // out makes the client speak plaintext to the SSL port, which
            // triggers the OOME described above.
            //props.put("ssl.truststore.location", "/opt/truststore.jks");
            //props.put("ssl.truststore.password", "localhost");
            //props.put("security.protocol", "SSL");
            while (running.get()) {
                LOGGER.info("Connecting {}", topic);
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(Collections.singleton(topic));
                    while (running.get()) {
                        ConsumerRecords<String, String> records = consumer.poll(100);
                        for (ConsumerRecord<String, String> record : records)
                            // SLF4J uses {} placeholders, not printf-style formatting
                            LOGGER.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
                    }
                } catch (Exception e) {
                    LOGGER.warn("Exception in " + groupId, e);
                }
            }
        }

        public void stop() {
            running.set(false);
        }
    }
}
{code}
Thrown exception:
{code}
java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:973)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
	at Main$PrintConsumer.run(Main.java:51)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
{code}
Increasing the heap size doesn't help; the Kafka client consumes whatever heap it is given. As soon as the configuration is fixed (the SSL properties uncommented), everything works correctly.
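For reference, the client-side fix is simply the three commented-out properties from the reproducer above (the truststore location and password are the sample's placeholder values, not real ones):
{code}
// Enabling the SSL settings from the reproducer fixes the client side.
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/opt/truststore.jks");
props.put("ssl.truststore.password", "localhost");
{code}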
The allocation path seems to be:
{code} 
this     - value: byte[] #1
 <- hb (Java frame)     - class: java.nio.HeapByteBuffer, value: byte[] #1
  <- buffer (Java frame)     - class: org.apache.kafka.common.network.NetworkReceive, value: java.nio.HeapByteBuffer #5
   <- receive (Java frame)     - class: org.apache.kafka.common.network.KafkaChannel, value: org.apache.kafka.common.network.NetworkReceive #5
    <- attachment (Java frame)     - class: sun.nio.ch.SelectionKeyImpl, value: org.apache.kafka.common.network.KafkaChannel #5
{code}
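Reading the stack trace together with this path, the cause appears to be that the client reads a 4-byte size prefix from the socket and then allocates a buffer of that size with no sanity check (the ByteBuffer.allocate call in NetworkReceive.readFromReadableChannel). When a plaintext client talks to an SSL port, the broker's TLS response bytes get interpreted as a huge message size. Below is a minimal sketch of that pattern, not the actual Kafka source; the TLS byte values are an assumption about what the broker sends back:
{code}
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Sketch of an unchecked length-prefixed read (not the Kafka source).
class UncheckedLengthPrefixedRead {
    static ByteBuffer readMessage(ReadableByteChannel channel) throws IOException {
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        while (sizeBuffer.hasRemaining()) {
            if (channel.read(sizeBuffer) < 0) {
                throw new EOFException("Connection closed while reading size");
            }
        }
        sizeBuffer.flip();
        // A TLS record from the broker starts with bytes like 0x16 0x03 ...
        // (assumed here); read as an int32 size this is several hundred
        // megabytes, so a single allocation can exhaust the heap, and 10
        // threads retrying in a loop certainly do.
        int size = sizeBuffer.getInt();
        return ByteBuffer.allocate(size); // OOME happens here
    }
}
{code}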


