Posted to dev@beam.apache.org by Shahar Frank <sr...@gmail.com> on 2023/04/15 08:50:21 UTC

KafkaIO does not make use of Kafka Consumer Groups [kafka] [java] [io]

Hi All,

Posting here as suggested here
<https://github.com/apache/beam/issues/25978#issuecomment-1508530483>.

I'm using KafkaIO to consume events from a Kafka topic.
I've added "group.id" to the consumer properties.
When running the pipeline I can see this value sent to Kafka in the
consumer properties.
The consumers created by KafkaIO fail to join the consumer group.
Looking into the code I can see that the consumer never "subscribes"
to the topic, which is how a KafkaConsumer joins a consumer group. It
seems the code circumvents the partition assignment mechanism
provided by Kafka in favor of its own.
By doing that it prevents the user from using consumer groups.
Is that intentional? Is there any reason why the decision to avoid using
consumer groups was taken?
I would love to see any documentation about that if possible please.

Cheers,
Shahar.
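
For reference, the distinction drawn above maps onto two KafkaConsumer APIs.
A minimal sketch (broker, group and topic names are hypothetical):
subscribe() joins the consumer group and lets the group coordinator assign
partitions, while assign() takes explicit partitions and never joins the
group - the latter matches the behaviour described for KafkaIO.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092");   // hypothetical broker
    props.put("group.id", "my-group");               // hypothetical group
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // Group-managed: joins the consumer group; the coordinator assigns partitions.
    consumer.subscribe(Collections.singletonList("events"));

    // Self-managed: takes a fixed partition and never joins the group.
    // (A real consumer calls one or the other, never both.)
    consumer.assign(Collections.singletonList(new TopicPartition("events", 0)));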

Re: KafkaIO does not make use of Kafka Consumer Groups [kafka] [java] [io]

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

my two cents on this. While it would be perfectly possible to use 
consumer groups in KafkaIO, doing so has its own issues. The most 
visible is that using subscriptions might introduce unnecessary 
duplicates in downstream processing. The reason for this is that a 
consumer in a consumer group might be reassigned partitions and/or be 
reset to a different offset based on conditions that are outside the 
consumer's own control. This might lead to an inability to successfully 
commit the offset of a bundle after it has been sent downstream, while 
the processed-but-not-yet-committed input element might be reprocessed 
by a different worker due to a partition rebalance. This adds 
unnecessary complexity with questionable benefits (observability of lag 
in a consumer group and possibly automatic discovery of new partitions 
in a topic).
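
To make that failure mode concrete, here is a minimal non-Beam sketch
(consumer construction elided; the topic name and emitDownstream helper
are hypothetical) of how a rebalance surfaces as duplicates:

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    consumer.subscribe(Collections.singletonList("events"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        emitDownstream(records);   // hypothetical helper: bundle already sent downstream
        try {
            consumer.commitSync();
        } catch (CommitFailedException e) {
            // The group rebalanced between poll() and commit: the partitions were
            // revoked, the commit is rejected, and whichever consumer now owns them
            // will re-read the same records - duplicates downstream.
        }
    }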

For these reasons I'd say that it would be possible to introduce a 
different IO (e.g. KafkaConsumerGroupIO), which could be added to Beam 
itself or (perhaps) to some extension, but it makes little sense to 
introduce this into KafkaIO itself.

Hope this helps,

  Jan

On 10/18/23 05:49, Shaojie Wu wrote:
> Can't agree with Shahar Frank more
>
> On 2023/04/19 18:17:15 Shahar Frank wrote:
>> Hi Daniel,
>>
>> I think I've already answered these in a previous email but let me answer
>> them again.
>>
>> I was specifically responding to quoted points from your last email. I
>>> really don't understand why you, as a user, care if the implementation of
>>> the framework is using consumer groups or not as long as it has the
>>> throughput you need and is correct. If there is something specific this
>>> would be useful for, like monitoring or metrics, it seems a reasonable
>>> feature request to me to ask to reflect the progress state in a kafka
>>> consumer group, but not to use the underlying assignment mechanism for the
>>> reasons stated above.
>>>
>> I do care for a couple of reasons:
>> 1) Introducing risk with a technology that no one knows in the company vs.
>> a technology people know and trust (i.e. Kafka Consumer Groups)
>> 2) A multitude of alerting, monitoring and other observability tools that
>> are using consumer groups will not be usable and new solutions would be
>> required
>> 3) Expert knowhow on managing (and sometimes fixing) issues with Kafka in
>> the company will become useless - and this in turn introduces risk to
>> projects
>>
>> If you want to run in a single machine application mode, you can try
>>> setting the `flinkMaster` parameter to `[local]`, which should launch an
>>> inline flink runner just for your pipeline. If you want to have a scaling
>>> out cluster per-application, you can launch a repeatable flink cluster with
>>> kubernetes on a per-application basis pretty easily.
>>
>> I do agree that a Flink cluster is a great solution and have maintained a
>> few in my time.
>> Sadly in our use case I have to consider constraints set by security and
>> platform teams and that will take time.
>> By the time these come through it is very likely that the decision will
>> have gone against Beam in favor of other solutions (i.e. Kafka Streams,
>> Camel and others), which would be a shame in my view. It is very unlikely
>> that, once taken, this decision would be reversed for a long time.
>>
>> Given that a Flink cluster is not an option for me at this point I have
>> been trying to push a solution where instances of a Beam pipeline are run
>> "disconnected" using a DirectRunner or (better even) a FlinkRunner in local
>> standalone mode (as you're suggesting) - and like you suggested we are
>> running those using a K8s deployment which allows us to scale up as required.
>> The issue is that if more than one pod attempts to run the pipeline, they
>> will not split the partitions between them; rather, each will consume ALL
>> partitions and the output will include as many duplicates as the number
>> of pods. This solution is then unable to scale up horizontally.
>>
>> That is exactly why I'm trying to suggest using consumer groups.
>> In the attempt I created - here
>> <https://github.com/srfrnk/demo/blob/cg-reader/beam-kafka2file/src/main/java/kafka/Read.java>
>> - I've already shown it is possible (albeit, I admit, with limitations such
>> as you described) to use consumer groups and effectively allow our use case
>> to run on a scaled-up K8s deployment of DirectRunners.
>>
>> And again, finally, my question is why should Kafka be treated differently
>> from other messaging systems like SQS and PubSub, for which it seems Beam
>> does not attempt to manage the distribution strategy, as well as the
>> mechanism for managing processed (committed) messages?
>>
>> If Beam is able to perform just as well with those systems managing these
>> concerns, couldn't the same be applied to Kafka?
>>
>> Cheers,
>> Shahar.
>>
>> ------------------------------
>>
>> Shahar Frank
>>
>> srfrnk@gmail.com
>>
>> +447799561438
>>
>> ------------------------------
>>
>>
>>
>>
>>
>> On Wed, 19 Apr 2023 at 13:19, Daniel Collins <dp...@google.com> wrote:
>>
>>> Hello,
>>>
>>> I was specifically responding to quoted points from your last email. I
>>> really don't understand why you, as a user, care if the implementation of
>>> the framework is using consumer groups or not as long as it has the
>>> throughput you need and is correct. If there is something specific this
>>> would be useful for, like monitoring or metrics, it seems a reasonable
>>> feature request to me to ask to reflect the progress state in a kafka
>>> consumer group, but not to use the underlying assignment mechanism for the
>>> reasons stated above.
>>>
>>> Per: "why Beam should recommend using a distributed processing framework"
>>>
>>> If you want to run in a single machine application mode, you can try
>>> setting the `flinkMaster` parameter to `[local]`, which should launch an
>>> inline flink runner just for your pipeline. If you want to have a scaling
>>> out cluster per-application, you can launch a repeatable flink cluster with
>>> kubernetes on a per-application basis pretty easily.
>>>
>>> -Daniel
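
For anyone trying the `[local]` mode described above, a minimal sketch of the
option wiring (class names from the Beam Flink runner; pipeline construction
and transforms are omitted, and `args` is the usual main() parameter):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setFlinkMaster("[local]");   // embedded Flink, no external cluster
    Pipeline pipeline = Pipeline.create(options);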
>>>
>>> On Wed, Apr 19, 2023 at 8:11 AM Shahar Frank <sr...@gmail.com> wrote:
>>>
>>>> Hi Daniel,
>>>>
>>>> I think you missed my last email, which deals exactly with what you just
>>>> commented on.
>>>>
>>>> I can send it again if you can't find it
>>>>
>>>> Shahar.
>>>>
>>>>
>>>> On Wed, Apr 19, 2023, 13:07 Daniel Collins <dp...@google.com> wrote:
>>>>
>>>>>> The real question I feel is why should there not be an option with
>>>>> Beam to use a recommended best practice (by another apache project in fact)
>>>>> when connecting to an external system?
>>>>>
>>>>> You ignored my previous answer. This is not a "best practice" for
>>>>> streaming frameworks, only for applications, and is not used by other
>>>>> frameworks *including kafka streams*.
>>>>>
>>>>>> The same as when connecting to SQS and PubSub should also be
>>>>> implemented with Kafka I think.
>>>>>
>>>>> This makes it much harder to implement a consistent watermark and
>>>>> involves either expensive secondary processing (Pub/Sub has a whole second
>>>>> tracking subscription) or incorrectness of the watermark bounds when
>>>>> reading from backlog. This makes windowing more likely to be incorrect.
>>>>>
>>>>>> the user should be allowed the option of using a mechanism that is
>>>>> part of the system being connected if willing to accept the implication it
>>>>> has.
>>>>>
>>>>> This requires a complete rewrite of how KafkaIO operates, so it's not as
>>>>> easy as "flip a switch".
>>>>>
>>>>>>   And then the real question behind that would be - is there anything
>>>>> preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?
>>>>>
>>>>> You haven't answered my original question: Why do you care if it uses
>>>>> consumer groups or not?
>>>>>
>>>>> On Wed, Apr 19, 2023 at 3:46 AM Shahar Frank <sr...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Kenn,
>>>>>>
>>>>>> I agree with your comments fully.
>>>>>> I would rather use a Flink cluster even with the simple use case we
>>>>>> have now.
>>>>>>
>>>>>> Sadly sometimes we have other constraints - especially in larger
>>>>>> corporations like the one I work for - which make it harder to create and
>>>>>> maintain these without real reasons.
>>>>>> My intention is to introduce Apache Beam right now to allow for future
>>>>>> growth opportunities such as you described but to do that I have to
>>>>>> convince others that it is the best thing to do right now.
>>>>>>
>>>>>> It so happens that for the use case we are facing now perhaps Apache
>>>>>> Beam is too much and it looks like Kafka Streams would allow us to avoid
>>>>>> having to maintain another infrastructure cluster such as Flink.
>>>>>>
>>>>>> I would prefer to be able to propose a way we can use Beam right now
>>>>>> without the added overhead of a Flink/Spark cluster so that I can convince
>>>>>> the teams that it is a viable option right now.
>>>>>> The alternative of switching to Beam once more requirements arrive
>>>>>> would be much less preferable as this is likely to never gather enough
>>>>>> momentum for a full refactoring.
>>>>>>
>>>>>> Finally I feel the question that really needs to be asked is not why
>>>>>> Beam should recommend using a distributed processing framework - which
>>>>>> totally makes sense - nor even why it may require one in some use cases.
>>>>>>
>>>>>> The real question I feel is why should there not be an option with Beam
>>>>>> to use a recommended best practice (by another apache project in fact) when
>>>>>> connecting to an external system?
>>>>>> The same as when connecting to SQS and PubSub should also be
>>>>>> implemented with Kafka I think.
>>>>>> If we think the existing connector has advantages in some use cases
>>>>>> then by all means it should still exist as an option however I feel the
>>>>>> user should be allowed the option of using a mechanism that is part of the
>>>>>> system being connected if willing to accept the implication it has.
>>>>>>
>>>>>> And then the real question behind that would be - is there anything
>>>>>> preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?
>>>>>>
>>>>>> Cheers,
>>>>>> Shahar.
>>>>>>
>>>>>> ------------------------------
>>>>>>
>>>>>> Shahar Frank
>>>>>>
>>>>>> srfrnk@gmail.com
>>>>>>
>>>>>> +447799561438 <+44%207799%20561438>
>>>>>>
>>>>>> ------------------------------
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, 17 Apr 2023 at 19:15, Kenneth Knowles <ke...@apache.org> wrote:
>>>>>>
>>>>>>> Interesting discussion! I don't have a lot of expertise in some of
>>>>>>> these details but I wanted to just add one little comment.
>>>>>>>
>>>>>>> On Sat, Apr 15, 2023 at 10:40 PM Shahar Frank <sr...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Regarding horizontal scaling I may have not been clear about what I
>>>>>>>> mean.
>>>>>>>> With Kafka Streams I can just run the same app (e.g. with a K8s
>>>>>>>> Deployment) on as many pods as I need and with Kafka Consumer Groups managing
>>>>>>>> the distribution of data all would work well.
>>>>>>>> Moreover I can create more pods than the number of partitions and
>>>>>>>> keep them idle so that if/when some crash, others pick up the slack quicker. And
>>>>>>>> this would all be managed by the Kafka consumer group coordinator.
>>>>>>>> If I do the same with an Apache Beam application - for example with a
>>>>>>>> Direct Runner or a Flink Runner running in "local" mode - each instance
>>>>>>>> will consume the entire topic, as it is unaware of the other instances.
>>>>>>>> For this to work I will be required to use something like a Flink runner
>>>>>>>> with a full-fledged Flink cluster. This is a disadvantage for Beam in
>>>>>>>> simpler use cases where maintaining such an additional cluster is not
>>>>>>>> required for the actual functionality (e.g. just for filtering) and
>>>>>>>> incurs costs not everyone is willing to pay.
>>>>>>>>
>>>>>>> Horizontal scaling is built into Beam. It all occurs within a single
>>>>>>> pipeline. You should not try to scale up by running multiple pipelines
>>>>>>> consuming from the same consumer group. Running your Beam pipeline on a
>>>>>>> Flink cluster (or any other distributed runner) is the intended way to
>>>>>>> achieve horizontal scaling. Beam has a very sophisticated (perhaps "the
>>>>>>> most" sophisticated) sharding and work balancing model, for distributing
>>>>>>> shards of work across workers. So it is by design that Beam is aware of the
>>>>>>> sharding, but also does its own thing above and beyond. It is true that if
>>>>>>> you have a very simple use case then Beam could be more than you need. Beam
>>>>>>> is very general purpose. Of course, simple uses grow into more complex uses
>>>>>>> :-) and you might end up stuck and/or porting to a distributed processing
>>>>>>> engine after all. It all depends on what you are trying to do and what you
>>>>>>> might do in the future!
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> Kafka Streams would not run on a Kafka cluster that is true - I was
>>>>>>>> not saying it would. It might be run on a K8s cluster, which is also how I
>>>>>>>> might be running Kafka itself and any Flink/Spark cluster I might need. While
>>>>>>>> Flink can run in a standalone as well as cluster mode... as I said earlier
>>>>>>>> in this use case when Flink is used in standalone mode the use case fails -
>>>>>>>> which is my issue, as the same will work with Kafka Streams.
>>>>>>>>
>>>>>>>> Kafka Streams can only connect to Kafka - exactly why I would prefer
>>>>>>>> to use Beam but to do that I need to show that we have no disadvantages for
>>>>>>>> the initial use case.
>>>>>>>> I'm also very aware of the other benefits of Beam - being runner
>>>>>>>> agnostic, language agnostic, source and sink agnostic etc. which is why I
>>>>>>>> would very much like to use it right now.
>>>>>>>> Sadly right now if we are unable to scale horizontally without
>>>>>>>> maintaining another cluster - i.e. Flink or Spark - this is a major
>>>>>>>> disadvantage which might drive us to use Kafka Streams instead.
>>>>>>>>
>>>>>>>> I have opened a bug in GitHub
>>>>>>>> <https://github.com/apache/beam/issues/25978> for the issue. I can
>>>>>>>> edit that to be more specific to documentation however I feel this would be
>>>>>>>> a huge miss for the project.
>>>>>>>> Also happy to open the same in JIRA if you can point me to where in
>>>>>>>> JIRA I can do that.
>>>>>>>> I couldn't find that on the contact-us
>>>>>>>> <https://beam.apache.org/community/contact-us/> page - it actually
>>>>>>>> points you to create an issue in GitHub, which is what I did.
>>>>>>>>
>>>>>>>> Finally I've been looking into other stream consumers in Beam IO.
>>>>>>>>
>>>>>>>> I might be missing something, but SqsIO seems not to manage this
>>>>>>>> itself - rather, it expects SQS to manage data distribution
>>>>>>>> between consumers:
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L660
>>>>>>>>
>>>>>>>> I think the same goes for PubsubIO, where I can't see it trying to
>>>>>>>> control which part of the stream is handled by which consumer; it
>>>>>>>> lets the Pubsub service do that instead:
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L726
>>>>>>>>
>>>>>>>> I would ask why Kafka should not be considered a "streaming service"
>>>>>>>> just like SQS and Pubsub, and be allowed to manage data distribution just
>>>>>>>> as they do?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Shahar.
>>>>>>>>
>>>>>>>> ------------------------------
>>>>>>>>
>>>>>>>> Shahar Frank
>>>>>>>>
>>>>>>>> srfrnk@gmail.com
>>>>>>>>
>>>>>>>> +447799561438 <+44%207799%20561438>
>>>>>>>>
>>>>>>>> ------------------------------
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, 15 Apr 2023 at 21:13, Daniel Collins <dp...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>> the best practice recommended to consume for Kafka
>>>>>>>>> This is when using the consumer API directly. When using a framework
>>>>>>>>> like beam or even kafka streams (which actually doesn't use the consumer
>>>>>>>>> group assignment mechanism, see here
>>>>>>>>> https://docs.confluent.io/platform/current/streams/architecture.html#stream-partitions-and-tasks)
>>>>>>>>> you should rely on the framework's partition-to-task assignment mechanism.
>>>>>>>>>
>>>>>>>>>> all observability tools around Kafka use consumer group lag which
>>>>>>>>> is going to be unavailable if consumer groups are not being used
>>>>>>>>>
>>>>>>>>> This is a valid feature request: reflecting the underlying progress
>>>>>>>>> in offset commits. Filing a feature request on JIRA would be the best way
>>>>>>>>> to get this prioritized.
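
Worth noting alongside this: KafkaIO's read transform does expose
commitOffsetsInFinalize(), which, as I read the API, commits consumed offsets
back to Kafka under the configured group.id once bundles are finalized, so
group-lag tooling can observe progress without group-based partition
assignment. A minimal sketch (broker, topic and group names hypothetical;
pipeline construction elided):

    import java.util.Map;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    pipeline.apply(KafkaIO.<Long, String>read()
        .withBootstrapServers("broker:9092")
        .withTopic("events")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerConfigUpdates(Map.of("group.id", "my-group"))
        .commitOffsetsInFinalize());   // reflect progress in the consumer group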
>>>>>>>>>
>>>>>>>>>> Kafka Streams allows me to horizontally scale by adding as many
>>>>>>>>> instances of my application as I need and relies on Kafka to manage
>>>>>>>>> distribution by using consumer groups.
>>>>>>>>>
>>>>>>>>> No it doesn't, see above. Beam allows you to scale out horizontally
>>>>>>>>> as well, but neither allows you to scale the source horizontally beyond the
>>>>>>>>> number of partitions on the topic.
>>>>>>>>>
>>>>>>>>>> I'm required to maintain another distributed processing cluster
>>>>>>>>> like Spark or Flink (on top of a Kafka cluster I already have)
>>>>>>>>>
>>>>>>>>> Kafka streams does not run on your kafka cluster. It is a separate
>>>>>>>>> runtime that you need to turn up and run jobs for separately, same as
>>>>>>>>> flink. The only difference is that, effectively, you can only run kafka
>>>>>>>>> streams in application mode while flink can also be run in session/cluster
>>>>>>>>> mode.
>>>>>>>>>
>>>>>>>>> The main downside of Kafka streams is that it can only be used
>>>>>>>>> for talking with kafka. If you ever want to read batch data or from/to another
>>>>>>>>> streaming system you cannot reuse your existing code/architecture and need
>>>>>>>>> to rewrite everything. One advantage of decoupled frameworks like beam (or
>>>>>>>>> flink or spark) is that the same pipeline and code can be reused for
>>>>>>>>> various data sources and sinks, and they come with a library of prebuilt
>>>>>>>>> ones.
>>>>>>>>>
>>>>>>>>>> documentation
>>>>>>>>> If the documentation misleadingly suggests that you can set group.id,
>>>>>>>>> while doing so does not make upstream offset commits that would allow you
>>>>>>>>> to access metrics, please file a bug on JIRA so either the documentation or
>>>>>>>>> implementation can be corrected.
>>>>>>>>>
>>>>>>>>> -Daniel
>>>>>>>>>
>>>>>>>>> On Sat, Apr 15, 2023 at 1:37 PM Shahar Frank <sr...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for the information Daniel,
>>>>>>>>>>
>>>>>>>>>> For one - Kafka Consumer Groups are the recommended best practice for
>>>>>>>>>> consuming from Kafka AFAIK, and I would prefer to be able to use that.
>>>>>>>>>> Also all observability tools around Kafka use consumer group lag
>>>>>>>>>> which is going to be unavailable if consumer groups are not being used.
>>>>>>>>>>
>>>>>>>>>> Finally in my use case I'm asked to evaluate using Apache Beam vs.
>>>>>>>>>> Kafka Streams.
>>>>>>>>>> Kafka Streams allows me to horizontally scale by adding as many
>>>>>>>>>> instances of my application as I need and relies on Kafka to manage
>>>>>>>>>> distribution by using consumer groups.
>>>>>>>>>> With Apache Beam I'm required to maintain another distributed
>>>>>>>>>> processing cluster like Spark or Flink (on top of a Kafka cluster I already
>>>>>>>>>> have) to be able to do the same.
>>>>>>>>>> To be clear, in this use case there would be no need for an additional
>>>>>>>>>> cluster if not for consumer groups being unavailable.
>>>>>>>>>> This constitutes a disadvantage over Kafka Streams and other
>>>>>>>>>> solutions that use consumer groups.
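
For concreteness, the Kafka Streams deployment model referred to here, as an
illustrative sketch (application, broker and topic names hypothetical): every
instance started with the same application.id joins the same group, and the
Streams runtime spreads the topic's partitions (as tasks) across however many
instances are running.

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-filter-app");  // shared by all pods
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("events")
        .filter((key, value) -> value != null)   // e.g. a simple filtering job
        .to("filtered-events");
    new KafkaStreams(builder.build(), props).start();

As Daniel notes above, Streams plugs its own task assignor into the group
membership protocol rather than using the default consumer-group assignment.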
>>>>>>>>>>
>>>>>>>>>> Furthermore, if this use case is not supported I would imagine the
>>>>>>>>>> documentation would mention that, or at least not imply the contrary.
>>>>>>>>>> In the latest version of the documentation for KafkaIO
>>>>>>>>>> <https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html> it
>>>>>>>>>> shows an example of configuring a consumer group, while in fact this
>>>>>>>>>> setting will not do anything of the sort:
>>>>>>>>>> [image: image.png]
>>>>>>>>>> And:
>>>>>>>>>> [image: image.png]
>>>>>>>>>>
>>>>>>>>>> It seems like this has already been raised in the past - e.g. here
>>>>>>>>>> <https://stackoverflow.com/questions/63001274/apache-beam-kafkaio-consumers-in-consumer-group-getting-assigned-unique-group-id> -
>>>>>>>>>> so I'm probably not the first person to be confused about that.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Shahar.
>>>>>>>>>>
>>>>>>>>>> ------------------------------
>>>>>>>>>>
>>>>>>>>>> Shahar Frank
>>>>>>>>>>
>>>>>>>>>> srfrnk@gmail.com
>>>>>>>>>>
>>>>>>>>>> +447799561438 <+44%207799%20561438>
>>>>>>>>>>
>>>>>>>>>> ------------------------------
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, 15 Apr 2023 at 13:42, Daniel Collins via dev <
>>>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Why do you want to use a consumer group? If you have consumers in
>>>>>>>>>>> other jobs, your beam job will fail to receive all messages it should for
>>>>>>>>>>> the topic.
>>>>>>>>>>>
>>>>>>>>>>>> It seems the code attempts to circumvent the partition
>>>>>>>>>>> assignment mechanism provided by Kafka to use it's own.
>>>>>>>>>>>
>>>>>>>>>>> All beam I/Os for partitioned sources do this. They use access to
>>>>>>>>>>> the partitioning structure of the underlying system to track their progress
>>>>>>>>>>> through each partition and provide feedback for scaling, as well as
>>>>>>>>>>> tracking and enforcing exactly-once processing semantics. In fact, most
>>>>>>>>>>> interops with streaming data processing systems do this; you can see in the
>>>>>>>>>>> documentation of the flink kafka interop (
>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#behind-the-scene)
>>>>>>>>>>> that it does not use or respect the partition assignments.
>>>>>>>>>>>
>>>>>>>>>>>> By doing that it prevents the user from using consumer groups.
>>>>>>>>>>> Again, why do you (as a user) want to use consumer groups? What
>>>>>>>>>>> value does it provide you?
>>>>>>>>>>>
>>>>>>>>>>> -Daniel
>>>>>>>>>>>

Re: KafkaIO does not make use of Kafka Consumer Groups [kafka] [java] [io]

Posted by Shaojie Wu <ga...@apache.org>.
Can't agree with Shahar Frank more 


Re: KafkaIO does not make use of Kafka Consumer Groups [kafka] [java] [io]

Posted by Shahar Frank <sr...@gmail.com>.
Hi Daniel,

I think I've already answered these in a previous email but let me answer
them again.

I was specifically responding to quoted points from your last email. I
> really don't understand why you, as a user, care if the implementation of
> the framework is using consumer groups or not as long as it has the
> throughput you need and is correct. If there is something specific this
> would be useful for, like monitoring or metrics, it seems a reasonable
> feature request to me to ask to reflect the progress state in a kafka
> consumer group, but not to use the underlying assignment mechanism for the
> reasons stated above.
>

I do care for a couple of reasons:
1) Introducing risk with a technology that non one knows in the company vs.
a technology people know and trust (i.e. Kafka Consumer Groups)
2) A multitude of alerting, monitoring and other observability tools that
are using consumer groups will not be usable and new solutions would be
required
3) Expert knowhow on managing (and sometimes fixing) issues with Kafka in
the company will become useless - and this in turn introduces risk to
projects

If you want to run in a single machine application mode, you can try
> setting the `flinkMaster` parameter to `[local]`, which should launch an
> inline flink runner just for your pipeline. If you want to have a scaling
> out cluster per-application, you can launch a repeatable flink cluster with
> kubernetes on a per-application basis pretty easily.


I do agree that a Flink cluster is a great solution and have maintained a
few in my time.
Sadly in our use case I have to consider constraints set by security and
platform teams and that will take time.
By the time these follow through it is very likely that the decision to use
Beam would have fallen in favor of other solutions (i.e. Kafka Streams,
Camel and others) and this would be a shame in my view. It is very unlikely
that once taken this decision would be reversed for a long time.

Given that a Flink cluster is not an option for me at this point I have
been trying to push a solution where instances of a Beam pipeline are run
"disconnected" using a DIrectRunner or (better even) a FlinkRunner in local
standalone mode (as you're suggesting) - and like you suggested we are
running those using a K8s deployment which allows us to scale up a required.
The issue is if more than one pod attempts to run the pipeline - they will
not split the partitions between them but rather each would consume ALL
partitions and the output would include as many duplications as the number
of pods. This solution will then not be able to scale up horizontally.

That is exactly why I'm trying to suggest using consumer groups.
In this attempt I created - here
<https://github.com/srfrnk/demo/blob/cg-reader/beam-kafka2file/src/main/java/kafka/Read.java>
-  I've already shown it is possible (albeit I admit with limitations such
as you described) to use consumer groups and effectively allow our use case
to run on a scaled up K8s deployment of DirectRunners.

And again finally my question is why should Kafka be treated differently
from other messaging systems like SQS and PubSub for which it seems Beam
does not attempt to manage the distribution strategy as well the mechanism
for managing processed (committed) messages?

If Beam is able to perform as well with them managing these couldn't the
same be applied to Kafka?

Cheers,
Shahar.

------------------------------

Shahar Frank

srfrnk@gmail.com

+447799561438

------------------------------





On Wed, 19 Apr 2023 at 13:19, Daniel Collins <dp...@google.com> wrote:

> Hello,
>
> I was specifically responding to quoted points from your last email. I
> really don't understand why you, as a user, care if the implementation of
> the framework is using consumer groups or not as long as it has the
> throughput you need and is correct. If there is something specific this
> would be useful for, like monitoring or metrics, it seems a reasonable
> feature request to me to ask to reflect the progress state in a kafka
> consumer group, but not to use the underlying assignment mechanism for the
> reasons stated above.
>
> Per: "why Beam should recommend using a distributed processing framework"
>
> If you want to run in a single machine application mode, you can try
> setting the `flinkMaster` parameter to `[local]`, which should launch an
> inline flink runner just for your pipeline. If you want to have a scaling
> out cluster per-application, you can launch a repeatable flink cluster with
> kubernetes on a per-application basis pretty easily.
>
> -Daniel
>
> On Wed, Apr 19, 2023 at 8:11 AM Shahar Frank <sr...@gmail.com> wrote:
>
>> Hi Daniel,
>>
>> I think you missed my last email that deals exactly with what you just
>> commented.
>>
>> I can send it again if you can't find it
>>
>> Shahar.
>>
>>
>> On Wed, Apr 19, 2023, 13:07 Daniel Collins <dp...@google.com> wrote:
>>
>>> > The real question I feel is why should there not be an option with
>>> Beam to use a recommended best practice (by another apache project in fact)
>>> when connecting to an external system?
>>>
>>> You ignored my previous answer. This is not a "best practice" for
>>> streaming frameworks, only for applications, and is not used by other
>>> frameworks *including kafka streams*.
>>>
>>> > The same as when connecting to SQS and PubSub should also be
>>> implemented with Kafka I think.
>>>
>>> This makes it much harder to implement a consistent watermark and
>>> involves either expensive secondary processing (Pub/Sub has a whole second
>>> tracking subscription) or incorrectness of the watermark bounds when
>>> reading from backlog. This makes windowing more likely to be incorrect.
>>>
>>> > the user should be allowed the option of using a mechanism that is
>>> part of the system being connected if willing to accept the implication it
>>> has.
>>>
>>> This requires a complete rewrite of how KafkaIO operates, so its not as
>>> easy as "flip a switch".
>>>
>>> >  And then the real question behind that would be - is there anything
>>> preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?
>>>
>>> You haven't answered my original question: Why do you care if it uses
>>> consumer groups or not?
>>>
>>> On Wed, Apr 19, 2023 at 3:46 AM Shahar Frank <sr...@gmail.com> wrote:
>>>
>>>> Thanks Kenn,
>>>>
>>>> I agree with your comments fully.
>>>> I would rather use a Flink cluster even with the simple use case we
>>>> have now.
>>>>
>>>> Sadly sometimes we have other constraints - especially in larger
>>>> corporations like the one I work for - which make it harder to create and
>>>> maintain these without real reasons.
>>>> My intention is to introduce Apache Beam right now to allow for future
>>>> growth opportunities such as you described but to do that I have to
>>>> convince others that it is the best thing to do right now.
>>>>
>>>> It so happens that for the use case we are facing now perhaps Apache
>>>> Beam is too much and it looks like Kafka Streams would allow us to avoid
>>>> having to maintain another infrastructure cluster such as Flink.
>>>>
>>>> I would prefer to be able to propose a way we can use Beam right now
>>>> without the added overhead of a Flink/Spark cluster so that I can convince
>>>> the teams that it is a viable option right now.
>>>> The alternative of switching to Beam once more requirements arrive
>>>> would be much less preferable as this is likely to never gather enough
>>>> momentum for a full refactoring.
>>>>
>>>> Finally I feel the question that really needs to be asked is not why
>>>> Beam should recommend using a distributed processing framework which
>>>> totally makes sense and not even why it may require that in some use cases.
>>>>
>>>> The real question I feel is why should there not be an option with Beam
>>>> to use a recommended best practice (by another apache project in fact) when
>>>> connecting to an external system?
>>>> The same as when connecting to SQS and PubSub should also be
>>>> implemented with Kafka I think.
>>>> If we think the existing connector has advantages in some use cases
>>>> then by all means it should still exist as an option however I feel the
>>>> user should be allowed the option of using a mechanism that is part of the
>>>> system being connected if willing to accept the implication it has.
>>>>
>>>> And then the real question behind that would be - is there anything
>>>> preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?
>>>>
>>>> Cheers,
>>>> Shahar.
>>>>
>>>> On Mon, 17 Apr 2023 at 19:15, Kenneth Knowles <ke...@apache.org> wrote:
>>>>
>>>>> Interesting discussion! I don't have a lot of expertise in some of
>>>>> these details but I wanted to just add one little comment.
>>>>>
>>>>> On Sat, Apr 15, 2023 at 10:40 PM Shahar Frank <sr...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Regarding horizontal scaling I may have not been clear about what I
>>>>>> mean.
>>>>>> With Kafka Streams I can just run the same app (e.g. with a K8s
>>>>>> Deployment) on as many pods as I need, and with Kafka consumer groups
>>>>>> managing the distribution of data all would work well.
>>>>>> Moreover I can create more pods than the number of partitions and
>>>>>> keep them idle so that if/when some crash, others pick up the slack
>>>>>> quicker. And this would all be managed by the Kafka consumer group
>>>>>> coordinator.
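>>>>>> Concretely, I mean something like this minimal sketch (just a
>>>>>> stateless filter; the topic, application id and broker address are
>>>>>> made up):
>>>>>>
>>>>>> import java.util.Properties;
>>>>>> import org.apache.kafka.common.serialization.Serdes;
>>>>>> import org.apache.kafka.streams.KafkaStreams;
>>>>>> import org.apache.kafka.streams.StreamsBuilder;
>>>>>> import org.apache.kafka.streams.StreamsConfig;
>>>>>>
>>>>>> public class FilterApp {
>>>>>>   public static void main(String[] args) {
>>>>>>     Properties props = new Properties();
>>>>>>     // application.id doubles as the consumer group.id, so every
>>>>>>     // replica of this app joins the same group and shares partitions.
>>>>>>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-filter-app");
>>>>>>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>>>>>     props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>>>>>>         Serdes.String().getClass());
>>>>>>     props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>>>>>>         Serdes.String().getClass());
>>>>>>
>>>>>>     StreamsBuilder builder = new StreamsBuilder();
>>>>>>     builder.<String, String>stream("events")
>>>>>>         .filter((key, value) -> value != null)
>>>>>>         .to("filtered-events");
>>>>>>
>>>>>>     new KafkaStreams(builder.build(), props).start();
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> Scaling that out is just a matter of adding replicas of the same
>>>>>> process.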
>>>>>> If I do the same with an Apache Beam application - for example with a
>>>>>> Direct Runner or a Flink Runner running in "local" mode - each instance
>>>>>> will consume the entire topic, as it is unaware of the other instances.
>>>>>> To make this work I would be required to use something like a Flink
>>>>>> runner with a full-fledged Flink cluster. This is a disadvantage for
>>>>>> Beam in simpler use cases where maintaining such an additional cluster
>>>>>> is not required for the actual functionality (e.g. just for filtering)
>>>>>> and incurs costs not everyone is willing to accept.
>>>>>>
>>>>>
>>>>> Horizontal scaling is built into Beam. It all occurs within a single
>>>>> pipeline. You should not try to scale up by running multiple pipelines
>>>>> consuming from the same consumer group. Running your Beam pipeline on a
>>>>> Flink cluster (or any other distributed runner) is the intended way to
>>>>> achieve horizontal scaling. Beam has a very sophisticated (perhaps "the
>>>>> most" sophisticated) sharding and work balancing model, for distributing
>>>>> shards of work across workers. So it is by design that Beam is aware of the
>>>>> sharding, but also does its own thing above and beyond. It is true that if
>>>>> you have a very simple use case then Beam could be more than you need. Beam
>>>>> is very general purpose. Of course, simple uses grow into more complex uses
>>>>> :-) and you might end up stuck and/or porting to a distributed processing
>>>>> engine after all. It all depends on what you are trying to do and what you
>>>>> might do in the future!
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>> Kafka Streams would not run on a Kafka cluster, that is true - I was
>>>>>> not saying it would. It might be run on a K8s cluster, which is also how
>>>>>> I might be running Kafka itself and any Flink/Spark cluster I might
>>>>>> need. While Flink can run in standalone as well as cluster mode... as I
>>>>>> said earlier, in this use case when Flink is used in standalone mode the
>>>>>> use case fails - which is my issue, as the same will work with Kafka
>>>>>> Streams.
>>>>>>
>>>>>> Kafka Streams can only connect to Kafka - which is exactly why I would
>>>>>> prefer to use Beam; but to do that I need to show that we have no
>>>>>> disadvantages for the initial use case.
>>>>>> I'm also very aware of the other benefits of Beam - being runner
>>>>>> agnostic, language agnostic, source and sink agnostic etc. - which is
>>>>>> why I would very much like to use it right now.
>>>>>> Sadly, right now, being unable to scale horizontally without
>>>>>> maintaining another cluster - i.e. Flink or Spark - is a major
>>>>>> disadvantage which might drive us to use Kafka Streams instead.
>>>>>>
>>>>>> I have opened a bug in Github
>>>>>> <https://github.com/apache/beam/issues/25978> for the issue. I can
>>>>>> edit that to be more specific to documentation, however I feel this
>>>>>> would be a huge miss for the project.
>>>>>> Also happy to open the same in JIRA if you can point me to where in
>>>>>> JIRA I can do that.
>>>>>> I couldn't find that on the contact-us
>>>>>> <https://beam.apache.org/community/contact-us/> page - it actually
>>>>>> points you to create an issue in Github, which is what I did.
>>>>>>
>>>>>> Finally I've been looking into other stream consumers in Beam IO.
>>>>>>
>>>>>> I might be missing something, but SqsIO seems not to be managing this
>>>>>> itself - rather, it expects SQS to manage data distribution between
>>>>>> consumers:
>>>>>>
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L660
>>>>>>
>>>>>> I think the same goes for PubsubIO, where I can't see it trying to
>>>>>> take any control over which part of the stream is managed by which
>>>>>> consumer; it lets the Pubsub service do that instead:
>>>>>>
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L726
>>>>>>
>>>>>> I would ask: why should Kafka not be considered a "streaming service"
>>>>>> just like SQS and Pubsub, and be allowed to manage data distribution
>>>>>> just like they would?
>>>>>>
>>>>>> Cheers,
>>>>>> Shahar.
>>>>>>
>>>>>> On Sat, 15 Apr 2023 at 21:13, Daniel Collins <dp...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> > the best practice recommended to consume for Kafka
>>>>>>>
>>>>>>> This is when using the consumer API directly - when using a framework
>>>>>>> like Beam or even Kafka Streams (which actually doesn't use the consumer
>>>>>>> group assignment mechanism, see here
>>>>>>> https://docs.confluent.io/platform/current/streams/architecture.html#stream-partitions-and-tasks)
>>>>>>> you should rely on the framework's partition-to-task assignment
>>>>>>> mechanism.
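>>>>>>>
>>>>>>> To make the distinction concrete, a rough sketch with the plain
>>>>>>> consumer API (broker, group and topic names are made up):
>>>>>>>
>>>>>>> import java.util.Arrays;
>>>>>>> import java.util.Collections;
>>>>>>> import java.util.Properties;
>>>>>>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>>>>>>> import org.apache.kafka.common.TopicPartition;
>>>>>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>>>>>>
>>>>>>> public class ConsumerModes {
>>>>>>>   public static void main(String[] args) {
>>>>>>>     Properties props = new Properties();
>>>>>>>     props.put("bootstrap.servers", "localhost:9092");
>>>>>>>     props.put("group.id", "my-group");
>>>>>>>     props.put("key.deserializer", StringDeserializer.class.getName());
>>>>>>>     props.put("value.deserializer", StringDeserializer.class.getName());
>>>>>>>     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>>>>>>>
>>>>>>>     // Group-managed path: the broker's group coordinator assigns
>>>>>>>     // partitions and rebalances them as members join and leave.
>>>>>>>     consumer.subscribe(Collections.singletonList("events"));
>>>>>>>
>>>>>>>     // Framework-managed path (what KafkaIO and similar connectors do):
>>>>>>>     // explicit assignment, no group membership, no rebalances.
>>>>>>>     // consumer.assign(Arrays.asList(new TopicPartition("events", 0)));
>>>>>>>
>>>>>>>     // ... consumer.poll(...) loop elided ...
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> The two paths are mutually exclusive on a single consumer instance.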
>>>>>>>
>>>>>>> > all observability tools around Kafka use consumer group lag which
>>>>>>> is going to be unavailable if consumer groups are not being used
>>>>>>>
>>>>>>> This is a valid feature request - to reflect the underlying progress
>>>>>>> in offset commits. Filing a feature request on JIRA would be the best
>>>>>>> way to get this prioritized.
>>>>>>>
>>>>>>> > Kafka Streams allows me to horizontally scale by adding as many
>>>>>>> instances of my application as I need and relies on Kafka to manage
>>>>>>> distribution by using consumer groups.
>>>>>>>
>>>>>>> No it doesn't, see above. Beam allows you to scale out horizontally
>>>>>>> as well, but neither allows you to scale the source horizontally beyond the
>>>>>>> number of partitions on the topic.
>>>>>>>
>>>>>>> > I'm required to maintain another distributed processing cluster
>>>>>>> like Spark or Flink (on top of a Kafka cluster I already have)
>>>>>>>
>>>>>>> Kafka Streams does not run on your Kafka cluster. It is a separate
>>>>>>> runtime that you need to turn up and run jobs for separately, same as
>>>>>>> Flink. The only difference is that, effectively, you can only run Kafka
>>>>>>> Streams in application mode, while Flink can also be run in
>>>>>>> session/cluster mode.
>>>>>>>
>>>>>>> The main downside of Kafka Streams is that it can only be used to talk
>>>>>>> with Kafka. If you ever want to read batch data, or read from / write
>>>>>>> to another streaming system, you cannot reuse your existing
>>>>>>> code/architecture and need to rewrite everything. One advantage of
>>>>>>> decoupled frameworks like Beam (or Flink or Spark) is that the same
>>>>>>> pipeline and code can be reused for various data sources and sinks, and
>>>>>>> they come with a library of prebuilt ones.
>>>>>>>
>>>>>>> > documentation
>>>>>>>
>>>>>>> If the documentation misleadingly suggests that you can set group.id,
>>>>>>> and doing so does not result in upstream offset commits that would let
>>>>>>> you access metrics, please file a bug on JIRA so either the
>>>>>>> documentation or the implementation can be corrected.
>>>>>>>
>>>>>>> -Daniel
>>>>>>>
>>>>>>> On Sat, Apr 15, 2023 at 1:37 PM Shahar Frank <sr...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for the information Daniel,
>>>>>>>>
>>>>>>>> For one - Kafka consumer groups are the recommended best practice for
>>>>>>>> consuming from Kafka, AFAIK, and I would prefer to be able to use that.
>>>>>>>> Also, all observability tools around Kafka use consumer group lag,
>>>>>>>> which is going to be unavailable if consumer groups are not being used.
>>>>>>>>
>>>>>>>> Finally in my use case I'm asked to evaluate using Apache Beam vs.
>>>>>>>> Kafka Streams.
>>>>>>>> Kafka Streams allows me to horizontally scale by adding as many
>>>>>>>> instances of my application as I need and relies on Kafka to manage
>>>>>>>> distribution by using consumer groups.
>>>>>>>> With Apache Beam I'm required to maintain another distributed
>>>>>>>> processing cluster like Spark or Flink (on top of a Kafka cluster I already
>>>>>>>> have) to be able to do the same.
>>>>>>>> To be clear, in this use case there is no need for an additional
>>>>>>>> cluster - except that consumer groups are not being used.
>>>>>>>> This constitutes a disadvantage over Kafka Streams and other
>>>>>>>> solutions that use consumer groups.
>>>>>>>>
>>>>>>>> Furthermore, if this use case is not supported I would imagine the
>>>>>>>> documentation would mention that, or at least not imply the contrary.
>>>>>>>> The latest version of the documentation for KafkaIO
>>>>>>>> <https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html>
>>>>>>>> shows an example of configuring a consumer group, while in fact this
>>>>>>>> setting will not do anything of the sort:
>>>>>>>> [image: image.png]
>>>>>>>> And:
>>>>>>>> [image: image.png]
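>>>>>>>>
>>>>>>>> From memory, the example boils down to something like this (a sketch,
>>>>>>>> not the exact javadoc text; broker, topic and group names are made
>>>>>>>> up):
>>>>>>>>
>>>>>>>> PCollection<KV<String, String>> records =
>>>>>>>>     pipeline.apply(
>>>>>>>>         KafkaIO.<String, String>read()
>>>>>>>>             .withBootstrapServers("broker1:9092,broker2:9092")
>>>>>>>>             .withTopic("events")
>>>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>             .withValueDeserializer(StringDeserializer.class)
>>>>>>>>             // group.id is passed through to the consumer config, but
>>>>>>>>             // the readers still use assign() rather than subscribe(),
>>>>>>>>             // so they never actually join the group:
>>>>>>>>             .withConsumerConfigUpdates(
>>>>>>>>                 ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG,
>>>>>>>>                     "my-group"))
>>>>>>>>             .withoutMetadata());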
>>>>>>>>
>>>>>>>> It seems like this has already been raised in the past - e.g. here
>>>>>>>> <https://stackoverflow.com/questions/63001274/apache-beam-kafkaio-consumers-in-consumer-group-getting-assigned-unique-group-id> -
>>>>>>>> so I'm probably not the first person to be confused about that.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Shahar.
>>>>>>>>
>>>>>>>> On Sat, 15 Apr 2023 at 13:42, Daniel Collins via dev <
>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> Why do you want to use a consumer group? If you have consumers in
>>>>>>>>> other jobs, your Beam job will fail to receive all the messages it
>>>>>>>>> should for the topic.
>>>>>>>>>
>>>>>>>>> > It seems the code attempts to circumvent the partition
>>>>>>>>> assignment mechanism provided by Kafka to use its own.
>>>>>>>>>
>>>>>>>>> All Beam I/Os for partitioned sources do this. They use access to
>>>>>>>>> the partitioning structure of the underlying system to track their
>>>>>>>>> progress through each partition and provide feedback for scaling, as
>>>>>>>>> well as to track and enforce exactly-once processing semantics. In
>>>>>>>>> fact, most interops with streaming data processing systems do this;
>>>>>>>>> you can see in the documentation of the Flink Kafka interop (
>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#behind-the-scene)
>>>>>>>>> that it does not use or respect the partition assignments.
>>>>>>>>>
>>>>>>>>> > By doing that it prevents the user from using consumer groups.
>>>>>>>>>
>>>>>>>>> Again, why do you (as a user) want to use consumer groups? What
>>>>>>>>> value does it provide you?
>>>>>>>>>
>>>>>>>>> -Daniel
>>>>>>>>>

Re: KafkaIO does not make use of Kafka Consumer Groups [kafka] [java] [io]

Posted by Daniel Collins via dev <de...@beam.apache.org>.
Hello,

I was specifically responding to quoted points from your last email. I
really don't understand why you, as a user, care if the implementation of
the framework is using consumer groups or not as long as it has the
throughput you need and is correct. If there is something specific this
would be useful for, like monitoring or metrics, it seems a reasonable
feature request to me to ask to reflect the progress state in a kafka
consumer group, but not to use the underlying assignment mechanism for the
reasons stated above.

Per: "why Beam should recommend using a distributed processing framework"

If you want to run in a single-machine application mode, you can try
setting the `flinkMaster` parameter to `[local]`, which should launch an
in-process Flink runner just for your pipeline. If you want to have a
scaling-out cluster per application, you can launch a repeatable Flink
cluster with Kubernetes on a per-application basis pretty easily.
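
For reference, a minimal sketch of what that looks like (as I understand
the Flink runner options; the rest of the pipeline is elided):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class LocalFlinkMain {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // "[local]" runs an embedded Flink mini-cluster inside this JVM,
    // so no separate Flink deployment is needed.
    options.setFlinkMaster("[local]");
    Pipeline pipeline = Pipeline.create(options);
    // ... apply KafkaIO.read() and the rest of the transforms here ...
    pipeline.run().waitUntilFinish();
  }
}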

-Daniel


Re: KafkaIO does not make use of Kafka Consumer Groups [kafka] [java] [io]

Posted by Shahar Frank <sr...@gmail.com>.
Hi Daniel,

I think you missed my last email, which deals exactly with what you just
commented on.

I can send it again if you can't find it.

Shahar.



Re: KafkaIO does not make use of Kafka Consumer Groups [kafka] [java] [io]

Posted by Daniel Collins via dev <de...@beam.apache.org>.
> The real question I feel is why should there not be an option with Beam
to use a recommended best practice (by another apache project in fact) when
connecting to an external system?

You ignored my previous answer. This is not a "best practice" for streaming
frameworks, only for applications, and is not used by other frameworks
*including kafka streams*.

> The same as when connecting to SQS and PubSub should also be implemented
with Kafka I think.

This makes it much harder to implement a consistent watermark and involves
either expensive secondary processing (Pub/Sub has a whole second tracking
subscription) or incorrectness of the watermark bounds when reading from
backlog. This makes windowing more likely to be incorrect.

> the user should be allowed the option of using a mechanism that is part
of the system being connected if willing to accept the implication it has.

This requires a complete rewrite of how KafkaIO operates, so it's not as
easy as "flipping a switch".

>  And then the real question behind that would be - is there anything
preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?

You haven't answered my original question: Why do you care if it uses
consumer groups or not?

On Wed, Apr 19, 2023 at 3:46 AM Shahar Frank <sr...@gmail.com> wrote:

> Thanks Kenn,
>
> I agree with your comments fully.
> I would rather use a Flink cluster even with the simple use case we have
> now.
>
> Sadly sometimes we have other constraints - especially in larger
> corporations like the one I work for - which make it harder to create and
> maintain these without real reasons.
> My intention is to introduce Apache Beam right now to allow for future
> growth opportunities such as you described but to do that I have to
> convince others that it is the best thing to do right now.
>
> It so happens that for the use case we are facing now perhaps Apache Beam
> is too much and it looks like Kafka Streams would allow us to avoid having
> to maintain another infrastructure cluster such as Flink.
>
> I would prefer to be able to propose a way we can use Beam right now
> without the added overhead of a Flink/Spark cluster so that I can convince
> the teams that it is a viable option right now.
> The alternative of switching to Beam once more requirements arrive would
> be much less preferable as this is likely to never gather enough momentum
> for a full refactoring.
>
> Finally I feel the question that really needs to be asked is not why Beam
> should recommend using a distributed processing framework which
> totally makes sense and not even why it may require that in some use cases.
>
> The real question I feel is why should there not be an option with Beam to
> use a recommended best practice (by another apache project in fact) when
> connecting to an external system?
> The same as when connecting to SQS and PubSub should also be implemented
> with Kafka I think.
> If we think the existing connector has advantages in some use cases then
> by all means it should still exist as an option however I feel the user
> should be allowed the option of using a mechanism that is part of the
> system being connected if willing to accept the implication it has.
>
> And then the real question behind that would be - is there anything
> preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?
>
> Cheers,
> Shahar.
>
> ------------------------------
>
> Shahar Frank
>
> srfrnk@gmail.com
>
> +447799561438 <+44%207799%20561438>
>
> ------------------------------
>
>
>
>
>
> On Mon, 17 Apr 2023 at 19:15, Kenneth Knowles <ke...@apache.org> wrote:
>
>> Interesting discussion! I don't have a lot of expertise in some of these
>> details but I wanted to just add one little comment.
>>
>> On Sat, Apr 15, 2023 at 10:40 PM Shahar Frank <sr...@gmail.com> wrote:
>>
>>> Regarding horizontal scaling I may have not been clear about what I mean.
>>> With Kafka Streams I can just run the same app (e.g. with a K8s
>>> Deployment) on as many pods I need and with Kafka Consumer Groups managing
>>> the distribution of data all would work well.
>>> Moreover I can create more pods than the number of partitions and keep
>>> them idle so that if/when some crash others pick the slack quicker. And
>>> this would all be managed by the Kafka consumer group coordinator.
>>> If I do the same with an Apache Beam application - for example with a
>>> Direct Runner or a Flink Runner running in "local" mode - each instance
>>> will consume the entire topic as it is unaware of the other instances. To
>>> work I will be required to use something like a Flink runner with a full
>>> fledged Flink cluster. This is a disadvantage for beam in simpler use cases
>>> where maintaining such an additional cluster is not required for the actual
>>> functionality (e.g. just for filtering) and incurs costs not everyone is
>>> willing to make.
>>>
>>
>> Horizontal scaling is built into Beam. It all occurs within a single
>> pipeline. You should not try to scale up by running multiple pipelines
>> consuming from the same consumer group. Running your Beam pipeline on a
>> Flink cluster (or any other distributed runner) is the intended way to
>> achieve horizontal scaling. Beam has a very sophisticated (perhaps "the
>> most" sophisticated) sharding and work balancing model, for distributing
>> shards of work across workers. So it is by design that Beam is aware of the
>> sharding, but also does its own thing above and beyond. It is true that if
>> you have a very simple use case then Beam could be more than you need. Beam
>> is very general purpose. Of course, simple uses grow into more complex uses
>> :-) and you might end up stuck and/or porting to a distributed processing
>> engine after all. It all depends on what you are trying to do and what you
>> might do in the future!
>>
>> Kenn
>>
>>
>>
>>>
>>> Kafka Streams would not run on a Kafka cluster that is true - I was not
>>> saying it would. It might be run on a K8s cluster which is also how I might
>>> be running K8s itself and any Flink/Spark cluster I might need. While Flink
>>> can run in a standalone as well as cluster mode... as I said earlier in
>>> this use case when Flink is used in standalone mode the use case fails -
>>> which is my issue, as the same will work with Kafka Streams.
>>>
>>> Kafka Streams can only connect to Kafka - exactly why I would prefer to
>>> use Beam but to do that I need to show that we have no disadvantages for
>>> the initial use case.
>>> I'm also very aware of the other benefits of Beam - being runner
>>> agnostic, language agnostic, source and sink agostic etc. which is why I
>>> would very much like to use it right now.
>>> Sadly right now if we are unable to scale horizontally without
>>> maintaining another cluster - i.e. Flink or Spark - this is a major
>>> disadvantage which might drive us to use Kafka Streams instead.
>>>
>>> I have opened a bug in Github
>>> <https://github.com/apache/beam/issues/25978>for the issue. I can edit
>>> that to be more specific to documentation however I feel this would be a
>>> huge miss for the project.
>>> Also happy to open the same in JIRA if you can point me to where in JIRA
>>> I can do that.
>>> I couldn't find that on the contact-us
>>> <https://beam.apache.org/community/contact-us/> page - it actually
>>> points you to create an issue in Github which is what I did.
>>>
>>> Finally I've been looking into other stream consumers in Beam IO.
>>>
>>> I might be missing something but I think SqsIO seems to not be managing
>>> the same - but rather expects SQS to manage data distribution between
>>> consumers:
>>>
>>> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L660
>>>
>>> I think same goes for PubsubIO which I can't see where it tries to set
>>> any control over which part of the stream would be managed by which
>>> consumer but lets the Pubsub service do that instead:
>>>
>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L726
>>>
>>> I would ask why should Kafka not be considered a "streaming service"
>>> just like SQS and Pubsub and allow it to manage data distribution just like
>>> they would?
>>>
>>> Cheers,
>>> Shahar.
>>>
>>> ------------------------------
>>>
>>> Shahar Frank
>>>
>>> srfrnk@gmail.com
>>>
>>> +447799561438 <+44%207799%20561438>
>>>
>>> ------------------------------
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Sat, 15 Apr 2023 at 21:13, Daniel Collins <dp...@google.com>
>>> wrote:
>>>
>>>> > the best practice recommended to consume for Kafka
>>>>
>>>> This is when using the consumer API directly- when using a framework
>>>> like beam or even kafka streams (which actually doesn't use the consumer
>>>> group assignment mechanism, see here
>>>> https://docs.confluent.io/platform/current/streams/architecture.html#stream-partitions-and-tasks)
>>>> you should rely on the framework's partition to task assignment mechanism.
>>>>
>>>> > all observability tools around Kafka use consumer group lag which is
>>>> going to be unavailable if consumer groups are not being used
>>>>
>>>> This is a valid feature request, to reflect the underlying progress in
>>>> offset commits, filing a feature request on JIRA would be the best way to
>>>> get this prioritized.
>>>>
>>>> > Kafka Streams allows me to horizontally scale by adding as many
>>>> instances of my application as I need and relies on Kafka to manage
>>>> distribution by using consumer groups.
>>>>
>>>> No it doesn't, see above. Beam allows you to scale out horizontally as
>>>> well, but neither allows you to scale the source horizontally beyond the
>>>> number of partitions on the topic.
>>>>
>>>> > I'm required to maintain another distributed processing cluster like
>>>> Spark or Flink (on top of a Kafka cluster I already have)
>>>>
>>>> Kafka streams does not run on your kafka cluster. It is a separate
>>>> runtime that you need to turn up and run jobs for separately, same as
>>>> flink. The only difference is that, effectively, you can only run kafka
>>>> streams in application mode while flink can also be run in session/cluster
>>>> mode.
>>>>
>>>> The main downside of Kafka streams is that it can only be used talking
>>>> with kafka. If you ever want to read batch data or from/to another
>>>> streaming system you cannot reuse your existing code/architecture and need
>>>> to rewrite everything. One advantage of decoupled frameworks like beam (or
>>>> flink or spark) is that the same pipeline and code can be reused for
>>>> various data sources and sinks, and they come with a library of prebuilt
>>>> ones.
>>>>
>>>> > documentation
>>>>
>>>> If the documentation misleadingly suggests that you can set group.id,
>>>> and doing so does not make upstream offset commits to allow you to access
>>>> metrics, please file a bug on JIRA so either the documentation or
>>>> implementation can be corrected.
>>>>
>>>> -Daniel
>>>>
>>>> On Sat, Apr 15, 2023 at 1:37 PM Shahar Frank <sr...@gmail.com> wrote:
>>>>
>>>>> Thanks for the information Daniel,
>>>>>
>>>>> For one - Kafka Consumer Groups is the recommended best practice for
>>>>> consuming from Kafka AFAIK, and I would prefer to be able to use that.
>>>>> Also all observability tools around Kafka use consumer group lag which
>>>>> is going to be unavailable if consumer groups are not being used.
>>>>>
>>>>> Finally in my use case I'm asked to evaluate using Apache Beam vs.
>>>>> Kafka Streams.
>>>>> Kafka Streams allows me to horizontally scale by adding as many
>>>>> instances of my application as I need and relies on Kafka to manage
>>>>> distribution by using consumer groups.
>>>>> With Apache Beam I'm required to maintain another distributed
>>>>> processing cluster like Spark or Flink (on top of a Kafka cluster I already
>>>>> have) to be able to do the same.
>>>>> To be clear in this use case there is no need for an additional
>>>>> cluster except for consumer groups not being used.
>>>>> This constitutes a disadvantage over Kafka Streams and other solutions
>>>>> that use consumer groups.
>>>>>
>>>>> Furthermore if this use case is not supported I would imagine the
>>>>> documentation would mention that or at least not imply the contrary.
>>>>> In the latest version of the documentation for KafkaIO
>>>>> <https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html> it
>>>>> shows an example of configuring a consumer group, while in fact this
>>>>> setting will not do anything of the sort:
>>>>> [image: image.png]
>>>>> And:
>>>>> [image: image.png]
>>>>>
>>>>> It seems like this has already been raised in the past - e.g. here
>>>>> <https://stackoverflow.com/questions/63001274/apache-beam-kafkaio-consumers-in-consumer-group-getting-assigned-unique-group-id> -
>>>>> so I'm probably not the first person to be confused about that.
>>>>>
>>>>> Cheers,
>>>>> Shahar.
>>>>>
>>>>> ------------------------------
>>>>>
>>>>> Shahar Frank
>>>>>
>>>>> srfrnk@gmail.com
>>>>>
>>>>> +447799561438
>>>>>
>>>>> ------------------------------
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sat, 15 Apr 2023 at 13:42, Daniel Collins via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> Why do you want to use a consumer group? If you have consumers in
>>>>>> other jobs, your beam job will fail to receive all messages it should for
>>>>>> the topic.
>>>>>>
>>>>>> > It seems the code attempts to circumvent the partition assignment
>>>>>> mechanism provided by Kafka to use its own.
>>>>>>
>>>>>> All beam I/Os for partitioned sources do this. They use access to the
>>>>>> partitioning structure of the underlying system to track their progress
>>>>>> through each partition and provide feedback for scaling, as well as
>>>>>> tracking and enforcing exactly once processing semantics. In fact, most
>>>>>> interops with streaming data processing systems do this, you can see the
>>>>>> documentation of the flink kafka interop (
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#behind-the-scene)
>>>>>> that it does not use or respect the partition assignments.
>>>>>>
>>>>>> > By doing that it prevents the user from using consumer groups.
>>>>>>
>>>>>> Again, why do you (as a user) want to use consumer groups? What value
>>>>>> does it provide you?
>>>>>>
>>>>>> -Daniel
>>>>>>
>>>>>> On Sat, Apr 15, 2023 at 4:50 AM Shahar Frank <sr...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> Posting here as suggested here
>>>>>>> <https://github.com/apache/beam/issues/25978#issuecomment-1508530483>
>>>>>>> .
>>>>>>>
>>>>>>> I'm using KafkaIO to consume events from a Kafka topic.
>>>>>>> I've added "group.id" to the consumer properties.
>>>>>>> When running the pipeline I can see this value sent to Kafka in the
>>>>>>> consumer properties.
>>>>>>> The consumers created by KafkaIO fail to join the consumer group.
>>>>>>> Looking into the code I can see that nowhere is the consumer
>>>>>>> "subscribing" to the topic which is how KafkaConsumer should join a
>>>>>>> consumer group. It seems the code attempts to circumvent the partition
>>>>>>> assignment mechanism provided by Kafka to use its own.
>>>>>>> By doing that it prevents the user from using consumer groups.
>>>>>>> Is that by intention? Is there any reason why the decision to avoid
>>>>>>> using consumer groups has been taken?
>>>>>>> I would love to see any documentation about that if possible please.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Shahar.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>

Re: KafkaIO does not make use of Kafka Consumer Groups [kafka] [java] [io]

Posted by Shahar Frank <sr...@gmail.com>.
Thanks Kenn,

I agree with your comments fully.
I would rather use a Flink cluster even with the simple use case we have
now.

Sadly sometimes we have other constraints - especially in larger
corporations like the one I work for - which make it harder to create and
maintain these without a real reason.
My intention is to introduce Apache Beam now in order to allow for future
growth opportunities such as you described, but to do that I have to
convince others that it is the best thing to do right now.

It so happens that for the use case we are facing now perhaps Apache Beam
is too much and it looks like Kafka Streams would allow us to avoid having
to maintain another infrastructure cluster such as Flink.

I would prefer to be able to propose a way we can use Beam right now
without the added overhead of a Flink/Spark cluster so that I can convince
the teams that it is a viable option right now.
The alternative of switching to Beam once more requirements arrive would be
much less preferable as this is likely to never gather enough momentum for
a full refactoring.

Finally, I feel the question that really needs to be asked is not why Beam
should recommend using a distributed processing framework (which totally
makes sense), and not even why it may require one in some use cases.

The real question, I feel, is why there should not be an option in Beam to
use a best practice recommended (by another Apache project, in fact) when
connecting to an external system.
What already happens when connecting to SQS and PubSub should, I think,
also be implemented for Kafka.
If we think the existing connector has advantages in some use cases then by
all means it should still exist as an option; however, I feel the user
should be allowed to use a mechanism that is part of the system being
connected, if willing to accept the implications it has.

And then the real question behind that would be - is there anything
preventing Beam from using Apache Kafka Consumer Groups in KafkaIO?
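
To make the distinction concrete, here is a rough sketch of the two
KafkaConsumer modes in question - subscribe(), which joins a consumer group
and lets the broker assign partitions, versus assign(), which pins
partitions manually, roughly what KafkaIO does today. Topic and group names
are made up:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SubscribeVsAssign {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());

        // Joining the group: the group coordinator assigns partitions across
        // all consumers that subscribe() with the same group.id.
        KafkaConsumer<String, String> member = new KafkaConsumer<>(props);
        member.subscribe(Collections.singletonList("my-topic"));

        // Manual assignment: the consumer pins partitions itself and never
        // joins the group - no rebalancing, no group membership.
        KafkaConsumer<String, String> manual = new KafkaConsumer<>(props);
        manual.assign(Arrays.asList(new TopicPartition("my-topic", 0)));
      }
    }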

Cheers,
Shahar.

------------------------------

Shahar Frank

srfrnk@gmail.com

+447799561438

------------------------------





On Mon, 17 Apr 2023 at 19:15, Kenneth Knowles <ke...@apache.org> wrote:

> Interesting discussion! I don't have a lot of expertise in some of these
> details but I wanted to just add one little comment.
>
> On Sat, Apr 15, 2023 at 10:40 PM Shahar Frank <sr...@gmail.com> wrote:
>
>> Regarding horizontal scaling I may have not been clear about what I mean.
>> With Kafka Streams I can just run the same app (e.g. with a K8s
>> Deployment) on as many pods as I need and with Kafka Consumer Groups managing
>> the distribution of data all would work well.
>> Moreover I can create more pods than the number of partitions and keep
>> them idle so that if/when some crash others pick up the slack quicker. And
>> this would all be managed by the Kafka consumer group coordinator.
>> If I do the same with an Apache Beam application - for example with a
>> Direct Runner or a Flink Runner running in "local" mode - each instance
>> will consume the entire topic as it is unaware of the other instances. To
>> work I will be required to use something like a Flink runner with a full
>> fledged Flink cluster. This is a disadvantage for beam in simpler use cases
>> where maintaining such an additional cluster is not required for the actual
>> functionality (e.g. just for filtering) and incurs costs not everyone is
>> willing to accept.
>>
>
> Horizontal scaling is built into Beam. It all occurs within a single
> pipeline. You should not try to scale up by running multiple pipelines
> consuming from the same consumer group. Running your Beam pipeline on a
> Flink cluster (or any other distributed runner) is the intended way to
> achieve horizontal scaling. Beam has a very sophisticated (perhaps "the
> most" sophisticated) sharding and work balancing model, for distributing
> shards of work across workers. So it is by design that Beam is aware of the
> sharding, but also does its own thing above and beyond. It is true that if
> you have a very simple use case then Beam could be more than you need. Beam
> is very general purpose. Of course, simple uses grow into more complex uses
> :-) and you might end up stuck and/or porting to a distributed processing
> engine after all. It all depends on what you are trying to do and what you
> might do in the future!
>
> Kenn
>
>
>
>>
>> Kafka Streams would not run on a Kafka cluster that is true - I was not
>> saying it would. It might be run on a K8s cluster which is also how I might
>> be running Kafka itself and any Flink/Spark cluster I might need. While Flink
>> can run in a standalone as well as cluster mode... as I said earlier in
>> this use case when Flink is used in standalone mode the use case fails -
>> which is my issue, as the same will work with Kafka Streams.
>>
>> Kafka Streams can only connect to Kafka - exactly why I would prefer to
>> use Beam but to do that I need to show that we have no disadvantages for
>> the initial use case.
>> I'm also very aware of the other benefits of Beam - being runner
>> agnostic, language agnostic, source and sink agnostic etc. which is why I
>> would very much like to use it right now.
>> Sadly right now if we are unable to scale horizontally without
>> maintaining another cluster - i.e. Flink or Spark - this is a major
>> disadvantage which might drive us to use Kafka Streams instead.
>>
>> I have opened a bug in Github
>> <https://github.com/apache/beam/issues/25978> for the issue. I can edit
>> that to be more specific to documentation however I feel this would be a
>> huge miss for the project.
>> Also happy to open the same in JIRA if you can point me to where in JIRA
>> I can do that.
>> I couldn't find that on the contact-us
>> <https://beam.apache.org/community/contact-us/> page - it actually
>> points you to create an issue in Github which is what I did.
>>
>> Finally I've been looking into other stream consumers in Beam IO.
>>
>> I might be missing something, but I think SqsIO does not do the same -
>> rather, it expects SQS to manage data distribution between consumers:
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L660
>>
>> I think the same goes for PubsubIO: I can't see where it tries to control
>> which part of the stream is handled by which consumer; it lets the Pubsub
>> service do that instead:
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L726
>>
>> I would ask: why should Kafka not be considered a "streaming service"
>> just like SQS and Pubsub, and be allowed to manage data distribution just
>> as they do?
>>
>> Cheers,
>> Shahar.
>>
>> ------------------------------
>>
>> Shahar Frank
>>
>> srfrnk@gmail.com
>>
>> +447799561438
>>
>> ------------------------------
>>
>>
>>
>>
>>
>>
>> On Sat, 15 Apr 2023 at 21:13, Daniel Collins <dp...@google.com>
>> wrote:
>>
>>> > the recommended best practice for consuming from Kafka
>>>
>>> This is when using the consumer API directly - when using a framework
>>> like beam or even kafka streams (which actually doesn't use the consumer
>>> group assignment mechanism, see here
>>> https://docs.confluent.io/platform/current/streams/architecture.html#stream-partitions-and-tasks)
>>> you should rely on the framework's partition to task assignment mechanism.
>>>
>>> > all observability tools around Kafka use consumer group lag which is
>>> going to be unavailable if consumer groups are not being used
>>>
>>> This is a valid feature request: reflecting the underlying progress in
>>> offset commits. Filing a feature request on JIRA would be the best way to
>>> get this prioritized.
>>>
>>> > Kafka Streams allows me to horizontally scale by adding as many
>>> instances of my application as I need and relies on Kafka to manage
>>> distribution by using consumer groups.
>>>
>>> No it doesn't, see above. Beam allows you to scale out horizontally as
>>> well, but neither allows you to scale the source horizontally beyond the
>>> number of partitions on the topic.
>>>
>>> > I'm required to maintain another distributed processing cluster like
>>> Spark or Flink (on top of a Kafka cluster I already have)
>>>
>>> Kafka streams does not run on your kafka cluster. It is a separate
>>> runtime that you need to turn up and run jobs for separately, same as
>>> flink. The only difference is that, effectively, you can only run kafka
>>> streams in application mode while flink can also be run in session/cluster
>>> mode.
>>>
>>> The main downside of Kafka streams is that it can only be used talking
>>> with kafka. If you ever want to read batch data or from/to another
>>> streaming system you cannot reuse your existing code/architecture and need
>>> to rewrite everything. One advantage of decoupled frameworks like beam (or
>>> flink or spark) is that the same pipeline and code can be reused for
>>> various data sources and sinks, and they come with a library of prebuilt
>>> ones.
>>>
>>> > documentation
>>>
>>> If the documentation misleadingly suggests that you can set group.id,
>>> and doing so does not make upstream offset commits to allow you to access
>>> metrics, please file a bug on JIRA so either the documentation or
>>> implementation can be corrected.
>>>
>>> -Daniel
>>>
>>> On Sat, Apr 15, 2023 at 1:37 PM Shahar Frank <sr...@gmail.com> wrote:
>>>
>>>> Thanks for the information Daniel,
>>>>
>>>> For one - Kafka Consumer Groups is the recommended best practice for
>>>> consuming from Kafka AFAIK, and I would prefer to be able to use that.
>>>> Also all observability tools around Kafka use consumer group lag which
>>>> is going to be unavailable if consumer groups are not being used.
>>>>
>>>> Finally in my use case I'm asked to evaluate using Apache Beam vs.
>>>> Kafka Streams.
>>>> Kafka Streams allows me to horizontally scale by adding as many
>>>> instances of my application as I need and relies on Kafka to manage
>>>> distribution by using consumer groups.
>>>> With Apache Beam I'm required to maintain another distributed
>>>> processing cluster like Spark or Flink (on top of a Kafka cluster I already
>>>> have) to be able to do the same.
>>>> To be clear in this use case there is no need for an additional cluster
>>>> except for consumer groups not being used.
>>>> This constitutes a disadvantage over Kafka Streams and other solutions
>>>> that use consumer groups.
>>>>
>>>> Furthermore if this use case is not supported I would imagine the
>>>> documentation would mention that or at least not imply the contrary.
>>>> In the latest version of the documentation for KafkaIO
>>>> <https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html> it
>>>> shows an example of configuring a consumer group, while in fact this
>>>> setting will not do anything of the sort:
>>>> [image: image.png]
>>>> And:
>>>> [image: image.png]
>>>>
>>>> It seems like this has already been raised in the past - e.g. here
>>>> <https://stackoverflow.com/questions/63001274/apache-beam-kafkaio-consumers-in-consumer-group-getting-assigned-unique-group-id> -
>>>> so I'm probably not the first person to be confused about that.
>>>>
>>>> Cheers,
>>>> Shahar.
>>>>
>>>> ------------------------------
>>>>
>>>> Shahar Frank
>>>>
>>>> srfrnk@gmail.com
>>>>
>>>> +447799561438
>>>>
>>>> ------------------------------
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, 15 Apr 2023 at 13:42, Daniel Collins via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> Why do you want to use a consumer group? If you have consumers in
>>>>> other jobs, your beam job will fail to receive all messages it should for
>>>>> the topic.
>>>>>
>>>>> > It seems the code attempts to circumvent the partition assignment
>>>>> mechanism provided by Kafka to use its own.
>>>>>
>>>>> All beam I/Os for partitioned sources do this. They use access to the
>>>>> partitioning structure of the underlying system to track their progress
>>>>> through each partition and provide feedback for scaling, as well as
>>>>> tracking and enforcing exactly once processing semantics. In fact, most
>>>>> interops with streaming data processing systems do this, you can see the
>>>>> documentation of the flink kafka interop (
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#behind-the-scene)
>>>>> that it does not use or respect the partition assignments.
>>>>>
>>>>> > By doing that it prevents the user from using consumer groups.
>>>>>
>>>>> Again, why do you (as a user) want to use consumer groups? What value
>>>>> does it provide you?
>>>>>
>>>>> -Daniel
>>>>>
>>>>> On Sat, Apr 15, 2023 at 4:50 AM Shahar Frank <sr...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> Posting here as suggested here
>>>>>> <https://github.com/apache/beam/issues/25978#issuecomment-1508530483>
>>>>>> .
>>>>>>
>>>>>> I'm using KafkaIO to consume events from a Kafka topic.
>>>>>> I've added "group.id" to the consumer properties.
>>>>>> When running the pipeline I can see this value sent to Kafka in the
>>>>>> consumer properties.
>>>>>> The consumers created by KafkaIO fail to join the consumer group.
>>>>>> Looking into the code I can see that nowhere is the consumer
>>>>>> "subscribing" to the topic which is how KafkaConsumer should join a
>>>>>> consumer group. It seems the code attempts to circumvent the partition
>>>>>> assignment mechanism provided by Kafka to use its own.
>>>>>> By doing that it prevents the user from using consumer groups.
>>>>>> Is that by intention? Is there any reason why the decision to avoid
>>>>>> using consumer groups has been taken?
>>>>>> I would love to see any documentation about that if possible please.
>>>>>>
>>>>>> Cheers,
>>>>>> Shahar.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>

Re: KafkaIO does not make use of Kafka Consumer Groups [kafka] [java] [io]

Posted by Kenneth Knowles <ke...@apache.org>.
Interesting discussion! I don't have a lot of expertise in some of these
details but I wanted to just add one little comment.

On Sat, Apr 15, 2023 at 10:40 PM Shahar Frank <sr...@gmail.com> wrote:

> Regarding horizontal scaling I may have not been clear about what I mean.
> With Kafka Streams I can just run the same app (e.g. with a K8s
> Deployment) on as many pods as I need and with Kafka Consumer Groups managing
> the distribution of data all would work well.
> Moreover I can create more pods than the number of partitions and keep
> them idle so that if/when some crash others pick up the slack quicker. And
> this would all be managed by the Kafka consumer group coordinator.
> If I do the same with an Apache Beam application - for example with a
> Direct Runner or a Flink Runner running in "local" mode - each instance
> will consume the entire topic as it is unaware of the other instances. To
> work I will be required to use something like a Flink runner with a full
> fledged Flink cluster. This is a disadvantage for beam in simpler use cases
> where maintaining such an additional cluster is not required for the actual
> functionality (e.g. just for filtering) and incurs costs not everyone is
> willing to accept.
>

Horizontal scaling is built into Beam. It all occurs within a single
pipeline. You should not try to scale up by running multiple pipelines
consuming from the same consumer group. Running your Beam pipeline on a
Flink cluster (or any other distributed runner) is the intended way to
achieve horizontal scaling. Beam has a very sophisticated (perhaps "the
most" sophisticated) sharding and work balancing model, for distributing
shards of work across workers. So it is by design that Beam is aware of the
sharding, but also does its own thing above and beyond. It is true that if
you have a very simple use case then Beam could be more than you need. Beam
is very general purpose. Of course, simple uses grow into more complex uses
:-) and you might end up stuck and/or porting to a distributed processing
engine after all. It all depends on what you are trying to do and what you
might do in the future!

Kenn



>
> Kafka Streams would not run on a Kafka cluster that is true - I was not
> saying it would. It might be run on a K8s cluster which is also how I might
> be running K8s itself and any Flink/Spark cluster I might need. While Flink
> can run in a standalone as well as cluster mode... as I said earlier in
> this use case when Flink is used in standalone mode the use case fails -
> which is my issue, as the same will work with Kafka Streams.
>
> Kafka Streams can only connect to Kafka - exactly why I would prefer to
> use Beam but to do that I need to show that we have no disadvantages for
> the initial use case.
> I'm also very aware of the other benefits of Beam - being runner agnostic,
> language agnostic, source and sink agnostic etc. which is why I would very
> much like to use it right now.
> Sadly right now if we are unable to scale horizontally without maintaining
> another cluster - i.e. Flink or Spark - this is a major disadvantage which
> might drive us to use Kafka Streams instead.
>
> I have opened a bug in Github
> <https://github.com/apache/beam/issues/25978> for the issue. I can edit
> that to be more specific to documentation however I feel this would be a
> huge miss for the project.
> Also happy to open the same in JIRA if you can point me to where in JIRA I
> can do that.
> I couldn't find that on the contact-us
> <https://beam.apache.org/community/contact-us/> page - it actually points
> you to create an issue in Github which is what I did.
>
> Finally I've been looking into other stream consumers in Beam IO.
>
> I might be missing something, but I think SqsIO does not do the same -
> rather, it expects SQS to manage data distribution between consumers:
>
> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L660
>
> I think the same goes for PubsubIO: I can't see where it tries to control
> which part of the stream is handled by which consumer; it lets the Pubsub
> service do that instead:
>
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L726
>
> I would ask: why should Kafka not be considered a "streaming service"
> just like SQS and Pubsub, and be allowed to manage data distribution just
> as they do?
>
> Cheers,
> Shahar.
>
> ------------------------------
>
> Shahar Frank
>
> srfrnk@gmail.com
>
> +447799561438
>
> ------------------------------
>
>
>
>
>
>
> On Sat, 15 Apr 2023 at 21:13, Daniel Collins <dp...@google.com> wrote:
>
>> > the recommended best practice for consuming from Kafka
>>
>> This is when using the consumer API directly - when using a framework like
>> beam or even kafka streams (which actually doesn't use the consumer group
>> assignment mechanism, see here
>> https://docs.confluent.io/platform/current/streams/architecture.html#stream-partitions-and-tasks)
>> you should rely on the framework's partition to task assignment mechanism.
>>
>> > all observability tools around Kafka use consumer group lag which is
>> going to be unavailable if consumer groups are not being used
>>
>> This is a valid feature request: reflecting the underlying progress in
>> offset commits. Filing a feature request on JIRA would be the best way to
>> get this prioritized.
>>
>> > Kafka Streams allows me to horizontally scale by adding as many
>> instances of my application as I need and relies on Kafka to manage
>> distribution by using consumer groups.
>>
>> No it doesn't, see above. Beam allows you to scale out horizontally as
>> well, but neither allows you to scale the source horizontally beyond the
>> number of partitions on the topic.
>>
>> > I'm required to maintain another distributed processing cluster like
>> Spark or Flink (on top of a Kafka cluster I already have)
>>
>> Kafka streams does not run on your kafka cluster. It is a separate
>> runtime that you need to turn up and run jobs for separately, same as
>> flink. The only difference is that, effectively, you can only run kafka
>> streams in application mode while flink can also be run in session/cluster
>> mode.
>>
>> The main downside of Kafka streams is that it can only be used talking
>> with kafka. If you ever want to read batch data or from/to another
>> streaming system you cannot reuse your existing code/architecture and need
>> to rewrite everything. One advantage of decoupled frameworks like beam (or
>> flink or spark) is that the same pipeline and code can be reused for
>> various data sources and sinks, and they come with a library of prebuilt
>> ones.
>>
>> > documentation
>>
>> If the documentation misleadingly suggests that you can set group.id,
>> and doing so does not make upstream offset commits to allow you to access
>> metrics, please file a bug on JIRA so either the documentation or
>> implementation can be corrected.
>>
>> -Daniel
>>
>> On Sat, Apr 15, 2023 at 1:37 PM Shahar Frank <sr...@gmail.com> wrote:
>>
>>> Thanks for the information Daniel,
>>>
>>> For one - Kafka Consumer Groups is the recommended best practice for
>>> consuming from Kafka AFAIK, and I would prefer to be able to use that.
>>> Also all observability tools around Kafka use consumer group lag which
>>> is going to be unavailable if consumer groups are not being used.
>>>
>>> Finally in my use case I'm asked to evaluate using Apache Beam vs. Kafka
>>> Streams.
>>> Kafka Streams allows me to horizontally scale by adding as many
>>> instances of my application as I need and relies on Kafka to manage
>>> distribution by using consumer groups.
>>> With Apache Beam I'm required to maintain another distributed processing
>>> cluster like Spark or Flink (on top of a Kafka cluster I already have) to
>>> be able to do the same.
>>> To be clear in this use case there is no need for an additional cluster
>>> except for consumer groups not being used.
>>> This constitutes a disadvantage over Kafka Streams and other solutions
>>> that use consumer groups.
>>>
>>> Furthermore if this use case is not supported I would imagine the
>>> documentation would mention that or at least not imply the contrary.
>>> In the latest version of the documentation for KafkaIO
>>> <https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html> it
>>> shows an example of configuring a consumer group, while in fact this
>>> setting will not do anything of the sort:
>>> [image: image.png]
>>> And:
>>> [image: image.png]
>>>
>>> It seems like this has already been raised in the past - e.g. here
>>> <https://stackoverflow.com/questions/63001274/apache-beam-kafkaio-consumers-in-consumer-group-getting-assigned-unique-group-id> -
>>> so I'm probably not the first person to be confused about that.
>>>
>>> Cheers,
>>> Shahar.
>>>
>>> ------------------------------
>>>
>>> Shahar Frank
>>>
>>> srfrnk@gmail.com
>>>
>>> +447799561438
>>>
>>> ------------------------------
>>>
>>>
>>>
>>>
>>>
>>> On Sat, 15 Apr 2023 at 13:42, Daniel Collins via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> Why do you want to use a consumer group? If you have consumers in other
>>>> jobs, your beam job will fail to receive all messages it should for the
>>>> topic.
>>>>
>>>> > It seems the code attempts to circumvent the partition assignment
>>>> mechanism provided by Kafka to use its own.
>>>>
>>>> All beam I/Os for partitioned sources do this. They use access to the
>>>> partitioning structure of the underlying system to track their progress
>>>> through each partition and provide feedback for scaling, as well as
>>>> tracking and enforcing exactly once processing semantics. In fact, most
>>>> interops with streaming data processing systems do this, you can see the
>>>> documentation of the flink kafka interop (
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#behind-the-scene)
>>>> that it does not use or respect the partition assignments.
>>>>
>>>> > By doing that it prevents the user from using consumer groups.
>>>>
>>>> Again, why do you (as a user) want to use consumer groups? What value
>>>> does it provide you?
>>>>
>>>> -Daniel
>>>>
>>>> On Sat, Apr 15, 2023 at 4:50 AM Shahar Frank <sr...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> Posting here as suggested here
>>>>> <https://github.com/apache/beam/issues/25978#issuecomment-1508530483>.
>>>>>
>>>>> I'm using KafkaIO to consume events from a Kafka topic.
>>>>> I've added "group.id" to the consumer properties.
>>>>> When running the pipeline I can see this value sent to Kafka in the
>>>>> consumer properties.
>>>>> The consumers created by KafkaIO fail to join the consumer group.
>>>>> Looking into the code I can see that nowhere is the consumer
>>>>> "subscribing" to the topic which is how KafkaConsumer should join a
>>>>> consumer group. It seems the code attempts to circumvent the partition
>>>>> assignment mechanism provided by Kafka to use its own.
>>>>> By doing that it prevents the user from using consumer groups.
>>>>> Is that by intention? Is there any reason why the decision to avoid
>>>>> using consumer groups has been taken?
>>>>> I would love to see any documentation about that if possible please.
>>>>>
>>>>> Cheers,
>>>>> Shahar.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>

Re: KafkaIO does not make use of Kafka Consumer Groups [kafka] [java] [io]

Posted by Shahar Frank <sr...@gmail.com>.
Thanks Daniel,

Not sure where the linked article says Kafka Streams doesn't use consumer
groups.
If you run the Kafka Streams demo here
<https://github.com/srfrnk/demo/blob/cg-reader/kstreams/src/main/java/kstreams/App.java>
you
could actually see it using a consumer group:
[image: image.png]
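
For reference, the demo boils down to something like this (a sketch from
memory, topic and application names made up) - the application.id is what
surfaces as the consumer group in the screenshot above:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class App {
      public static void main(String[] args) {
        Properties props = new Properties();
        // application.id doubles as the group.id of the underlying consumers.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
            Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("my-topic")
            .foreach((key, value) -> System.out.println(key + " -> " + value));

        new KafkaStreams(builder.build(), props).start();
      }
    }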

I can file a request for the observability issue, however I can't see how a
solution would be possible without using a consumer group, which is the
mechanism recommended within Kafka for that purpose (i.e. running multiple
consumers from the same application to consume a topic in a
distributed fashion). All Kafka observability tools monitor
consumer groups for that purpose.
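
To illustrate what those tools do under the hood, here is a rough lag check
using the Kafka AdminClient (group name made up) - none of this works if
nothing ever commits offsets under a group:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class LagCheck {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
          // Offsets committed by the group - empty if no one joins/commits.
          Map<TopicPartition, OffsetAndMetadata> committed =
              admin.listConsumerGroupOffsets("my-group")
                  .partitionsToOffsetAndMetadata().get();

          // Log-end offsets for the same partitions.
          Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
          committed.keySet().forEach(tp -> latest.put(tp, OffsetSpec.latest()));
          Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> end =
              admin.listOffsets(latest).all().get();

          committed.forEach((tp, meta) -> System.out.printf(
              "%s lag=%d%n", tp, end.get(tp).offset() - meta.offset()));
        }
      }
    }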

Regarding horizontal scaling I may have not been clear about what I mean.
With Kafka Streams I can just run the same app (e.g. with a K8s Deployment)
on as many pods as I need and with Kafka Consumer Groups managing the
distribution of data all would work well.
Moreover I can create more pods than the number of partitions and keep them
idle so that if/when some crash others pick up the slack quicker. And this
would all be managed by the Kafka consumer group coordinator.
If I do the same with an Apache Beam application - for example with a
Direct Runner or a Flink Runner running in "local" mode - each instance
will consume the entire topic as it is unaware of the other instances. To
work I will be required to use something like a Flink runner with a full
fledged Flink cluster. This is a disadvantage for beam in simpler use cases
where maintaining such an additional cluster is not required for the actual
functionality (e.g. just for filtering) and incurs costs not everyone is
willing to accept.

Kafka Streams would not run on a Kafka cluster that is true - I was not
saying it would. It might be run on a K8s cluster which is also how I might
be running Kafka itself and any Flink/Spark cluster I might need. While Flink
can run in a standalone as well as cluster mode... as I said earlier in
this use case when Flink is used in standalone mode the use case fails -
which is my issue, as the same will work with Kafka Streams.

Kafka Streams can only connect to Kafka - exactly why I would prefer to use
Beam but to do that I need to show that we have no disadvantages for the
initial use case.
I'm also very aware of the other benefits of Beam - being runner agnostic,
language agnostic, source and sink agnostic etc. which is why I would very
much like to use it right now.
Sadly right now if we are unable to scale horizontally without maintaining
another cluster - i.e. Flink or Spark - this is a major disadvantage which
might drive us to use Kafka Streams instead.

I have opened a bug in Github <https://github.com/apache/beam/issues/25978> for
the issue. I can edit that to be more specific to documentation however I
feel this would be a huge miss for the project.
Also happy to open the same in JIRA if you can point me to where in JIRA I
can do that.
I couldn't find that on the contact-us
<https://beam.apache.org/community/contact-us/> page - it actually points
you to create an issue in Github which is what I did.

Finally I've been looking into other stream consumers in Beam IO.

I might be missing something, but I think SqsIO does not do the same -
rather, it expects SQS to manage data distribution between consumers:
https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L660

I think the same goes for PubsubIO: I can't see where it tries to control
which part of the stream is handled by which consumer; it lets the Pubsub
service do that instead:
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L726

I would ask: why should Kafka not be considered a "streaming service"
just like SQS and Pubsub, and be allowed to manage data distribution just
as they do?
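
That is the model I mean - with Pubsub, for instance, every worker pulls
from the same subscription and the service decides who gets which messages.
A sketch with made-up names:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class PubsubExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        // No partition bookkeeping in the pipeline; the service distributes
        // messages across however many workers happen to be pulling.
        p.apply(PubsubIO.readStrings()
            .fromSubscription("projects/my-project/subscriptions/my-sub"));
        p.run();
      }
    }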

Cheers,
Shahar.

------------------------------

Shahar Frank

srfrnk@gmail.com

+447799561438

------------------------------






On Sat, 15 Apr 2023 at 21:13, Daniel Collins <dp...@google.com> wrote:

> > the recommended best practice for consuming from Kafka
>
> This is when using the consumer API directly - when using a framework like
> beam or even kafka streams (which actually doesn't use the consumer group
> assignment mechanism, see here
> https://docs.confluent.io/platform/current/streams/architecture.html#stream-partitions-and-tasks)
> you should rely on the framework's partition to task assignment mechanism.
>
> > all observability tools around Kafka use consumer group lag which is
> going to be unavailable if consumer groups are not being used
>
> This is a valid feature request: reflecting the underlying progress in
> offset commits. Filing a feature request on JIRA would be the best way to
> get this prioritized.
>
> > Kafka Streams allows me to horizontally scale by adding as many
> instances of my application as I need and relies on Kafka to manage
> distribution by using consumer groups.
>
> No it doesn't, see above. Beam allows you to scale out horizontally as
> well, but neither allows you to scale the source horizontally beyond the
> number of partitions on the topic.
>
> > I'm required to maintain another distributed processing cluster like
> Spark or Flink (on top of a Kafka cluster I already have)
>
> Kafka streams does not run on your kafka cluster. It is a separate runtime
> that you need to turn up and run jobs for separately, same as flink. The
> only difference is that, effectively, you can only run kafka streams in
> application mode while flink can also be run in session/cluster mode.
>
> The main downside of Kafka streams is that it can only be used talking
> with kafka. If you ever want to read batch data or from/to another
> streaming system you cannot reuse your existing code/architecture and need
> to rewrite everything. One advantage of decoupled frameworks like beam (or
> flink or spark) is that the same pipeline and code can be reused for
> various data sources and sinks, and they come with a library of prebuilt
> ones.
>
> > documentation
>
> If the documentation misleadingly suggests that you can set group.id, and
> doing so does not make upstream offset commits to allow you to access
> metrics, please file a bug on JIRA so either the documentation or
> implementation can be corrected.
>
> -Daniel
>
> On Sat, Apr 15, 2023 at 1:37 PM Shahar Frank <sr...@gmail.com> wrote:
>
>> Thanks for the information Daniel,
>>
>> For one - Kafka Consumer Groups is the recommended best practice for
>> consuming from Kafka AFAIK, and I would prefer to be able to use that.
>> Also all observability tools around Kafka use consumer group lag which is
>> going to be unavailable if consumer groups are not being used.
>>
>> Finally in my use case I'm asked to evaluate using Apache Beam vs. Kafka
>> Streams.
>> Kafka Streams allows me to horizontally scale by adding as many instances
>> of my application as I need and relies on Kafka to manage distribution by
>> using consumer groups.
>> With Apache Beam I'm required to maintain another distributed processing
>> cluster like Spark or Flink (on top of a Kafka cluster I already have) to
>> be able to do the same.
>> To be clear in this use case there is no need for an additional cluster
>> except for consumer groups not being used.
>> This constitutes a disadvantage over Kafka Streams and other solutions
>> that use consumer groups.
>>
>> Furthermore if this use case is not supported I would imagine the
>> documentation would mention that or at least not imply the contrary.
>> In the latest version of the documentation for KafkaIO
>> <https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html> it
>> shows an example of configuring a consumer group, while in fact this
>> setting will not do anything of the sort:
>> [image: image.png]
>> And:
>> [image: image.png]
>>
>> It seems like this has already been raised in the past - e.g. here
>> <https://stackoverflow.com/questions/63001274/apache-beam-kafkaio-consumers-in-consumer-group-getting-assigned-unique-group-id> -
>> so I'm probably not the first person to be confused about that.
>>
>> Cheers,
>> Shahar.
>>
>> ------------------------------
>>
>> Shahar Frank
>>
>> srfrnk@gmail.com
>>
>> +447799561438
>>
>> ------------------------------
>>
>>
>>
>>
>>
>> On Sat, 15 Apr 2023 at 13:42, Daniel Collins via dev <de...@beam.apache.org>
>> wrote:
>>
>>> Why do you want to use a consumer group? If you have consumers in other
>>> jobs, your beam job will fail to receive all messages it should for the
>>> topic.
>>>
>>> > It seems the code attempts to circumvent the partition assignment
>>> mechanism provided by Kafka to use its own.
>>>
>>> All beam I/Os for partitioned sources do this. They use access to the
>>> partitioning structure of the underlying system to track their progress
>>> through each partition and provide feedback for scaling, as well as
>>> tracking and enforcing exactly once processing semantics. In fact, most
>>> interops with streaming data processing systems do this, you can see the
>>> documentation of the flink kafka interop (
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#behind-the-scene)
>>> that it does not use or respect the partition assignments.
>>>
>>> > By doing that it prevents the user from using consumer groups.
>>>
>>> Again, why do you (as a user) want to use consumer groups? What value
>>> does it provide you?
>>>
>>> -Daniel
>>>
>>> On Sat, Apr 15, 2023 at 4:50 AM Shahar Frank <sr...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Posting here as suggested here
>>>> <https://github.com/apache/beam/issues/25978#issuecomment-1508530483>.
>>>>
>>>> I'm using KafkaIO to consume events from a Kafka topic.
>>>> I've added "group.id" to the consumer properties.
>>>> When running the pipeline I can see this value sent to Kafka in the
>>>> consumer properties.
>>>> The consumers created by KafkaIO fail to join the consumer group.
>>>> Looking into the code I can see that nowhere is the consumer
>>>> "subscribing" to the topic which is how KafkaConsumer should join a
>>>> consumer group. It seems the code attempts to circumvent the partition
>>>> assignment mechanism provided by Kafka to use its own.
>>>> By doing that it prevents the user from using consumer groups.
>>>> Is that by intention? Is there any reason why the decision to avoid
>>>> using consumer groups has been taken?
>>>> I would love to see any documentation about that if possible please.
>>>>
>>>> Cheers,
>>>> Shahar.
>>>>
>>>>
>>>>
>>>>
>>>>

Re: KafkaIO does not make use of Kafka Consumer Groups [kafka] [java] [io]

Posted by Daniel Collins via dev <de...@beam.apache.org>.
> the recommended best practice for consuming from Kafka

This is when using the consumer API directly - when using a framework like
beam or even kafka streams (which actually doesn't use the consumer group
assignment mechanism, see here
https://docs.confluent.io/platform/current/streams/architecture.html#stream-partitions-and-tasks)
you should rely on the framework's partition to task assignment mechanism.

> all observability tools around Kafka use consumer group lag which is
going to be unavailable if consumer groups are not being used

This is a valid feature request: reflecting the underlying progress in
offset commits. Filing a feature request on JIRA would be the best way to
get this prioritized.

> Kafka Streams allows me to horizontally scale by adding as many instances
of my application as I need and relies on Kafka to manage distribution by
using consumer groups.

No it doesn't, see above. Beam allows you to scale out horizontally as
well, but neither allows you to scale the source horizontally beyond the
number of partitions on the topic.

> I'm required to maintain another distributed processing cluster like
Spark or Flink (on top of a Kafka cluster I already have)

Kafka streams does not run on your kafka cluster. It is a separate runtime
that you need to turn up and run jobs for separately, same as flink. The
only difference is that, effectively, you can only run kafka streams in
application mode while flink can also be run in session/cluster mode.

The main downside of Kafka streams is that it can only be used talking with
kafka. If you ever want to read batch data or from/to another streaming
system you cannot reuse your existing code/architecture and need to rewrite
everything. One advantage of decoupled frameworks like beam (or flink or
spark) is that the same pipeline and code can be reused for various data
sources and sinks, and they come with a library of prebuilt ones.
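
To sketch what I mean (names made up): the downstream logic stays the same
whether the source is Kafka or, say, text files - only the read changes.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.Filter;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SwappableSource {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        // Streaming source:
        PCollection<String> lines = p
            .apply(KafkaIO.<String, String>read()
                .withBootstrapServers("broker1:9092")
                .withTopic("my-topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
            .apply(Values.create());

        // Batch alternative - same downstream code either way:
        // PCollection<String> lines =
        //     p.apply(TextIO.read().from("gs://my-bucket/input*.txt"));

        lines.apply(Filter.by((String line) -> line.contains("interesting")));
        p.run();
      }
    }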

> documentation

If the documentation misleadingly suggests that you can set group.id, and
doing so does not make upstream offset commits to allow you to access
metrics, please file a bug on JIRA so either the documentation or
implementation can be corrected.

-Daniel

On Sat, Apr 15, 2023 at 1:37 PM Shahar Frank <sr...@gmail.com> wrote:

> Thanks for the information Daniel,
>
> For one - Kafka Consumer Groups is the recommended best practice for
> consuming from Kafka AFAIK, and I would prefer to be able to use that.
> Also all observability tools around Kafka use consumer group lag which is
> going to be unavailable if consumer groups are not being used.
>
> Finally in my use case I'm asked to evaluate using Apache Beam vs. Kafka
> Streams.
> Kafka Streams allows me to horizontally scale by adding as many instances
> of my application as I need and relies on Kafka to manage distribution by
> using consumer groups.
> With Apache Beam I'm required to maintain another distributed processing
> cluster like Spark or Flink (on top of a Kafka cluster I already have) to
> be able to do the same.
> To be clear in this use case there is no need for an additional cluster
> except for consumer groups not being used.
> This constitutes a disadvantage over Kafka Streams and other solutions
> that use consumer groups.
>
> Furthermore if this use case is not supported I would imagine the
> documentation would mention that or at least not imply the contrary.
> In the latest version of the documentation for KafkaIO
> <https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html> it
> shows an example of configuring a consumer group, while in fact this
> setting will not do anything of the sort:
> [image: image.png]
> And:
> [image: image.png]
>
> It seems like this has already been raised in the past - e.g. here
> <https://stackoverflow.com/questions/63001274/apache-beam-kafkaio-consumers-in-consumer-group-getting-assigned-unique-group-id> -
> so I'm probably not the first person to be confused about that.
>
> Cheers,
> Shahar.
>
> ------------------------------
>
> Shahar Frank
>
> srfrnk@gmail.com
>
> +447799561438
>
> ------------------------------
>
>
>
>
>
> On Sat, 15 Apr 2023 at 13:42, Daniel Collins via dev <de...@beam.apache.org>
> wrote:
>
>> Why do you want to use a consumer group? If you have consumers in other
>> jobs, your beam job will fail to receive all messages it should for the
>> topic.
>>
>> > It seems the code attempts to circumvent the partition assignment
>> mechanism provided by Kafka to use its own.
>>
>> All beam I/Os for partitioned sources do this. They use access to the
>> partitioning structure of the underlying system to track their progress
>> through each partition and provide feedback for scaling, as well as
>> tracking and enforcing exactly once processing semantics. In fact, most
>> interops with streaming data processing systems do this, you can see the
>> documentation of the flink kafka interop (
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#behind-the-scene)
>> that it does not use or respect the partition assignments.
>>
>> > By doing that it prevents the user from using consumer groups.
>>
>> Again, why do you (as a user) want to use consumer groups? What value
>> does it provide you?
>>
>> -Daniel
>>
>> On Sat, Apr 15, 2023 at 4:50 AM Shahar Frank <sr...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> Posting here as suggested here
>>> <https://github.com/apache/beam/issues/25978#issuecomment-1508530483>.
>>>
>>> I'm using KafkaIO to consume events from a Kafka topic.
>>> I've added "group.id" to the consumer properties.
>>> When running the pipeline I can see this value sent to Kafka in the
>>> consumer properties.
>>> The consumers created by KafkaIO fail to join the consumer group.
>>> Looking into the code I can see that nowhere is the consumer
>>> "subscribing" to the topic which is how KafkaConsumer should join a
>>> consumer group. It seems the code attempts to circumvent the partition
>>> assignment mechanism provided by Kafka to use its own.
>>> By doing that it prevents the user from using consumer groups.
>>> Is that by intention? Is there any reason why the decision to avoid
>>> using consumer groups has been taken?
>>> I would love to see any documentation about that if possible please.
>>>
>>> Cheers,
>>> Shahar.
>>>
>>>
>>>
>>>
>>>

Re: KafkaIO does not make use of Kafka Consumer Groups [kafka] [java] [io]

Posted by Shahar Frank <sr...@gmail.com>.
Thanks for the information Daniel,

For one - Kafka Consumer Groups is the recommended best practice for
consuming from Kafka AFAIK, and I would prefer to be able to use that.
Also all observability tools around Kafka use consumer group lag which is
going to be unavailable if consumer groups are not being used.

Finally in my use case I'm asked to evaluate using Apache Beam vs. Kafka
Streams.
Kafka Streams allows me to horizontally scale by adding as many instances
of my application as I need and relies on Kafka to manage distribution by
using consumer groups.
With Apache Beam I'm required to maintain another distributed processing
cluster like Spark or Flink (on top of a Kafka cluster I already have) to
be able to do the same.
To be clear in this use case there is no need for an additional cluster
except for consumer groups not being used.
This constitutes a disadvantage over Kafka Streams and other solutions that
use consumer groups.

Furthermore if this use case is not supported I would imagine the
documentation would mention that or at least not imply the contrary.
In the latest version of the documentation for KafkaIO
<https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html>
it
shows an example of configuring a consumer group, while in fact this
setting will not do anything of the sort:
[image: image.png]
And:
[image: image.png]
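
For reference, the javadoc example is roughly of this shape (a sketch from
memory, broker/topic/group names made up):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import com.google.common.collect.ImmutableMap;

    public class ReadFromKafka {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers("broker1:9092,broker2:9092")
            .withTopic("my-topic")
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Reads exactly like "join consumer group my_beam_app_1", but as
            // far as I can tell the property is only passed through; the
            // consumers never subscribe() as part of that group.
            .withConsumerConfigUpdates(
                ImmutableMap.<String, Object>of("group.id", "my_beam_app_1")));
        p.run();
      }
    }

(Separately, I do see a commitOffsetsInFinalize() option on KafkaIO.Read
which, if I read it correctly, at least commits progress back under that
group.id - but that looks like offset reporting, not group-based
assignment.)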

It seems like this has already been raised in the past - e.g. here
<https://stackoverflow.com/questions/63001274/apache-beam-kafkaio-consumers-in-consumer-group-getting-assigned-unique-group-id>
-
so I'm probably not the first person to be confused about that.

Cheers,
Shahar.

------------------------------

Shahar Frank

srfrnk@gmail.com

+447799561438

------------------------------





On Sat, 15 Apr 2023 at 13:42, Daniel Collins via dev <de...@beam.apache.org>
wrote:

> Why do you want to use a consumer group? If you have consumers in other
> jobs, your beam job will fail to receive all messages it should for the
> topic.
>
> > It seems the code attempts to circumvent the partition assignment
> mechanism provided by Kafka to use its own.
>
> All beam I/Os for partitioned sources do this. They use access to the
> partitioning structure of the underlying system to track their progress
> through each partition and provide feedback for scaling, as well as
> tracking and enforcing exactly once processing semantics. In fact, most
> interops with streaming data processing systems do this, you can see the
> documentation of the flink kafka interop (
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#behind-the-scene)
> that it does not use or respect the partition assignments.
>
> > By doing that it prevents the user from using consumer groups.
>
> Again, why do you (as a user) want to use consumer groups? What value does
> it provide you?
>
> -Daniel
>
> On Sat, Apr 15, 2023 at 4:50 AM Shahar Frank <sr...@gmail.com> wrote:
>
>> Hi All,
>>
>> Posting here as suggested here
>> <https://github.com/apache/beam/issues/25978#issuecomment-1508530483>.
>>
>> I'm using KafkaIO to consume events from a Kafka topic.
>> I've added "group.id" to the consumer properties.
>> When running the pipeline I can see this value sent to Kafka in the
>> consumer properties.
>> The consumers created by KafkaIO fail to join the consumer group.
>> Looking into the code I can see that nowhere is the consumer
>> "subscribing" to the topic which is how KafkaConsumer should join a
>> consumer group. It seems the code attempts to circumvent the partition
>> assignment mechanism provided by Kafka to use its own.
>> By doing that it prevents the user from using consumer groups.
>> Is that by intention? Is there any reason why the decision to avoid using
>> consumer groups has been taken?
>> I would love to see any documentation about that if possible please.
>>
>> Cheers,
>> Shahar.
>>
>>
>>
>>
>>

Re: KafkaIO does not make use of Kafka Consumer Groups [kafka] [java] [io]

Posted by Daniel Collins via dev <de...@beam.apache.org>.
Why do you want to use a consumer group? If you have consumers in other
jobs, your beam job will fail to receive all messages it should for the
topic.

> It seems the code attempts to circumvent the partition assignment
mechanism provided by Kafka to use its own.

All beam I/Os for partitioned sources do this. They use access to the
partitioning structure of the underlying system to track their progress
through each partition and provide feedback for scaling, as well as
tracking and enforcing exactly once processing semantics. In fact, most
interops with streaming data processing systems do this, you can see the
documentation of the flink kafka interop (
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#behind-the-scene)
that it does not use or respect the partition assignments.
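
To make that concrete, KafkaIO even lets you pin the partition set
explicitly rather than subscribing - a sketch with made-up names:

    import java.util.Arrays;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class PinnedPartitions {
      // The source enumerates partitions (or takes an explicit list like
      // this) and tracks per-partition offsets as its own checkpoint state,
      // instead of letting the broker's group coordinator assign partitions.
      static KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
          .withBootstrapServers("broker1:9092")
          .withTopicPartitions(Arrays.asList(
              new TopicPartition("my-topic", 0),
              new TopicPartition("my-topic", 1)))
          .withKeyDeserializer(StringDeserializer.class)
          .withValueDeserializer(StringDeserializer.class);
    }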

> By doing that it prevents the user from using consumer groups.

Again, why do you (as a user) want to use consumer groups? What value does
it provide you?

-Daniel

On Sat, Apr 15, 2023 at 4:50 AM Shahar Frank <sr...@gmail.com> wrote:

> Hi All,
>
> Posting here as suggested here
> <https://github.com/apache/beam/issues/25978#issuecomment-1508530483>.
>
> I'm using KafkaIO to consume events from a Kafka topic.
> I've added "group.id" to the consumer properties.
> When running the pipeline I can see this value sent to Kafka in the
> consumer properties.
> The consumers created by KafkaIO fail to join the consumer group.
> Looking into the code I can see that nowhere is the consumer "subscribing"
> to the topic which is how KafkaConsumer should join a consumer group. It
> seems the code attempts to circumvent the partition assignment mechanism
> provided by Kafka to use its own.
> By doing that it prevents the user from using consumer groups.
> Is that by intention? Is there any reason why the decision to avoid using
> consumer groups has been taken?
> I would love to see any documentation about that if possible please.
>
> Cheers,
> Shahar.
>
>
>
>
>