Posted to users@kafka.apache.org by Prakash Y <yp...@gmail.com> on 2019/09/11 07:13:42 UTC

Re: Facing memory issues with kafka streams application

We are allocating 1 GB to the suppression buffer, as shown below:

KTable<Windowed<String>, OurObject> windowedStream = source
.suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(<1GB>).shutDownWhenFull()));

and we frequently get the errors below. Please help us resolve this issue.

Thanks,
Prakash.

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-3"
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-2" java.lang.OutOfMemoryError: Java heap space
	at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249)
	at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$389/0x0000000800bd2840.restoreBatch(Unknown Source)
	at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
	at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83)
	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310)
	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
	at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-8" java.lang.OutOfMemoryError: Java heap space
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-7" java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)
	at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:88)
	at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-10" java.lang.OutOfMemoryError: Java heap space
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-4" java.lang.OutOfMemoryError: Java heap space
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-9" java.lang.OutOfMemoryError: Java heap space
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-6" java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069)
Caused by: java.lang.OutOfMemoryError: Java heap space
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-5" java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1069)
Caused by: java.lang.OutOfMemoryError: Java heap space
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-3" java.lang.OutOfMemoryError: Java heap space
Exception in thread "cd_stream-321c072d-34de-4266-9cf2-01c67de5cf5b-StreamThread-1" java.lang.OutOfMemoryError: Java heap space
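
A rough back-of-the-envelope sketch, not a diagnosis: the quoted POC below uses 4-hour windows advancing every 5 minutes, so each record falls into 48 overlapping windows, and suppress(untilWindowCloses) has to hold a result for every open window of every key. The plain-Java snippet below only does that arithmetic. The class and method names are hypothetical, and both the buffer count of 10 (one per StreamThread seen in the trace above) and the premise that the maxBytes cap applies to each task's buffer separately are assumptions to check against your own topology.

```java
import java.time.Duration;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class HoppingWindowMath {

    // Upper bound on how many hopping windows contain a single record timestamp.
    static long windowsPerRecord(Duration size, Duration advance) {
        if (advance.isZero() || advance.isNegative() || advance.compareTo(size) > 0) {
            throw new IllegalArgumentException("advance must be in (0, size]");
        }
        long sizeMs = size.toMillis();
        long advanceMs = advance.toMillis();
        // Ceiling division covers sizes that are not an exact multiple of the advance.
        return (sizeMs + advanceMs - 1) / advanceMs;
    }

    // ASSUMPTION: each buffer instance is capped separately, so the caps add up.
    static long worstCaseBufferBytes(long maxBytesPerBuffer, int buffers) {
        return Math.multiplyExact(maxBytesPerBuffer, buffers);
    }

    public static void main(String[] args) {
        // Window size and advance taken from the quoted POC code.
        long windows = windowsPerRecord(Duration.ofHours(4), Duration.ofMinutes(5));
        // 1 GiB per buffer; 10 buffers is an illustrative guess, not a measured value.
        long bytes = worstCaseBufferBytes(1L << 30, 10);
        System.out.println("windows per record: " + windows);
        System.out.println("worst-case buffered GiB: " + (bytes >> 30));
    }
}
```

If the summed caps come anywhere near the JVM heap (-Xmx), an OutOfMemoryError while restoring the buffer from its changelog would not be surprising, so it may be worth comparing the two numbers before tuning anything else.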


On 2019/07/11 20:25:32, Bill Bejeck <bi...@confluent.io> wrote: 
> Thanks for reporting this, Kalyani. We'll take a look.
> By any chance, can you provide log files?
> 
> Thanks,
> Bill
> 
> On Mon, Jul 8, 2019 at 7:43 AM kalyani yarlagadda <
> kalyani.yarlagadda1@gmail.com> wrote:
> 
> > Hi,
> >
> > I need assistance with the scenario below.
> >
> > I am using a hopping time window in Kafka Streams with suppress(), and I am
> > seeing the following memory errors.
> >
> > 1. The application runs out of memory after running continuously for 2 to 3
> > days after deployment, without any restart of the machine:
> >
> > Exception in thread "change_detection_stream-08bd427d-36fd-467a-8923-4f7bb67aa949-StreamThread-2" java.lang.OutOfMemoryError: Java heap space
> > 	at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249)
> > 	at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$385/0x0000000800bc7440.restoreBatch(Unknown Source)
> > 	at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
> > 	at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83)
> > 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310)
> > 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
> > 	at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
> > 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
> > 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
> > 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
> >
> >
> > The machine has the following specifications:
> > RAM: 16GB
> >
> > 2. The /tmp folder is also filling up.
> >
> >
> > *Kafka Version:* *2.1.0*
> >
> > *I am adding the POC code below*
> >
> > // define the time window as a hopping time window
> > TimeWindows timeWindow =
> >     TimeWindows.of(Duration.ofHours(4)).advanceBy(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1));
> >
> > KTable<Windowed<String>, MetricsTimeSeries> windowedMetricsTimeSeriesStream =
> >     builder.stream("metrics_ip", Consumed.with(Serdes.String(), new JSONSerde<>()))
> >         .groupByKey()
> >         .windowedBy(timeWindow)
> >         .aggregate(
> >             () -> new MetricsTimeSeries(), /* initializer; MetricsTimeSeries is the aggregator class */
> >             (aggKey, newValue, aggValue) -> {
> >                 aggValue.addDataPoint(newValue);
> >                 return aggValue;
> >             }, /* adder */
> >             Materialized.as("windowed_aggregated_metric_store_poc_partitions")) /* state store name */
> >         .suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(<1GB>).shutDownWhenFull()));
> >
> > windowedMetricsTimeSeriesStream.toStream()
> >     .map((key, value) -> /* mapping logic goes here */)
> >     .foreach(/* logic to validate and save */);
> >
> > *Properties set to Kafka Streams:*
> >
> > StreamsConfig.APPLICATION_ID_CONFIG - "streams_changedetection_poc_partitions"
> > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG - "kafka:9092"
> > StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG - Serdes.String().getClass()
> > StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS - Serdes.String().getClass()
> > StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG - JSONSerde.class
> > StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS - JSONSerde.class
> > StreamsConfig.NUM_STREAM_THREADS_CONFIG - 2
> > StreamsConfig.PROCESSING_GUARANTEE_CONFIG - StreamsConfig.EXACTLY_ONCE
> > StreamsConfig.COMMIT_INTERVAL_MS_CONFIG - 1000ms
> >
> >
> >
> > Thanks in Advance.
> >
> > Kalyani Y,
> > 9177982636
> >
>