You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Prakash Y <yp...@gmail.com> on 2019/09/11 11:15:01 UTC

Facing memory issues with Kafka streams application

Hi,

I am using the hopping time window in Kafka streams with suppress option. I
am facing memory issues when the Kafka application is running continuously
for 2 to 3 days of deployment without any restart on the machine.

We are using hopping window and buffer size as 1GB.

We are getting there exceptions very frequently, and stream processing is
getting failed because of these exceptions. Please help us in resolving
this issue.

Kafka Version: 2.1.0

I am adding the POC code below

KTable<Windowed<String>, OurObject> cdstream = source
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofHours(4)).advanceBy(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
.aggregate(() -> new OurObject(),  //initializer
(aggKey, newValue, aggValue) -> {
aggValue.addNew(newValue);
return aggValue;
},  //adder
Materialized.as("windowed_aggregated_store"))  //state store name
.suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(1024 * 1024 *
1024).shutDownWhenFull()));


The log file contents are:


Successfully started cd stream listner ....
MBR-0x01:Consumer Consumer_metric_data-cd_metric_worker_2adc3_0 is ready,
Consuming the messages...MBR-0x01:Consumer
Consumer_metric_data-cd_metric_worker_2adc3_1 is ready, Consuming the
messages...MBR-0x01:Consumer
Consumer_cache-key-delete-queue-cache_key_delete_2adc3_2adc3-vjp43_0 is
ready, Consuming the messages...
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "kafka-producer-network-thread |
producer-3"
Exception in thread
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-2"
java.lang.OutOfMemoryError: Java heap space
at
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249)
at
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$389/0x0000000800bd2840.restoreBatch(Unknown
Source)
at
org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
at
org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83)
at
org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310)
at
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
at
org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Exception in thread
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-8"
java.lang.OutOfMemoryError: Java heap space
Exception in thread
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-7"
java.lang.OutOfMemoryError: Java heap space
at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
at
org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
at
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:88)
at
org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Exception in thread
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-10"
java.lang.OutOfMemoryError: Java heap space
Exception in thread
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-4"
java.lang.OutOfMemoryError: Java heap space
Exception in thread
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-9"
java.lang.OutOfMemoryError: Java heap space
Exception in thread
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-6"
java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069)
Caused by: java.lang.OutOfMemoryError: Java heap space
Exception in thread
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-5"
java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069)
Caused by: java.lang.OutOfMemoryError: Java heap space
Exception in thread
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-3"
java.lang.OutOfMemoryError: Java heap space
Exception in thread
"cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-1"
java.lang.OutOfMemoryError: Java heap space
-----

Thanks,
Prakash.