Posted to users@kafka.apache.org by Pushkar Deole <pd...@gmail.com> on 2020/05/28 03:43:39 UTC

NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

Hello All,

I am using the Streams DSL API just to create a GlobalKTable backed by a topic.
The topology is simple: it creates a global table from a topic and nothing
else (code snippet pasted below). When I run this service on a K8s cluster
(container in a pod), it fails with an OutOfMemoryError during the
kafkaStreams.start() call (exception trace pasted below). Note that the
topic is newly created, so there is no data in it. Pod memory was initially
set to 500 MiB, which I doubled to 1000 MiB, but with no luck.
The kafka-streams and kafka-clients jars are at version 2.3.1. The broker might
be one version ahead (2.4, I think), but that should not be an issue. Any help
would be appreciated since I am blocked at this point.

// Minimal Streams configuration: application id and bootstrap servers only.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);

// Topology: a single GlobalKTable materialized from the group cache topic.
StreamsBuilder streamsBuilder = new StreamsBuilder();
GlobalKTable<String, Map<String, String>> groupCacheTable =
    streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
        Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
        Materialized.as(GROUP_CACHE_STORE_NAME));
Topology groupCacheTopology = streamsBuilder.build();
kafkaStreams = new KafkaStreams(groupCacheTopology, props);
kafkaStreams.start();

// Close the streams instance cleanly on JVM shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    LOG.info("Stopping the stream");
    kafkaStreams.close();
}));

{"@timestamp":"2020-05-28T03:11:39.719+00:00","@version":"1","message":"stream-client [DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9] State transition from CREATED to REBALANCING","logger_name":"org.apache.kafka.streams.KafkaStreams","thread_name":"main","level":"INFO","level_value":20000}
{"@timestamp":"2020-05-28T03:11:43.532+00:00","@version":"1","message":"Uncaught exception in thread 'kafka-admin-client-thread | DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin':","logger_name":"org.apache.kafka.common.utils.KafkaThread","thread_name":"kafka-admin-client-thread | DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin","level":"ERROR","level_value":40000,"stack_trace":"java.lang.OutOfMemoryError: Java heap space\n\tat java.base/java.nio.HeapByteBuffer.<init>(Unknown Source)\n\tat java.base/java.nio.ByteBuffer.allocate(Unknown Source)\n\tat org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)\n\tat org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)\n\tat org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)\n\tat org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)\n\tat org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)\n\tat org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)\n\tat org.apache.kafka.common.network.Selector.poll(Selector.java:483)\n\tat org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)\n\tat org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152)\n\tat java.base/java.lang.Thread.run(Unknown Source)\n"}
{"@timestamp":"2020-05-28T03:11:44.641+00:00","@version":"1","message":"Uncaught exception in thread 'kafka-producer-network-thread | DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-StreamThread-1-producer':","logger_name":"org.apache.kafka.common.utils.KafkaThread","thread_name":"kafka-producer-network-thread | DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-StreamThread-1-producer","level":"ERROR","level_value":40000,"stack_trace":"java.lang.OutOfMemoryError: Java heap space\n\tat java.base/java.nio.HeapByteBuffer.<init>(Unknown Source)\n\tat java.base/java.nio.ByteBuffer.allocate(Unknown Source)\n\tat org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)\n\tat org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)\n\tat org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)\n\tat org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)\n\tat org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)\n\tat org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)\n\tat org.apache.kafka.common.network.Selector.poll(Selector.java:483)\n\tat org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)\n\tat org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)\n\tat org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)\n\tat java.base/java.lang.Thread.run(Unknown Source)\n"}
{"@timestamp":"2020-05-28T03:11:45.017+00:00","@version":"1","message":"Opening store group-cache-store in regular mode","logger_name":"org.apache.kafka.streams.state.internals.RocksDBTimestampedStore","thread_name":"DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-GlobalStreamThread","level":"INFO","level_value":20000}
{"@timestamp":"2020-05-28T03:11:45.020+00:00","@version":"1","message":"global-stream-thread [DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-GlobalStreamThread] Restoring state for global store group-cache-store","logger_name":"org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl","thread_name":"DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-GlobalStreamThread","level":"INFO","level_value":20000}

Re: NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks for the update, Pushkar! I have to say it is indeed a very
misleading error message, and we should fix it ASAP. I will follow up on the
ticket.

Guozhang

On Thu, May 28, 2020 at 9:17 AM John Roesler <vv...@apache.org> wrote:

> Woah, that's a nasty bug. I've just pinged the Jira ticket. Please feel free to
> do the same.
>
> Thanks,
> -John


-- 
-- Guozhang

Re: NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

Posted by John Roesler <vv...@apache.org>.
Woah, that's a nasty bug. I've just pinged the Jira ticket. Please feel free to
do the same.

Thanks,
-John

On Thu, May 28, 2020, at 02:55, Pushkar Deole wrote:
> Thanks for the help, Guozhang!
> However, I realized that the exception and the actual problem are completely
> different. The problem was that the client was not configured with an SSL
> truststore while the server is SSL-enabled.
> I also found this open bug in Kafka:
> https://issues.apache.org/jira/browse/KAFKA-4493
> After setting the SSL properties on the streams configuration, I am able to
> get it up and running.
>
> @kafka developers, I think the error is very misleading and should be
> fixed as soon as possible, or a proper exception should be thrown.

Re: NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

Posted by Pushkar Deole <pd...@gmail.com>.
Thanks for the help, Guozhang!
However, I realized that the exception and the actual problem are completely
different. The problem was that the client was not configured with an SSL
truststore while the server is SSL-enabled.
I also found this open bug in Kafka:
https://issues.apache.org/jira/browse/KAFKA-4493
After setting the SSL properties on the streams configuration, I am able to
get it up and running.
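
For readers who hit the same symptom, here is a minimal sketch of the kind of client-side SSL settings described above. The keys `security.protocol`, `ssl.truststore.location`, and `ssl.truststore.password` are standard Kafka client configuration names, and Kafka Streams passes such unprefixed client settings on to its embedded consumer, producer, and admin clients. The class name, application ID, broker address, truststore path, and password are hypothetical placeholders, not values from this thread.

```java
import java.util.Properties;

public class SslStreamsProps {
    // Builds Streams properties with TLS enabled for the embedded clients.
    // Every value passed in or hard-coded here is a placeholder (assumption).
    static Properties sslProps(String bootstrapServers,
                               String truststorePath,
                               String truststorePassword) {
        Properties props = new Properties();
        props.put("application.id", "example-app");       // hypothetical id
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SSL");            // talk TLS to the broker
        props.put("ssl.truststore.location", truststorePath);
        props.put("ssl.truststore.password", truststorePassword);
        return props;
    }

    public static void main(String[] args) {
        Properties props = sslProps("broker.example.com:9093",
                "/etc/kafka/client.truststore.jks", "changeit");
        System.out.println(props.getProperty("security.protocol"));
    }
}
```

The resulting `Properties` object would be passed to the `KafkaStreams` constructor in place of the two-entry configuration from the original post.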

@kafka developers, I think the error is very misleading and should be
fixed as soon as possible, or a proper exception should be thrown.
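
One possible explanation for why a protocol mismatch shows up as a heap error, offered as an assumption rather than a confirmed diagnosis of this case: Kafka responses begin with a 4-byte big-endian length, and the stack trace shows `NetworkReceive.readFrom` allocating a buffer. If a plaintext client reads the first bytes of a TLS record as that length, the requested buffer can run to hundreds of megabytes. The byte values below are a typical TLS 1.2 alert record header chosen for illustration; they were not captured from this cluster, and the class name is hypothetical.

```java
import java.nio.ByteBuffer;

public class SizePrefixDemo {
    // Interprets four bytes as a big-endian int, the way a reader of a
    // length-prefixed protocol would treat the start of a frame.
    static int sizePrefix(byte[] firstFour) {
        return ByteBuffer.wrap(firstFour).getInt();
    }

    public static void main(String[] args) {
        // Illustrative TLS record header (assumption): content type 0x15 (alert),
        // version 0x0303 (TLS 1.2), then the first byte of the record length.
        byte[] tlsHeader = {0x15, 0x03, 0x03, 0x00};
        int size = sizePrefix(tlsHeader);
        // Prints: 352518912 bytes, about 336 MiB
        System.out.println(size + " bytes, about " + (size / (1024 * 1024)) + " MiB");
    }
}
```

If the JVM heap inside the pod is smaller than the misread size, the allocation would fail no matter how little data the topic holds, which would be consistent with doubling the pod memory not helping.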

On Thu, May 28, 2020 at 9:46 AM Guozhang Wang <wa...@gmail.com> wrote:

> Hello Pushkar,
>
> I think the memory pressure may not come from consuming the topic data,
> but from RocksDB, which is used for materializing the global table. Note that
> RocksDB allocates a large chunk of memory up front for its mem-table / page
> cache / reader cache with default configs. You can find more detailed
> information in this KIP:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Record+the+Memory+Used+by+RocksDB
>
>
> Guozhang

Re: NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Pushkar,

I think the memory pressure may not come from consuming the topic data,
but from RocksDB, which is used for materializing the global table. Note that
RocksDB allocates a large chunk of memory up front for its mem-table / page
cache / reader cache with default configs. You can find more detailed
information in this KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Record+the+Memory+Used+by+RocksDB
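
When weighing this hypothesis, it may help to separate JVM heap from native memory: RocksDB is a native library, so its mem-tables and caches are generally allocated outside the Java heap, whereas the reported error is `Java heap space`. Below is a small sketch for checking how much heap the JVM actually has inside the container. The class name is hypothetical, and the numbers printed depend entirely on the JVM flags and the container limits in use.

```java
public class HeapCheck {
    // Maximum heap the JVM will attempt to use, in MiB.
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / (1024 * 1024);
    }

    // Heap currently in use (allocated minus free), in MiB.
    static long usedHeapMiB() {
        Runtime rt = Runtime.getRuntime();
        return (rt.totalMemory() - rt.freeMemory()) / (1024 * 1024);
    }

    public static void main(String[] args) {
        System.out.println("max heap:  " + maxHeapMiB() + " MiB");
        System.out.println("used heap: " + usedHeapMiB() + " MiB");
    }
}
```

Running this in the same pod, with the same JVM options as the service, shows whether the heap limit is much smaller than the pod memory limit.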


Guozhang


On Wed, May 27, 2020 at 8:44 PM Pushkar Deole <pd...@gmail.com> wrote:

> Hello All,
>
> I am using the Streams DSL API just to create a GlobalKTable backed by a topic.
> The topology is simple: it creates a global table from a topic and nothing
> else (code snippet pasted below). When I run this service on a K8s cluster
> (container in a pod), it fails with an OutOfMemoryError during the
> kafkaStreams.start() call (exception trace pasted below). Note that the
> topic is newly created, so there is no data in it. Pod memory was initially
> set to 500 MiB, which I doubled to 1000 MiB, but with no luck.
> The kafka-streams and kafka-clients jars are at version 2.3.1. The broker might
> be one version ahead (2.4, I think), but that should not be an issue. Any help
> would be appreciated since I am blocked at this point.


-- 
-- Guozhang