Posted to users@kafka.apache.org by Miguel González <mi...@klar.mx> on 2022/01/10 18:04:43 UTC

Kafka Streams - Stream threads processing two input topics

Hello

We are consuming two topics (A and B) and joining them, but I have noticed
that no matter what I do, topic A gets consumed first in a batch and only then
topic B. Increasing *num.stream.threads* only lets topic A get through its
records faster. Topic B has many more messages than topic A.


Here are my settings:

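import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Map<String, Object> config = new HashMap<>();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, streamingAppName);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
    Serdes.StringSerde.class);
// EXACTLY_ONCE is deprecated since Kafka 3.0 in favor of EXACTLY_ONCE_V2
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
    StreamsConfig.EXACTLY_ONCE);
// One stream thread for the whole application
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
config.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO");
config.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);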


For now, we have split the topology into two topologies so that each topic
gets its own thread, which has somewhat alleviated the problem.

For the first topology (the one reading topic A), we updated the settings as
follows to limit the number of messages fetched. Since this topology
repartitions topic A, the second topology joins against that repartition
topic:

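import static org.apache.kafka.clients.consumer.ConsumerConfig.FETCH_MAX_BYTES_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
// Separate application ID for the topology that re-keys topic A
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,
    streamingAppName + "-reKey-app");
// Cap each poll at 20 records and each fetch at 30 KiB
properties.setProperty(consumerPrefix(MAX_POLL_RECORDS_CONFIG), "20");
properties.setProperty(consumerPrefix(FETCH_MAX_BYTES_CONFIG),
    Integer.toString(30 * 1024));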



Is there a better way?

I was encouraged to use *max.task.idle.ms* to keep one topic from being
processed much faster than the other, but I'm not sure whether that will help
with my issue.
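For reference, the real setting is `StreamsConfig.MAX_TASK_IDLE_MS_CONFIG` ("max.task.idle.ms"); per the Kafka Streams documentation it bounds how long a stream task stays idle when not all of its partition buffers contain records. The sketch below is a hypothetical, simplified model of that rule only: the class and method names are invented, and it ignores version-specific details of the real implementation. What it shows is that the wait applies only while one input is empty; when both buffers hold records, the task proceeds immediately and picks by timestamp.

```java
// Hypothetical, simplified model of max.task.idle.ms (not Kafka's code).
class TaskIdleModel {

    // Returns true if a task reading two input partitions may process a record now.
    // bufferedA, bufferedB: number of records currently buffered per partition.
    // emptySinceMs: when the currently empty buffer became empty (used only when
    // exactly one buffer is empty). maxTaskIdleMs: the configured idle bound.
    static boolean readyToProcess(int bufferedA, int bufferedB, long emptySinceMs,
                                  long nowMs, long maxTaskIdleMs) {
        if (bufferedA == 0 && bufferedB == 0) {
            return false; // nothing buffered on either input
        }
        if (bufferedA > 0 && bufferedB > 0) {
            return true; // both inputs have data: no idling, choose by timestamp
        }
        // Exactly one input is empty: idle until maxTaskIdleMs has elapsed,
        // giving the empty partition a chance to receive earlier records.
        return nowMs - emptySinceMs >= maxTaskIdleMs;
    }

    public static void main(String[] args) {
        long maxTaskIdleMs = 1000;
        System.out.println(readyToProcess(5, 3, 0, 0, maxTaskIdleMs));    // true
        System.out.println(readyToProcess(5, 0, 0, 500, maxTaskIdleMs));  // false
        System.out.println(readyToProcess(5, 0, 0, 1000, maxTaskIdleMs)); // true
    }
}
```

In a real application the bound is set like any other Streams config, e.g. `config.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 1000L);` (the value is only illustrative).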


I'm not really sure how StreamThreads and tasks work.

Any help is appreciated.

Re: Kafka Streams - Stream threads processing two input topics

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Miguel,

I suspect it's due to the timestamps in your topic A, which are earlier
than those in topic B. Note that Kafka Streams tries to synchronize the joined
topics by processing records with smaller timestamps first, so if topic A's
messages have smaller timestamps, they will be selected over topic B's.
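To make the timestamp rule concrete, here is a hypothetical, simplified model (not Kafka Streams' actual code; the class and method names are invented). It assumes each input partition's records are already buffered in offset order and always processes the buffered head record with the smallest timestamp, so the inputs are merged by timestamp rather than by topic. The second call shows that when topic A's timestamps sit close to topic B's, the two topics interleave instead of A being drained first.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Kafka's code): a task with two buffered input
// partitions always processes the head record with the smallest timestamp.
class TimestampOrderModel {

    // Returns the processing order as "topic@timestamp" strings.
    // Each array holds one partition's buffered timestamps in offset order.
    // Ties go to topic A (an arbitrary choice for this sketch).
    static List<String> processingOrder(long[] topicA, long[] topicB) {
        List<String> order = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < topicA.length || j < topicB.length) {
            boolean takeA = j == topicB.length
                    || (i < topicA.length && topicA[i] <= topicB[j]);
            if (takeA) {
                order.add("A@" + topicA[i++]);
            } else {
                order.add("B@" + topicB[j++]);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        // All of A's timestamps are older than B's: A is drained first.
        System.out.println(processingOrder(
                new long[] {100, 200, 300}, new long[] {1000, 1100, 1200}));
        // prints [A@100, A@200, A@300, B@1000, B@1100, B@1200]

        // A's timestamps are close to B's: the two topics interleave.
        System.out.println(processingOrder(
                new long[] {1050, 1150}, new long[] {1000, 1100, 1200}));
        // prints [B@1000, A@1050, B@1100, A@1150, B@1200]
    }
}
```

The model deliberately leaves out polling, refilling of buffers and idling; it isolates only the "smallest timestamp first" choice.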

Routing topic A through a repartition topic alleviates the problem because
the first topology would reset the timestamps on the repartition topic to
values closer to processing time, and therefore closer to the timestamps of
topic B's messages.


Guozhang
