Posted to users@kafka.apache.org by Henry Thacker <he...@henrythacker.com> on 2017/04/28 10:01:46 UTC

Kafka Streams 0.10.0.1 - multiple consumers not receiving messages

Hi,

I'm using Kafka 0.10.0.1 and Kafka Streams. I have two different
processes, Consumer 1 and Consumer 2. They both share the same application
ID, but each subscribes to a different single-partition topic. Only one
stream consumer receives messages.

The non-working stream consumer just sits there logging:

Starting stream thread [StreamThread-1]
Discovered coordinator <Host> (Id: ...) for group my-streamer
Revoking previously assigned partitions [] for group my-streamer
(Re-)joining group my-streamer
Successfully joined group my-streamer with generation 3
Setting newly assigned partitions [] for group my-streamer
(Re-)joining group my-streamer
Successfully joined group my-streamer with generation 4

If I were trying to subscribe to the same topic & partition I could
understand this behaviour, but given that the subscriptions are for
different input topics, I would have thought this should work?

Thanks,
Henry

-- 
Henry Thacker

Re: Kafka Streams 0.10.0.1 - multiple consumers not receiving messages

Posted by Henry Thacker <he...@henrythacker.com>.
Hi Eno,

I think I've finally cracked it - I was hit by two problems. Firstly, my
listen IP was 0.0.0.0 and clients were trying to connect to that address,
which obviously wasn't going to work.
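
If this was the broker listener, binding on 0.0.0.0 is generally fine as long
as the brokers advertise an address clients can actually resolve and reach. A
minimal server.properties sketch, assuming a hypothetical hostname and an SSL
listener on port 9093:

# bind on all interfaces, but advertise a reachable hostname to clients
listeners=SSL://0.0.0.0:9093
advertised.listeners=SSL://broker1.example.com:9093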

Part two was that I had stupidly left some code in from when I was working
with the KStreamBuilder and hadn't removed it when moving to the
TopologyBuilder, so I was trying to instantiate a KafkaStreams with no input
source.
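
For anyone hitting the same thing, a minimal sketch (not the actual code from
this thread) of TopologyBuilder wiring that gives KafkaStreams an input source,
using the 0.10.0.x Processor API. Topic names, the store name and the
MyProcessor class (a hypothetical Processor implementation) are illustrative:

// classes from org.apache.kafka.streams, .processor and .state packages
TopologyBuilder builder = new TopologyBuilder();

builder.addSource("source", "input-topic")                    // without a source, no tasks are created
       .addProcessor("verifier", MyProcessor::new, "source")  // custom Processor doing the validation
       .addStateStore(Stores.create("mystore")
                            .withStringKeys()
                            .withByteArrayValues()
                            .persistent()
                            .build(), "verifier")
       .addSink("sink", "output-topic", "verifier");

KafkaStreams streams = new KafkaStreams(builder, props);      // props: the Streams config Properties
streams.start();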

Looks like everything works now - thank you very much for your help.

Now I'm creating multiple application.ids that are essentially throwaway, and
all state can be removed after 5 days; I just need to work out how to tidy
this all up in a semi-automated fashion.
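
A rough sketch of that kind of semi-automated tidy-up, assuming local state
lives under a state.dir of /tmp/kafka-streams (hypothetical path) and that no
Streams instance is running against the directories being deleted:

import java.io.File;
import java.util.concurrent.TimeUnit;

public class StateDirCleaner {
    public static void main(String[] args) {
        File stateDir = new File("/tmp/kafka-streams");          // hypothetical state.dir
        long cutoff = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(5);
        File[] appDirs = stateDir.listFiles(File::isDirectory);  // one directory per application.id
        if (appDirs == null) return;
        for (File appDir : appDirs) {
            if (appDir.lastModified() < cutoff) {
                deleteRecursively(appDir);                        // removes local RocksDB state only
            }
        }
        // Internal changelog topics (<application.id>-<store>-changelog) would
        // still need to be deleted on the brokers separately.
    }

    private static void deleteRecursively(File f) {
        File[] children = f.listFiles();
        if (children != null) {
            for (File c : children) deleteRecursively(c);
        }
        f.delete();
    }
}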

Thanks,
Henry

-- 
Henry Thacker

On 2 May 2017 at 16:21:33, Eno Thereska (eno.thereska@gmail.com) wrote:

> Could you make sure you don’t have a firewall or that the Kafka brokers
> are set up correctly and can be accessed? Is the SSL port the same as the
> PLAINTEXT port in your server.config file? E.g., see this:
> https://stackoverflow.com/questions/43534220/marking-the-coordinator-dead-for-groupkafka/43537521
> <
> https://stackoverflow.com/questions/43534220/marking-the-coordinator-dead-for-groupkafka/43537521>
>
>
> Eno
>
> On May 2, 2017, at 10:59 AM, Henry Thacker <he...@henrythacker.com>
> wrote:
>
> Hi Eno,
>
> At the moment this is hard coded, but overridable with command line
> parameters:
>
> config.put(StreamsConfig.APPLICATION_ID, appId + "-" + topic);
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);
> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.Bytes().getClass().getName());
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.Bytes().getClass().getName());
> config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
> maxMessageBytes);
> config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxMessageBytes);
> config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> WallclockTimestampExtractor.class.getName());
> config.put(StreamsConfig.STATE_DIR_CONFIG, tmpDir);
> config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
> config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
> config.put(ProducerConfig.RETRIES_CONFIG, 2);
>
> if (ssl)
> config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
>
> Variables:
> appId - "my-streamer-app"
> topic - "20170502_instancea_1234"
> brokers - "localhost:9092,localhost:9093,localhost:9094"
> zookeepers - "localhost:2181,localhost:2182,localhost:2183"
> maxMessageBytes - 30000000
> ssl - true
>
> Thanks,
> Henry
> --
> Henry Thacker
>
> On 2 May 2017 at 10:16:25, Eno Thereska (eno.thereska@gmail.com) wrote:
>
> Hi Henry,
>
> Could you share the streams configuration for your apps? I.e., the part
> where you assign application id and all the rest of the configs (just
> configs, not code).
>
> Thanks
> Eno
>
> On May 2, 2017, at 8:53 AM, Henry Thacker <he...@henrythacker.com> wrote:
>
> Thanks all for your replies - I have checked out the docs which were very
> helpful.
>
> I have now moved the separate topic streams to different processes each
> with their own app.id and I'm getting the following pattern, with no data
> consumed:
>
> "Starting stream thread [StreamThread-1]
> Discovered coordinator .... for group ..
> Marking the coordinator .... dead for group ..
> Discovered coordinator .... for group ..
> Marking the coordinator .... dead for group .."
>
> The discover and dead states repeat every few minutes.
>
> During this time, the broker logs look happy.
>
> One other, hopefully unrelated point, is this cluster is all SSL
> encrypted.
>
> Thanks,
> Henry
>
> --
> Henry Thacker
>
> On 29 April 2017 at 05:31:30, Matthias J. Sax (matthias@confluent.io)
> wrote:
>
> Henry,
>
> you might want to check out the docs, that give an overview of the
> architecture:
> http://docs.confluent.io/current/streams/architecture.html#example
>
> Also, I am wondering why your application did not crash: I would expect
> an exception like
>
> java.lang.IllegalArgumentException: Assigned partition foo-2 for
> non-subscribed topic regex pattern; subscription pattern is bar
>
> Maybe you just don't hit it, because both topics have a single partition
> and not multiple.
>
> Out of interest though, had I subscribed for both topics in one subscriber
> - I would have expected records for both topics interleaved
>
>
> Yes. That should happen.
>
> why when
>
> running this in two separate processes do I not observe the same?
>
>
> Not sure what you mean by this?
>
> If I fix this by changing the application ID for each streaming process -
> does this mean I lose the ability to share state stores between the
> applications?
>
>
> Yes.
>
>
> If both your topics are single partitioned, and you want to share state,
> you will not be able to run with more then one thread in your Streams app.
>
> The only way to work around this, would be to copy the data into another
> topic with more partitions before you process them -- of course, this
> would mean data duplication.
>
>
> -Matthias
>
>
> On 4/28/17 12:45 PM, Henry Thacker wrote:
>
> Thanks Michael and Eno for your help - I always thought the unit of
> parallelism was a combination of topic & partition rather than just
> partition.
>
> Out of interest though, had I subscribed for both topics in one subscriber
> - I would have expected records for both topics interleaved, why when
> running this in two separate processes do I not observe the same? Just
> wanting to try and form a mental model of how this is all working - I will
> try and look through some code over the weekend.
>
> If I fix this by changing the application ID for each streaming process -
> does this mean I lose the ability to share state stores between the
> applications?
>
> Unfortunately the data on the input topics are provided by a third party
> component which sends these keyless messages on a single partition per
> topic, so I have little ability to fix this at source :-(
>
> Thanks,
> Henry
>
>
> ------------------------------
>
>
>
>
>

Re: Kafka Streams 0.10.0.1 - multiple consumers not receiving messages

Posted by Eno Thereska <en...@gmail.com>.
Could you make sure you don’t have a firewall or that the Kafka brokers are set up correctly and can be accessed? Is the SSL port the same as the PLAINTEXT port in your server.config file? E.g., see this: https://stackoverflow.com/questions/43534220/marking-the-coordinator-dead-for-groupkafka/43537521 <https://stackoverflow.com/questions/43534220/marking-the-coordinator-dead-for-groupkafka/43537521>

Eno
> On May 2, 2017, at 10:59 AM, Henry Thacker <he...@henrythacker.com> wrote:
> 
> Hi Eno,
> 
> At the moment this is hard coded, but overridable with command line
> parameters:
> 
> config.put(StreamsConfig.APPLICATION_ID, appId + "-" + topic);
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);
> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.Bytes().getClass().getName());
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.Bytes().getClass().getName());
> config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
> maxMessageBytes);
> config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxMessageBytes);
> config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> WallclockTimestampExtractor.class.getName());
> config.put(StreamsConfig.STATE_DIR_CONFIG, tmpDir);
> config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
> config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
> config.put(ProducerConfig.RETRIES_CONFIG, 2);
> 
> if (ssl)
> config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
> 
> Variables:
> appId - "my-streamer-app"
> topic - "20170502_instancea_1234"
> brokers - "localhost:9092,localhost:9093,localhost:9094"
> zookeepers - "localhost:2181,localhost:2182,localhost:2183"
> maxMessageBytes - 30000000
> ssl - true
> 
> Thanks,
> Henry
> -- 
> Henry Thacker
> 
> On 2 May 2017 at 10:16:25, Eno Thereska (eno.thereska@gmail.com) wrote:
> 
>> Hi Henry,
>> 
>> Could you share the streams configuration for your apps? I.e., the part
>> where you assign application id and all the rest of the configs (just
>> configs, not code).
>> 
>> Thanks
>> Eno
>> 
>> On May 2, 2017, at 8:53 AM, Henry Thacker <he...@henrythacker.com> wrote:
>> 
>> Thanks all for your replies - I have checked out the docs which were very
>> helpful.
>> 
>> I have now moved the separate topic streams to different processes each
>> with their own app.id and I'm getting the following pattern, with no data
>> consumed:
>> 
>> "Starting stream thread [StreamThread-1]
>> Discovered coordinator .... for group ..
>> Marking the coordinator .... dead for group ..
>> Discovered coordinator .... for group ..
>> Marking the coordinator .... dead for group .."
>> 
>> The discover and dead states repeat every few minutes.
>> 
>> During this time, the broker logs look happy.
>> 
>> One other, hopefully unrelated point, is this cluster is all SSL
>> encrypted.
>> 
>> Thanks,
>> Henry
>> 
>> --
>> Henry Thacker
>> 
>> On 29 April 2017 at 05:31:30, Matthias J. Sax (matthias@confluent.io)
>> wrote:
>> 
>> Henry,
>> 
>> you might want to check out the docs, that give an overview of the
>> architecture:
>> http://docs.confluent.io/current/streams/architecture.html#example
>> 
>> Also, I am wondering why your application did not crash: I would expect
>> an exception like
>> 
>> java.lang.IllegalArgumentException: Assigned partition foo-2 for
>> non-subscribed topic regex pattern; subscription pattern is bar
>> 
>> Maybe you just don't hit it, because both topics have a single partition
>> and not multiple.
>> 
>> Out of interest though, had I subscribed for both topics in one subscriber
>> - I would have expected records for both topics interleaved
>> 
>> 
>> Yes. That should happen.
>> 
>> why when
>> 
>> running this in two separate processes do I not observe the same?
>> 
>> 
>> Not sure what you mean by this?
>> 
>> If I fix this by changing the application ID for each streaming process -
>> does this mean I lose the ability to share state stores between the
>> applications?
>> 
>> 
>> Yes.
>> 
>> 
>> If both your topics are single partitioned, and you want to share state,
>> you will not be able to run with more then one thread in your Streams app.
>> 
>> The only way to work around this, would be to copy the data into another
>> topic with more partitions before you process them -- of course, this
>> would mean data duplication.
>> 
>> 
>> -Matthias
>> 
>> 
>> On 4/28/17 12:45 PM, Henry Thacker wrote:
>> 
>> Thanks Michael and Eno for your help - I always thought the unit of
>> parallelism was a combination of topic & partition rather than just
>> partition.
>> 
>> Out of interest though, had I subscribed for both topics in one subscriber
>> - I would have expected records for both topics interleaved, why when
>> running this in two separate processes do I not observe the same? Just
>> wanting to try and form a mental model of how this is all working - I will
>> try and look through some code over the weekend.
>> 
>> If I fix this by changing the application ID for each streaming process -
>> does this mean I lose the ability to share state stores between the
>> applications?
>> 
>> Unfortunately the data on the input topics are provided by a third party
>> component which sends these keyless messages on a single partition per
>> topic, so I have little ability to fix this at source :-(
>> 
>> Thanks,
>> Henry
>> 
>> 
>> ------------------------------
>> 
>> 
>> 


Re: Kafka Streams 0.10.0.1 - multiple consumers not receiving messages

Posted by Henry Thacker <he...@henrythacker.com>.
Hi Eno,

At the moment this is hard coded, but overridable with command line
parameters:

config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId + "-" + topic);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass().getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass().getName());
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxMessageBytes);
config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxMessageBytes);
config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
config.put(StreamsConfig.STATE_DIR_CONFIG, tmpDir);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
config.put(ProducerConfig.RETRIES_CONFIG, 2);

if (ssl)
    config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

Variables:
appId - "my-streamer-app"
topic - "20170502_instancea_1234"
brokers - "localhost:9092,localhost:9093,localhost:9094"
zookeepers - "localhost:2181,localhost:2182,localhost:2183"
maxMessageBytes - 30000000
ssl - true
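
One thing not shown above: with an SSL-only cluster the clients typically also
need trust material (and key material if the brokers require client auth). A
hedged sketch, with hypothetical paths and password variables:

if (ssl) {
    config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/kafka/client.truststore.jks");
    config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
    // only needed when the brokers are configured with ssl.client.auth=required:
    config.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/etc/kafka/client.keystore.jks");
    config.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);
}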

Thanks,
Henry
-- 
Henry Thacker

On 2 May 2017 at 10:16:25, Eno Thereska (eno.thereska@gmail.com) wrote:

> Hi Henry,
>
> Could you share the streams configuration for your apps? I.e., the part
> where you assign application id and all the rest of the configs (just
> configs, not code).
>
> Thanks
> Eno
>
> On May 2, 2017, at 8:53 AM, Henry Thacker <he...@henrythacker.com> wrote:
>
> Thanks all for your replies - I have checked out the docs which were very
> helpful.
>
> I have now moved the separate topic streams to different processes each
> with their own app.id and I'm getting the following pattern, with no data
> consumed:
>
> "Starting stream thread [StreamThread-1]
> Discovered coordinator .... for group ..
> Marking the coordinator .... dead for group ..
> Discovered coordinator .... for group ..
> Marking the coordinator .... dead for group .."
>
> The discover and dead states repeat every few minutes.
>
> During this time, the broker logs look happy.
>
> One other, hopefully unrelated point, is this cluster is all SSL
> encrypted.
>
> Thanks,
> Henry
>
> --
> Henry Thacker
>
> On 29 April 2017 at 05:31:30, Matthias J. Sax (matthias@confluent.io)
> wrote:
>
> Henry,
>
> you might want to check out the docs, that give an overview of the
> architecture:
> http://docs.confluent.io/current/streams/architecture.html#example
>
> Also, I am wondering why your application did not crash: I would expect
> an exception like
>
> java.lang.IllegalArgumentException: Assigned partition foo-2 for
> non-subscribed topic regex pattern; subscription pattern is bar
>
> Maybe you just don't hit it, because both topics have a single partition
> and not multiple.
>
> Out of interest though, had I subscribed for both topics in one subscriber
> - I would have expected records for both topics interleaved
>
>
> Yes. That should happen.
>
> why when
>
> running this in two separate processes do I not observe the same?
>
>
> Not sure what you mean by this?
>
> If I fix this by changing the application ID for each streaming process -
> does this mean I lose the ability to share state stores between the
> applications?
>
>
> Yes.
>
>
> If both your topics are single partitioned, and you want to share state,
> you will not be able to run with more then one thread in your Streams app.
>
> The only way to work around this, would be to copy the data into another
> topic with more partitions before you process them -- of course, this
> would mean data duplication.
>
>
> -Matthias
>
>
> On 4/28/17 12:45 PM, Henry Thacker wrote:
>
> Thanks Michael and Eno for your help - I always thought the unit of
> parallelism was a combination of topic & partition rather than just
> partition.
>
> Out of interest though, had I subscribed for both topics in one subscriber
> - I would have expected records for both topics interleaved, why when
> running this in two separate processes do I not observe the same? Just
> wanting to try and form a mental model of how this is all working - I will
> try and look through some code over the weekend.
>
> If I fix this by changing the application ID for each streaming process -
> does this mean I lose the ability to share state stores between the
> applications?
>
> Unfortunately the data on the input topics are provided by a third party
> component which sends these keyless messages on a single partition per
> topic, so I have little ability to fix this at source :-(
>
> Thanks,
> Henry
>
>
> ------------------------------
>
>
>

Re: Kafka Streams 0.10.0.1 - multiple consumers not receiving messages

Posted by Eno Thereska <en...@gmail.com>.
Hi Henry,

Could you share the streams configuration for your apps? I.e., the part where you assign application id and all the rest of the configs (just configs, not code).

Thanks
Eno
> On May 2, 2017, at 8:53 AM, Henry Thacker <he...@henrythacker.com> wrote:
> 
> Thanks all for your replies - I have checked out the docs which were very
> helpful.
> 
> I have now moved the separate topic streams to different processes each
> with their own app.id and I'm getting the following pattern, with no data
> consumed:
> 
> "Starting stream thread [StreamThread-1]
> Discovered coordinator .... for group ..
> Marking the coordinator .... dead for group ..
> Discovered coordinator .... for group ..
> Marking the coordinator .... dead for group .."
> 
> The discover and dead states repeat every few minutes.
> 
> During this time, the broker logs look happy.
> 
> One other, hopefully unrelated point, is this cluster is all SSL encrypted.
> 
> Thanks,
> Henry
> 
> -- 
> Henry Thacker
> 
> On 29 April 2017 at 05:31:30, Matthias J. Sax (matthias@confluent.io) wrote:
> 
>> Henry,
>> 
>> you might want to check out the docs, that give an overview of the
>> architecture:
>> http://docs.confluent.io/current/streams/architecture.html#example
>> 
>> Also, I am wondering why your application did not crash: I would expect
>> an exception like
>> 
>> java.lang.IllegalArgumentException: Assigned partition foo-2 for
>> non-subscribed topic regex pattern; subscription pattern is bar
>> 
>> Maybe you just don't hit it, because both topics have a single partition
>> and not multiple.
>> 
>> Out of interest though, had I subscribed for both topics in one subscriber
>> - I would have expected records for both topics interleaved
>> 
>> 
>> Yes. That should happen.
>> 
>> why when
>> 
>> running this in two separate processes do I not observe the same?
>> 
>> 
>> Not sure what you mean by this?
>> 
>> If I fix this by changing the application ID for each streaming process -
>> does this mean I lose the ability to share state stores between the
>> applications?
>> 
>> 
>> Yes.
>> 
>> 
>> If both your topics are single partitioned, and you want to share state,
>> you will not be able to run with more then one thread in your Streams app.
>> 
>> The only way to work around this, would be to copy the data into another
>> topic with more partitions before you process them -- of course, this
>> would mean data duplication.
>> 
>> 
>> -Matthias
>> 
>> 
>> On 4/28/17 12:45 PM, Henry Thacker wrote:
>> 
>> Thanks Michael and Eno for your help - I always thought the unit of
>> parallelism was a combination of topic & partition rather than just
>> partition.
>> 
>> Out of interest though, had I subscribed for both topics in one subscriber
>> - I would have expected records for both topics interleaved, why when
>> running this in two separate processes do I not observe the same? Just
>> wanting to try and form a mental model of how this is all working - I will
>> try and look through some code over the weekend.
>> 
>> If I fix this by changing the application ID for each streaming process -
>> does this mean I lose the ability to share state stores between the
>> applications?
>> 
>> Unfortunately the data on the input topics are provided by a third party
>> component which sends these keyless messages on a single partition per
>> topic, so I have little ability to fix this at source :-(
>> 
>> Thanks,
>> Henry
>> 
>> 
>> ------------------------------
>> 


Re: Kafka Streams 0.10.0.1 - multiple consumers not receiving messages

Posted by Henry Thacker <he...@henrythacker.com>.
Thanks all for your replies - I have checked out the docs which were very
helpful.

I have now moved the separate topic streams to different processes each
with their own app.id and I'm getting the following pattern, with no data
consumed:

"Starting stream thread [StreamThread-1]
Discovered coordinator .... for group ..
Marking the coordinator .... dead for group ..
Discovered coordinator .... for group ..
Marking the coordinator .... dead for group .."

This discover-then-dead cycle repeats every few minutes.

During this time, the broker logs look happy.

One other, hopefully unrelated, point is that this cluster is fully SSL-encrypted.

Thanks,
Henry

-- 
Henry Thacker

On 29 April 2017 at 05:31:30, Matthias J. Sax (matthias@confluent.io) wrote:

> Henry,
>
> you might want to check out the docs, that give an overview of the
> architecture:
> http://docs.confluent.io/current/streams/architecture.html#example
>
> Also, I am wondering why your application did not crash: I would expect
> an exception like
>
> java.lang.IllegalArgumentException: Assigned partition foo-2 for
> non-subscribed topic regex pattern; subscription pattern is bar
>
> Maybe you just don't hit it, because both topics have a single partition
> and not multiple.
>
> Out of interest though, had I subscribed for both topics in one subscriber
> - I would have expected records for both topics interleaved
>
>
> Yes. That should happen.
>
> why when
>
> running this in two separate processes do I not observe the same?
>
>
> Not sure what you mean by this?
>
> If I fix this by changing the application ID for each streaming process -
> does this mean I lose the ability to share state stores between the
> applications?
>
>
> Yes.
>
>
> If both your topics are single partitioned, and you want to share state,
> you will not be able to run with more then one thread in your Streams app.
>
> The only way to work around this, would be to copy the data into another
> topic with more partitions before you process them -- of course, this
> would mean data duplication.
>
>
> -Matthias
>
>
> On 4/28/17 12:45 PM, Henry Thacker wrote:
>
> Thanks Michael and Eno for your help - I always thought the unit of
> parallelism was a combination of topic & partition rather than just
> partition.
>
> Out of interest though, had I subscribed for both topics in one subscriber
> - I would have expected records for both topics interleaved, why when
> running this in two separate processes do I not observe the same? Just
> wanting to try and form a mental model of how this is all working - I will
> try and look through some code over the weekend.
>
> If I fix this by changing the application ID for each streaming process -
> does this mean I lose the ability to share state stores between the
> applications?
>
> Unfortunately the data on the input topics are provided by a third party
> component which sends these keyless messages on a single partition per
> topic, so I have little ability to fix this at source :-(
>
> Thanks,
> Henry
>
>
> ------------------------------
>

Re: Kafka Streams 0.10.0.1 - multiple consumers not receiving messages

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Henry,

you might want to check out the docs, which give an overview of the
architecture:
http://docs.confluent.io/current/streams/architecture.html#example

Also, I am wondering why your application did not crash: I would expect
an exception like

java.lang.IllegalArgumentException: Assigned partition foo-2 for
non-subscribed topic regex pattern; subscription pattern is bar

Maybe you just don't hit it, because both topics have a single partition
and not multiple.

>> Out of interest though, had I subscribed for both topics in one subscriber
>> - I would have expected records for both topics interleaved

Yes. That should happen.

> why when
>> running this in two separate processes do I not observe the same?

Not sure what you mean by this?

>> If I fix this by changing the application ID for each streaming process -
>> does this mean I lose the ability to share state stores between the
>> applications?

Yes.


If both your topics are single-partitioned, and you want to share state,
you will not be able to run with more than one thread in your Streams app.

The only way to work around this would be to copy the data into another
topic with more partitions before you process it -- of course, this
would mean data duplication.
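
A rough sketch of that copy-into-more-partitions workaround (topic names are
hypothetical; the target topic would be pre-created with more partitions, and
keyless records should get spread round-robin by the default partitioner):

// pass-through copier: duplicate the single-partition input into a wider topic
KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-1p")              // the single-partition source topic
       .to("input-8p");                 // e.g. a topic created with 8 partitions
KafkaStreams copier = new KafkaStreams(builder, props);  // props: Streams config Properties
copier.start();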


-Matthias


On 4/28/17 12:45 PM, Henry Thacker wrote:
> Thanks Michael and Eno for your help - I always thought the unit of
> parallelism was a combination of topic & partition rather than just
> partition.
> 
> Out of interest though, had I subscribed for both topics in one subscriber
> - I would have expected records for both topics interleaved, why when
> running this in two separate processes do I not observe the same? Just
> wanting to try and form a mental model of how this is all working - I will
> try and look through some code over the weekend.
> 
> If I fix this by changing the application ID for each streaming process -
> does this mean I lose the ability to share state stores between the
> applications?
> 
> Unfortunately the data on the input topics are provided by a third party
> component which sends these keyless messages on a single partition per
> topic, so I have little ability to fix this at source :-(
> 
> Thanks,
> Henry
> 


Re: Kafka Streams 0.10.0.1 - multiple consumers not receiving messages

Posted by Henry Thacker <he...@henrythacker.com>.
Thanks Michael and Eno for your help - I always thought the unit of
parallelism was a combination of topic & partition rather than just
partition.

Out of interest though, had I subscribed to both topics in one subscriber, I
would have expected records for both topics interleaved - so why, when
running this in two separate processes, do I not observe the same? I'm just
trying to form a mental model of how this all works - I will try to look
through some code over the weekend.

If I fix this by changing the application ID for each streaming process -
does this mean I lose the ability to share state stores between the
applications?

Unfortunately the data on the input topics is provided by a third-party
component which sends these keyless messages on a single partition per
topic, so I have little ability to fix this at source :-(

Thanks,
Henry

-- 
Henry Thacker

On 28 April 2017 at 17:32:28, Michael Noll (michael@confluent.io) wrote:

> To add to what Eno said:
>
> You can of course use the Kafka Streams API to build an application that
> consumes from multiple Kafka topics. But, going back to your original
> question, the scalability of Kafka and the Kafka Streams API is based on
> partitions, not on topics.
>
> -Michael
>
>
>
>
> On Fri, Apr 28, 2017 at 6:28 PM, Eno Thereska <en...@gmail.com>
> wrote:
>
> Hi Henry,
>
> Kafka Streams scales differently and does not support having the same
> application ID subscribe to different topics for scale-out. The way we
> support scaling out if you want to use the same application id is through
> partitions, i.e., Kafka Streams automatically assigns partitions to your
> multiple instances. If you want to scale out using topics you'll need to
> use different application IDs.
>
> So in a nutshell this pattern is not supported. Was there a reason you
> needed to do it like that?
>
> Thanks
> Eno
>
> On 28 Apr 2017, at 11:41, Henry Thacker <he...@henrythacker.com> wrote:
>
> Should also add - there are definitely live incoming messages on both
>
> input
>
> topics when my streams are running. The auto offset reset config is set
>
> to
>
> "earliest" and because the input data streams are quite large (several
> millions records each), I set a relatively small max poll records (200)
>
> so
>
> we don't run into heartbeating issues if we restart intraday.
>
> Thanks,
> Henry
>
> --
> Henry Thacker
>
> On 28 April 2017 at 11:37:53, Henry Thacker (henry@henrythacker.com)
>
> wrote:
>
>
> Hi Eno,
>
> Thanks for your reply - the code that builds the topology is something
> like this (I don't have email and the code access on the same machine
> unfortunately - so might not be 100% accurate / terribly formatted!).
>
> The stream application is a simple verifier which stores a tiny bit of
> state in a state store. The processor is custom and only has logic in
> init() to store the context and retrieve the store and process(...) to
> validate the incoming messages and forward these on when appropriate.
>
> There is no joining, aggregates or windowing.
>
> In public static void main:
>
> String topic = args[0];
> String output = args[1];
>
> KStreamBuilder builder = new KStreamBuilder();
>
> StateStoreSupplier stateStore =
> Stores.create("mystore").withStringKeys().withByteArrayValues().
>
> persistent().build();
>
>
> KStream<Bytes, Bytes> stream = builder.stream(topic);
>
> builder.addStateStore(stateStore);
>
> stream.process(this::buildStreamProcessor, "mystore");
>
> stream.to(outputTopic);
>
> KafkaStreams streams = new KafkaStreams(builder, getProps());
> streams.setUncaughtExceptionHandler(...);
> streams.start();
>
> Thanks,
> Henry
>
>
> On 28 April 2017 at 11:26:07, Eno Thereska (eno.thereska@gmail.com)
>
> wrote:
>
>
> Hi Henry,
>
> Could you share the code that builds your topology so we see how the
> topics are passed in? Also, this would depend on what the streaming
>
> logic
>
> is doing with the topics, e.g., if you're joining them then both
>
> partitions
>
> need to be consumed by the same instance.
>
> Eno
>
> On 28 Apr 2017, at 11:01, Henry Thacker <he...@henrythacker.com>
>
> wrote:
>
>
> Hi,
>
> I'm using Kafka 0.10.0.1 and Kafka streams. When I have two different
> processes, Consumer 1 and 2. They both share the same application ID,
>
> but
>
> subscribe for different single-partition topics. Only one stream
>
> consumer
>
> receives messages.
>
> The non working stream consumer just sits there logging:
>
> Starting stream thread [StreamThread-1]
> Discovered coordinator <Host> (Id: ...) for group my-streamer
> Revoking previously assigned partitions [] for group my-streamer
> (Re-)joining group my-streamer
> Successfully joined group my-streamer with generation 3
> Setting newly assigned partitions [] for group my-streamer
> (Re-)joining group my-streamer
> Successfully joined group my-streamer with generation 4
>
> If I was trying to subscribe to the same topic & partition I could
> understand this behaviour, but given that the subscriptions are for
> different input topics, I would have thought this should work?
>
> Thanks,
> Henry
>
> --
> Henry Thacker
>
>
>
>
>
>

Re: Kafka Streams 0.10.0.1 - multiple consumers not receiving messages

Posted by Michael Noll <mi...@confluent.io>.
To add to what Eno said:

You can of course use the Kafka Streams API to build an application that
consumes from multiple Kafka topics.  But, going back to your original
question, the scalability of Kafka and the Kafka Streams API is based on
partitions, not on topics.

-Michael




On Fri, Apr 28, 2017 at 6:28 PM, Eno Thereska <en...@gmail.com>
wrote:

> Hi Henry,
>
> Kafka Streams scales differently and does not support having the same
> application ID subscribe to different topics for scale-out. The way we
> support scaling out if you want to use the same application id is through
> partitions, i.e., Kafka Streams automatically assigns partitions to your
> multiple instances. If you want to scale out using topics you'll need to
> use different application IDs.
>
> So in a nutshell this pattern is not supported. Was there a reason you
> needed to do it like that?
>
> Thanks
> Eno
>
> > On 28 Apr 2017, at 11:41, Henry Thacker <he...@henrythacker.com> wrote:
> >
> > Should also add - there are definitely live incoming messages on both
> input
> > topics when my streams are running. The auto offset reset config is set
> to
> > "earliest" and because the input data streams are quite large (several
> > millions records each), I set a relatively small max poll records (200)
> so
> > we don't run into heartbeating issues if we restart intraday.
> >
> > Thanks,
> > Henry
> >
> > --
> > Henry Thacker
> >
> > On 28 April 2017 at 11:37:53, Henry Thacker (henry@henrythacker.com)
> wrote:
> >
> >> Hi Eno,
> >>
> >> Thanks for your reply - the code that builds the topology is something
> >> like this (I don't have email and the code access on the same machine
> >> unfortunately - so might not be 100% accurate / terribly formatted!).
> >>
> >> The stream application is a simple verifier which stores a tiny bit of
> >> state in a state store. The processor is custom and only has logic in
> >> init() to store the context and retrieve the store and process(...) to
> >> validate the incoming messages and forward these on when appropriate.
> >>
> >> There is no joining, aggregates or windowing.
> >>
> >> In public static void main:
> >>
> >> String topic = args[0];
> >> String output = args[1];
> >>
> >> KStreamBuilder builder = new KStreamBuilder();
> >>
> >> StateStoreSupplier stateStore =
> >> Stores.create("mystore").withStringKeys().withByteArrayValues().
> persistent().build();
> >>
> >> KStream<Bytes, Bytes> stream = builder.stream(topic);
> >>
> >> builder.addStateStore(stateStore);
> >>
> >> stream.process(this::buildStreamProcessor, "mystore");
> >>
> >> stream.to(outputTopic);
> >>
> >> KafkaStreams streams = new KafkaStreams(builder, getProps());
> >> streams.setUncaughtExceptionHandler(...);
> >> streams.start();
> >>
> >> Thanks,
> >> Henry
> >>
> >>
> >> On 28 April 2017 at 11:26:07, Eno Thereska (eno.thereska@gmail.com)
> wrote:
> >>
> >>> Hi Henry,
> >>>
> >>> Could you share the code that builds your topology so we see how the
> >>> topics are passed in? Also, this would depend on what the streaming
> logic
> >>> is doing with the topics, e.g., if you're joining them then both
> partitions
> >>> need to be consumed by the same instance.
> >>>
> >>> Eno
> >>>
> >>> On 28 Apr 2017, at 11:01, Henry Thacker <he...@henrythacker.com>
> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I'm using Kafka 0.10.0.1 and Kafka streams. When I have two different
> >>> processes, Consumer 1 and 2. They both share the same application ID,
> but
> >>> subscribe for different single-partition topics. Only one stream
> consumer
> >>> receives messages.
> >>>
> >>> The non working stream consumer just sits there logging:
> >>>
> >>> Starting stream thread [StreamThread-1]
> >>> Discovered coordinator <Host> (Id: ...) for group my-streamer
> >>> Revoking previously assigned partitions [] for group my-streamer
> >>> (Re-)joining group my-streamer
> >>> Successfully joined group my-streamer with generation 3
> >>> Setting newly assigned partitions [] for group my-streamer
> >>> (Re-)joining group my-streamer
> >>> Successfully joined group my-streamer with generation 4
> >>>
> >>> If I was trying to subscribe to the same topic & partition I could
> >>> understand this behaviour, but given that the subscriptions are for
> >>> different input topics, I would have thought this should work?
> >>>
> >>> Thanks,
> >>> Henry
> >>>
> >>> --
> >>> Henry Thacker
> >>>
> >>>
> >>>
>
>

Re: Kafka Streams 0.10.0.1 - multiple consumers not receiving messages

Posted by Eno Thereska <en...@gmail.com>.
Hi Henry,

Kafka Streams scales differently and does not support having instances with the same application ID subscribe to different topics for scale-out. If you want to keep the same application ID, scaling out works through partitions, i.e., Kafka Streams automatically assigns partitions to your multiple instances. If you want to scale out by topic, you'll need to use different application IDs.
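
A minimal sketch of the two supported patterns, with hypothetical application
IDs and topic names:

// (a) Scale by partitions: every instance uses the SAME application.id and
//     reads the SAME (multi-partition) topic; partitions are split across them.
Properties a = new Properties();
a.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streamer");

// (b) Scale by topics: one process per topic, each with its OWN application.id,
//     e.g. derived from the topic name, so the consumer groups are independent.
Properties b = new Properties();
b.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streamer-" + topicName);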

So in a nutshell this pattern is not supported. Was there a reason you needed to do it like that? 

Thanks
Eno

> On 28 Apr 2017, at 11:41, Henry Thacker <he...@henrythacker.com> wrote:
> 
> Should also add - there are definitely live incoming messages on both input
> topics when my streams are running. The auto offset reset config is set to
> "earliest" and because the input data streams are quite large (several
> millions records each), I set a relatively small max poll records (200) so
> we don't run into heartbeating issues if we restart intraday.
> 
> Thanks,
> Henry
> 
> -- 
> Henry Thacker
> 
> On 28 April 2017 at 11:37:53, Henry Thacker (henry@henrythacker.com) wrote:
> 
>> Hi Eno,
>> 
>> Thanks for your reply - the code that builds the topology is something
>> like this (I don't have email and the code access on the same machine
>> unfortunately - so might not be 100% accurate / terribly formatted!).
>> 
>> The stream application is a simple verifier which stores a tiny bit of
>> state in a state store. The processor is custom and only has logic in
>> init() to store the context and retrieve the store and process(...) to
>> validate the incoming messages and forward these on when appropriate.
>> 
>> There is no joining, aggregates or windowing.
>> 
>> In public static void main:
>> 
>> String topic = args[0];
>> String output = args[1];
>> 
>> KStreamBuilder builder = new KStreamBuilder();
>> 
>> StateStoreSupplier stateStore =
>> Stores.create("mystore").withStringKeys().withByteArrayValues().persistent().build();
>> 
>> KStream<Bytes, Bytes> stream = builder.stream(topic);
>> 
>> builder.addStateStore(stateStore);
>> 
>> stream.process(this::buildStreamProcessor, "mystore");
>> 
>> stream.to(outputTopic);
>> 
>> KafkaStreams streams = new KafkaStreams(builder, getProps());
>> streams.setUncaughtExceptionHandler(...);
>> streams.start();
>> 
>> Thanks,
>> Henry
>> 
>> 
>> On 28 April 2017 at 11:26:07, Eno Thereska (eno.thereska@gmail.com) wrote:
>> 
>>> Hi Henry,
>>> 
>>> Could you share the code that builds your topology so we see how the
>>> topics are passed in? Also, this would depend on what the streaming logic
>>> is doing with the topics, e.g., if you're joining them then both partitions
>>> need to be consumed by the same instance.
>>> 
>>> Eno
>>> 
>>> On 28 Apr 2017, at 11:01, Henry Thacker <he...@henrythacker.com> wrote:
>>> 
>>> Hi,
>>> 
>>> I'm using Kafka 0.10.0.1 and Kafka streams. When I have two different
>>> processes, Consumer 1 and 2. They both share the same application ID, but
>>> subscribe for different single-partition topics. Only one stream consumer
>>> receives messages.
>>> 
>>> The non working stream consumer just sits there logging:
>>> 
>>> Starting stream thread [StreamThread-1]
>>> Discovered coordinator <Host> (Id: ...) for group my-streamer
>>> Revoking previously assigned partitions [] for group my-streamer
>>> (Re-)joining group my-streamer
>>> Successfully joined group my-streamer with generation 3
>>> Setting newly assigned partitions [] for group my-streamer
>>> (Re-)joining group my-streamer
>>> Successfully joined group my-streamer with generation 4
>>> 
>>> If I was trying to subscribe to the same topic & partition I could
>>> understand this behaviour, but given that the subscriptions are for
>>> different input topics, I would have thought this should work?
>>> 
>>> Thanks,
>>> Henry
>>> 
>>> --
>>> Henry Thacker
>>> 
>>> 
>>> 


Re: Kafka Streams 0.10.0.1 - multiple consumers not receiving messages

Posted by Henry Thacker <he...@henrythacker.com>.
Should also add - there are definitely live incoming messages on both input
topics when my streams are running. The auto offset reset config is set to
"earliest" and because the input data streams are quite large (several
millions records each), I set a relatively small max poll records (200) so
we don't run into heartbeating issues if we restart intraday.

Thanks,
Henry

-- 
Henry Thacker

On 28 April 2017 at 11:37:53, Henry Thacker (henry@henrythacker.com) wrote:

> Hi Eno,
>
> Thanks for your reply - the code that builds the topology is something
> like this (I don't have email and the code access on the same machine
> unfortunately - so might not be 100% accurate / terribly formatted!).
>
> The stream application is a simple verifier which stores a tiny bit of
> state in a state store. The processor is custom and only has logic in
> init() to store the context and retrieve the store and process(...) to
> validate the incoming messages and forward these on when appropriate.
>
> There is no joining, aggregates or windowing.
>
> In public static void main:
>
> String topic = args[0];
> String output = args[1];
>
> KStreamBuilder builder = new KStreamBuilder();
>
> StateStoreSupplier stateStore =
> Stores.create("mystore").withStringKeys().withByteArrayValues().persistent().build();
>
> KStream<Bytes, Bytes> stream = builder.stream(topic);
>
> builder.addStateStore(stateStore);
>
> stream.process(this::buildStreamProcessor, "mystore");
>
> stream.to(outputTopic);
>
> KafkaStreams streams = new KafkaStreams(builder, getProps());
> streams.setUncaughtExceptionHandler(...);
> streams.start();
>
> Thanks,
> Henry
>
>
> On 28 April 2017 at 11:26:07, Eno Thereska (eno.thereska@gmail.com) wrote:
>
>> Hi Henry,
>>
>> Could you share the code that builds your topology so we see how the
>> topics are passed in? Also, this would depend on what the streaming logic
>> is doing with the topics, e.g., if you're joining them then both partitions
>> need to be consumed by the same instance.
>>
>> Eno
>>
>> On 28 Apr 2017, at 11:01, Henry Thacker <he...@henrythacker.com> wrote:
>>
>> Hi,
>>
>> I'm using Kafka 0.10.0.1 and Kafka streams. When I have two different
>> processes, Consumer 1 and 2. They both share the same application ID, but
>> subscribe for different single-partition topics. Only one stream consumer
>> receives messages.
>>
>> The non working stream consumer just sits there logging:
>>
>> Starting stream thread [StreamThread-1]
>> Discovered coordinator <Host> (Id: ...) for group my-streamer
>> Revoking previously assigned partitions [] for group my-streamer
>> (Re-)joining group my-streamer
>> Successfully joined group my-streamer with generation 3
>> Setting newly assigned partitions [] for group my-streamer
>> (Re-)joining group my-streamer
>> Successfully joined group my-streamer with generation 4
>>
>> If I was trying to subscribe to the same topic & partition I could
>> understand this behaviour, but given that the subscriptions are for
>> different input topics, I would have thought this should work?
>>
>> Thanks,
>> Henry
>>
>> --
>> Henry Thacker
>>
>>
>>

Re: Kafka Streams 0.10.0.1 - multiple consumers not receiving messages

Posted by Henry Thacker <he...@henrythacker.com>.
Hi Eno,

Thanks for your reply - the code that builds the topology is something like
this (I don't have email and code access on the same machine, unfortunately,
so it might not be 100% accurate / terribly formatted!).

The stream application is a simple verifier which stores a tiny bit of
state in a state store. The processor is custom and only has logic in
init() to store the context and retrieve the store and process(...) to
validate the incoming messages and forward these on when appropriate.

There is no joining, aggregates or windowing.

In public static void main:

String topic = args[0];
String outputTopic = args[1];

KStreamBuilder builder = new KStreamBuilder();

StateStoreSupplier stateStore =
    Stores.create("mystore").withStringKeys().withByteArrayValues().persistent().build();

KStream<Bytes, Bytes> stream = builder.stream(topic);

builder.addStateStore(stateStore);

stream.process(this::buildStreamProcessor, "mystore");

stream.to(outputTopic);

KafkaStreams streams = new KafkaStreams(builder, getProps());
streams.setUncaughtExceptionHandler(...);
streams.start();

Thanks,
Henry


On 28 April 2017 at 11:26:07, Eno Thereska (eno.thereska@gmail.com) wrote:

> Hi Henry,
>
> Could you share the code that builds your topology so we see how the
> topics are passed in? Also, this would depend on what the streaming logic
> is doing with the topics, e.g., if you're joining them then both partitions
> need to be consumed by the same instance.
>
> Eno
>
> On 28 Apr 2017, at 11:01, Henry Thacker <he...@henrythacker.com> wrote:
>
> Hi,
>
> I'm using Kafka 0.10.0.1 and Kafka streams. When I have two different
> processes, Consumer 1 and 2. They both share the same application ID, but
> subscribe for different single-partition topics. Only one stream consumer
> receives messages.
>
> The non working stream consumer just sits there logging:
>
> Starting stream thread [StreamThread-1]
> Discovered coordinator <Host> (Id: ...) for group my-streamer
> Revoking previously assigned partitions [] for group my-streamer
> (Re-)joining group my-streamer
> Successfully joined group my-streamer with generation 3
> Setting newly assigned partitions [] for group my-streamer
> (Re-)joining group my-streamer
> Successfully joined group my-streamer with generation 4
>
> If I was trying to subscribe to the same topic & partition I could
> understand this behaviour, but given that the subscriptions are for
> different input topics, I would have thought this should work?
>
> Thanks,
> Henry
>
> --
> Henry Thacker
>
>
>

Re: Kafka Streams 0.10.0.1 - multiple consumers not receiving messages

Posted by Eno Thereska <en...@gmail.com>.
Hi Henry,

Could you share the code that builds your topology so we see how the topics are passed in? Also, this would depend on what the streaming logic is doing with the topics, e.g., if you're joining them then both partitions need to be consumed by the same instance.

Eno
> On 28 Apr 2017, at 11:01, Henry Thacker <he...@henrythacker.com> wrote:
> 
> Hi,
> 
> I'm using Kafka 0.10.0.1 and Kafka streams. When I have two different
> processes, Consumer 1 and 2. They both share the same application ID, but
> subscribe for different single-partition topics. Only one stream consumer
> receives messages.
> 
> The non working stream consumer just sits there logging:
> 
> Starting stream thread [StreamThread-1]
> Discovered coordinator <Host> (Id: ...) for group my-streamer
> Revoking previously assigned partitions [] for group my-streamer
> (Re-)joining group my-streamer
> Successfully joined group my-streamer with generation 3
> Setting newly assigned partitions [] for group my-streamer
> (Re-)joining group my-streamer
> Successfully joined group my-streamer with generation 4
> 
> If I was trying to subscribe to the same topic & partition I could
> understand this behaviour, but given that the subscriptions are for
> different input topics, I would have thought this should work?
> 
> Thanks,
> Henry
> 
> -- 
> Henry Thacker