Posted to dev@beam.apache.org by Boyuan Zhang <bo...@google.com> on 2020/05/29 02:16:49 UTC

[Discuss] Build Kafka read transform on top of SplittableDoFn

Hi team,

I'm Boyuan, currently working on building a Kafka read PTransform on top of
SplittableDoFn[1][2][3]. There are two questions about Kafka usage I want
to discuss with you:

1. Compared to KafkaIO.Read
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>,
the SplittableDoFn Kafka version takes TopicPartition and startReadTime as
elements and processes them at pipeline execution time, instead of
configuring topics at pipeline construction time. I'm wondering whether
there are other configurations we also want to populate at pipeline
execution time rather than at construction time. Taking these
configurations as elements would be valuable when they could differ
between TopicPartitions. For the list of configurations we have now,
please refer to KafkaIO.Read
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>.
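
To make the first point concrete, here is a minimal plain-Java sketch of a read that is driven by elements rather than by construction-time settings. It is not Beam or Kafka code: ElementDrivenReadSketch, ReadRequest and LogRecord are hypothetical names, and an in-memory list stands in for the Kafka log. Each element carries its own topic, partition and startReadTime, so two elements can ask for different start points in the same run.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; none of these types are Beam or Kafka classes.
final class ElementDrivenReadSketch {

  // Hypothetical stand-in for a Kafka record.
  static final class LogRecord {
    final String topic;
    final int partition;
    final Instant timestamp;
    final String value;

    LogRecord(String topic, int partition, Instant timestamp, String value) {
      this.topic = topic;
      this.partition = partition;
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // Hypothetical element type: what to read and from when, known only at execution time.
  static final class ReadRequest {
    final String topic;
    final int partition;
    final Instant startReadTime;

    ReadRequest(String topic, int partition, Instant startReadTime) {
      this.topic = topic;
      this.partition = partition;
      this.startReadTime = startReadTime;
    }
  }

  // Processes one element: returns the values from the requested partition
  // whose timestamp is at or after that element's startReadTime.
  static List<String> read(ReadRequest request, List<LogRecord> log) {
    List<String> out = new ArrayList<>();
    for (LogRecord r : log) {
      boolean samePartition =
          r.topic.equals(request.topic) && r.partition == request.partition;
      if (samePartition && !r.timestamp.isBefore(request.startReadTime)) {
        out.add(r.value);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<LogRecord> log = Arrays.asList(
        new LogRecord("orders", 0, Instant.ofEpochSecond(10), "a"),
        new LogRecord("orders", 0, Instant.ofEpochSecond(20), "b"),
        new LogRecord("orders", 1, Instant.ofEpochSecond(15), "c"));
    // Two elements, each with its own partition and start time.
    System.out.println(read(new ReadRequest("orders", 0, Instant.ofEpochSecond(15)), log));
    System.out.println(read(new ReadRequest("orders", 1, Instant.EPOCH), log));
  }
}
```

Any other setting that could vary per TopicPartition would become another field on the element type in this sketch; settings shared by all partitions can stay at construction time.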

2. I also want to offer a simple way for KafkaIO.Read to expand into the
SDF version of the PTransform. Almost all configurations can be translated
easily from KafkaIO.Read to the SDF read, except a custom
TimestampPolicyFactory. (The built-in default types are easy to translate:
withProcessingTime
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>,
withCreateTime
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>
and withLogAppendTime
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>.)
With SplittableDoFn, we have WatermarkEstimator
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java>
to track the watermark per TopicPartition. Thus, instead of a
TimestampPolicyFactory
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>,
we need the user to provide a function that extracts the output timestamp
from a KafkaRecord (like withTimestampFn
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>).
My questions here are: are the default types enough for current KafkaIO.Read
users? Is a custom TimestampPolicy really common? Is it okay to use the
existing withTimestampFn API
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>
in KafkaIO.Read to accept the custom function and pass it to the SDF read
transform?
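
The combination described in point 2, a user function that extracts a timestamp plus a watermark tracked per TopicPartition, can be sketched in plain Java. This is only an illustration under assumptions: PerPartitionWatermarkSketch and Rec are hypothetical names, the watermark rule (the largest timestamp seen so far, never moving backwards) is an assumed simple estimator, and nothing here implements Beam's WatermarkEstimator interface.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration only; not Beam's WatermarkEstimator API.
final class PerPartitionWatermarkSketch {

  // Hypothetical stand-in for a Kafka record.
  static final class Rec {
    final String topicPartition; // for example "orders-0"
    final long createTimeMillis;

    Rec(String topicPartition, long createTimeMillis) {
      this.topicPartition = topicPartition;
      this.createTimeMillis = createTimeMillis;
    }
  }

  private final Function<Rec, Instant> timestampFn;
  private final Map<String, Instant> watermarks = new HashMap<>();

  PerPartitionWatermarkSketch(Function<Rec, Instant> timestampFn) {
    this.timestampFn = timestampFn;
  }

  // Extracts the output timestamp with the user function and advances the
  // watermark of that record's partition, never moving it backwards.
  Instant observe(Rec record) {
    Instant ts = timestampFn.apply(record);
    watermarks.merge(record.topicPartition, ts, (old, cur) -> cur.isAfter(old) ? cur : old);
    return ts;
  }

  // Returns Instant.MIN for a partition that has produced no records yet.
  Instant watermark(String topicPartition) {
    return watermarks.getOrDefault(topicPartition, Instant.MIN);
  }

  public static void main(String[] args) {
    PerPartitionWatermarkSketch sketch =
        new PerPartitionWatermarkSketch(r -> Instant.ofEpochMilli(r.createTimeMillis));
    sketch.observe(new Rec("orders-0", 100L));
    sketch.observe(new Rec("orders-0", 50L)); // late record: watermark stays put
    sketch.observe(new Rec("orders-1", 70L));
    System.out.println(sketch.watermark("orders-0"));
    System.out.println(sketch.watermark("orders-1"));
  }
}
```

The user only supplies the record-to-timestamp function here; the watermark logic is fixed by the estimator, which is the shape of API the question is asking about.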

Thanks for your help!

[1] https://beam.apache.org/blog/splittable-do-fn/
[2] https://s.apache.org/splittable-do-fn
[3] My prototype PR https://github.com/apache/beam/pull/11749

Re: [Discuss] Build Kafka read transform on top of SplittableDoFn

Posted by Pedro H S Teixeira <pe...@gmail.com>.
Thanks, appreciated the links.

I ended up using `Read.from` with an UnboundedSource because of some
challenges with the actual splitting part of SDF.

Perhaps it's trivial knowledge, but I'm sharing it here anyway: when testing
with unbounded sources in the direct runner, one has to test with more
elements than UnboundedReadEvaluatorFactory.ARBITRARY_MAX_ELEMENTS (10). I was
attempting some local tests with 5 elements and couldn't understand why
getWatermark wasn't called :)

On Mon, Jun 15, 2020 at 12:13 PM Boyuan Zhang <bo...@google.com> wrote:

> Thanks Pablo!
> Hi Pedro, as Pablo mentioned, the core PTransform is ReadViaSDF, and the
> core DoFn is ReadFromKafkaDoFn. We also have some other IOs in SDF:
> HBaseIO
> <https://github.com/apache/beam/blob/52419e93ee9fa8c823eb505c472969fc7849e247/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java#L38>,
> CassandraIO <https://github.com/apache/beam/pull/10546>. Hope this helps
> : )
>
> On Mon, Jun 15, 2020 at 10:05 AM Pablo Estrada <pa...@google.com> wrote:
>
>> Hi Pedro,
>> Boyuan shared her prototype implementation in [1]. If you're coding a
>> SplittableDoFn, I'd guess the relevant piece of code is ReadViaSDF.java
>> Best
>> -P.
>> [1] https://github.com/apache/beam/pull/11749/files
>>
>> On Mon, Jun 15, 2020 at 10:00 AM Pedro H S Teixeira <pe...@gmail.com>
>> wrote:
>>
>>> Hi Boyuan,
>>>
>>> Is the implementation (even if incomplete) open source / available at
>>> this moment?
>>>
>>> Trying to implement here an IO to a custom source here using
>>> SplittableDoFn, and it would be helpful to see more examples :)
>>>
>>> Thanks,
>>> Pedro

Re: [Discuss] Build Kafka read transform on top of SplittableDoFn

Posted by Boyuan Zhang <bo...@google.com>.
Thanks Pablo!
Hi Pedro, as Pablo mentioned, the core PTransform is ReadViaSDF, and the
core DoFn is ReadFromKafkaDoFn. We also have some other IOs built on SDF: HBaseIO
<https://github.com/apache/beam/blob/52419e93ee9fa8c823eb505c472969fc7849e247/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java#L38>
and CassandraIO <https://github.com/apache/beam/pull/10546>. Hope this helps : )

On Mon, Jun 15, 2020 at 10:05 AM Pablo Estrada <pa...@google.com> wrote:

> Hi Pedro,
> Boyuan shared her prototype implementation in [1]. If you're coding a
> SplittableDoFn, I'd guess the relevant piece of code is ReadViaSDF.java
> Best
> -P.
> [1] https://github.com/apache/beam/pull/11749/files
>
> On Mon, Jun 15, 2020 at 10:00 AM Pedro H S Teixeira <pe...@gmail.com>
> wrote:
>
>> Hi Boyuan,
>>
>> Is the implementation (even if incomplete) open source / available at
>> this moment?
>>
>> Trying to implement here an IO to a custom source here using
>> SplittableDoFn, and it would be helpful to see more examples :)
>>
>> Thanks,
>> Pedro

Re: [Discuss] Build Kafka read transform on top of SplittableDoFn

Posted by Pablo Estrada <pa...@google.com>.
Hi Pedro,
Boyuan shared her prototype implementation in [1]. If you're coding a
SplittableDoFn, I'd guess the relevant piece of code is ReadViaSDF.java
Best
-P.
[1] https://github.com/apache/beam/pull/11749/files

On Mon, Jun 15, 2020 at 10:00 AM Pedro H S Teixeira <pe...@gmail.com>
wrote:

> Hi Boyuan,
>
> Is the implementation (even if incomplete) open source / available at this
> moment?
>
> Trying to implement here an IO to a custom source here using
> SplittableDoFn, and it would be helpful to see more examples :)
>
> Thanks,
> Pedro

Re: [Discuss] Build Kafka read transform on top of SplittableDoFn

Posted by Pedro H S Teixeira <pe...@gmail.com>.
Hi Boyuan,

Is the implementation (even if incomplete) open source / available at this moment?

I'm trying to implement an IO for a custom source using SplittableDoFn, and it would be helpful to see more examples :)

Thanks,
Pedro



Re: [Discuss] Build Kafka read transform on top of SplittableDoFn

Posted by Boyuan Zhang <bo...@google.com>.
Re Alexey:

> 1. One of the demanded feature is to discover (and remove probably) new
> partitions and topics in runtime [1].


Yes, that's the major motivation for us to build Kafka read on top of
SplittableDoFn.

> Do you expect any non-compatible API changes by adding SDF Read version or
> it’s going to be just another user API that can be used in parallel with
> old one?


I want to create a ReadFromKafkaViaSDF<input = TopicPartition, output =
KafkaRecord> PTransform. At construction time, this transform requires the
Kafka consumer configuration and a user-defined function that extracts the
output timestamp from a KafkaRecord (or, as I mentioned in the first
message, we can treat the configuration as input if it could differ between
TopicPartitions). From the user's perspective, there are two ways to use
ReadFromKafkaViaSDF:
1. I'll add a new API, useSDFTransform(), to KafkaIO.Read. When this flag is
set, ReadFromKafkaViaSDF will be added to the pipeline. Here I run into the
problem of translating a custom TimestampPolicy into the user-defined
function, so I want to ask whether it's feasible to add another new API,
extractTimestampFn (or reuse withTimestampFn()), to KafkaIO.Read.
2. Or the user can rewrite their pipeline to use ReadFromKafkaViaSDF
directly. In that case we require the user to write a new function that
extracts the output timestamp.

Re Reuven:

> Are you able to support CustomTimestampPolicyWithLimitedDelay?


For the new ReadFromKafkaViaSDF transform, yes, as long as the user
provides the timestamp function. If the user wants to use the SDF transform
via KafkaIO.Read, we need to expose new APIs on KafkaIO.Read to accept the
timestamp function and the type of policy.
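
As a rough illustration of why a timestamp function plus a policy type can be enough for a limited-delay policy, here is a plain-Java sketch. It assumes the policy's essential rule is "watermark = largest timestamp seen so far minus a fixed maximum delay"; that is a simplification, it leaves out idle-partition handling, and it is not Beam's CustomTimestampPolicyWithLimitedDelay implementation. LimitedDelayWatermarkSketch and its methods are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.function.Function;

// Plain-Java illustration only; a simplified stand-in, not Beam code.
final class LimitedDelayWatermarkSketch<T> {
  private final Function<T, Instant> timestampFn; // supplied by the user
  private final Duration maxDelay;                // bound on out-of-order arrival
  private Instant maxTimestamp; // null until the first record is observed

  LimitedDelayWatermarkSketch(Function<T, Instant> timestampFn, Duration maxDelay) {
    this.timestampFn = timestampFn;
    this.maxDelay = maxDelay;
  }

  // Extracts the record's timestamp and remembers the largest one seen.
  Instant observe(T record) {
    Instant ts = timestampFn.apply(record);
    if (maxTimestamp == null || ts.isAfter(maxTimestamp)) {
      maxTimestamp = ts;
    }
    return ts;
  }

  // Largest timestamp seen minus maxDelay; empty before any record arrives.
  Optional<Instant> watermark() {
    return maxTimestamp == null ? Optional.empty() : Optional.of(maxTimestamp.minus(maxDelay));
  }

  public static void main(String[] args) {
    // Records are modeled as epoch-millisecond values for brevity.
    LimitedDelayWatermarkSketch<Long> sketch =
        new LimitedDelayWatermarkSketch<Long>(Instant::ofEpochMilli, Duration.ofSeconds(5));
    sketch.observe(10_000L);
    sketch.observe(8_000L); // out of order, within the allowed delay
    System.out.println(sketch.watermark());
  }
}
```

In this shape the user contributes only the function and the delay, which matches the two inputs mentioned above: the timestamp function and the type of policy.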

On Fri, May 29, 2020 at 10:58 AM Reuven Lax <re...@google.com> wrote:

> Are you able to support CustomTimestampPolicyWithLimitedDelay?
>
> On Fri, May 29, 2020 at 9:58 AM Boyuan Zhang <bo...@google.com> wrote:
>
>> Yes, the WatermarkEstimator tracks output timestamp and gives an estimate
>> of watermark per partition.
>> The problem is, a user can configure KafkaIO.Read() with custom
>> TimestampPolicy, which cannot be translated into a simple function which
>> extracts timestamp from a KafkaRecord. In this case, I need to either ask
>> the customer to write such a kind of function or just support built-in
>> types.
>>
>> On Thu, May 28, 2020 at 9:12 PM Reuven Lax <re...@google.com> wrote:
>>
>>> This is per-partition, right? In that case I assume it will match the
>>> current Kafka watermark.
>>>
>>> On Thu, May 28, 2020 at 9:03 PM Boyuan Zhang <bo...@google.com> wrote:
>>>
>>>> Hi Reuven,
>>>>
>>>> I'm going to use MonotonicallyIncreasing
>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java#L105> by
>>>> default and in the future, we may want to support custom kind if there is a
>>>> request.
>>>>
>>>> On Thu, May 28, 2020 at 8:54 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Which WatermarkEstimator do you think should be used?

Re: [Discuss] Build Kafka read transform on top of SplittableDoFn

Posted by Reuven Lax <re...@google.com>.
Are you able to support CustomTimestampPolicyWithLimitedDelay?

On Fri, May 29, 2020 at 9:58 AM Boyuan Zhang <bo...@google.com> wrote:

> Yes, the WatermarkEstimator tracks output timestamp and gives an estimate
> of watermark per partition.
> The problem is, a user can configure KafkaIO.Read() with custom
> TimestampPolicy, which cannot be translated into a simple function which
> extracts timestamp from a KafkaRecord. In this case, I need to either ask
> the customer to write such a kind of function or just support built-in
> types.
>
> On Thu, May 28, 2020 at 9:12 PM Reuven Lax <re...@google.com> wrote:
>
>> This is per-partition, right? In that case I assume it will match the
>> current Kafka watermark.
>>
>> On Thu, May 28, 2020 at 9:03 PM Boyuan Zhang <bo...@google.com> wrote:
>>
>>> Hi Reuven,
>>>
>>> I'm going to use MonotonicallyIncreasing
>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java#L105> by
>>> default and in the future, we may want to support custom kind if there is a
>>> request.
>>>
>>> On Thu, May 28, 2020 at 8:54 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Which WatermarkEstimator do you think should be used?

Re: [Discuss] Build Kafka read transform on top of SplittableDoFn

Posted by Boyuan Zhang <bo...@google.com>.
Yes, the WatermarkEstimator tracks output timestamps and gives an estimate
of the watermark per partition.
The problem is that a user can configure KafkaIO.Read() with a custom
TimestampPolicy, which cannot be translated into a simple function that
extracts a timestamp from a KafkaRecord. In this case, I need to either ask
the user to write such a function or support only the built-in types.
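
One way to see why such a policy does not reduce to a record-to-timestamp function: its watermark can depend on partition state that no single record carries. The plain-Java sketch below is hypothetical. BacklogAwarePolicySketch and its methods are illustrative names, and the backlog rule is an assumed example of stateful behavior, not Beam's TimestampPolicy API. The timestampFor method is the part a simple function could replace; the watermark method also needs the backlog size and the current time.

```java
import java.time.Instant;

// Plain-Java illustration only; a hypothetical policy, not Beam code.
final class BacklogAwarePolicySketch {
  private Instant lastTimestamp = Instant.EPOCH;

  // The part a simple timestamp function can express: record -> timestamp.
  // The policy also remembers the value for use in watermark().
  Instant timestampFor(long recordTimeMillis) {
    lastTimestamp = Instant.ofEpochMilli(recordTimeMillis);
    return lastTimestamp;
  }

  // The part it cannot express: with no backlog the partition is caught up,
  // so the watermark follows the clock; otherwise it holds at the last
  // record timestamp.
  Instant watermark(long backlogMessages, Instant now) {
    if (backlogMessages == 0 && now.isAfter(lastTimestamp)) {
      return now;
    }
    return lastTimestamp;
  }

  public static void main(String[] args) {
    BacklogAwarePolicySketch policy = new BacklogAwarePolicySketch();
    policy.timestampFor(1_000L);
    Instant now = Instant.ofEpochMilli(5_000L);
    System.out.println(policy.watermark(3, now)); // backlog remains
    System.out.println(policy.watermark(0, now)); // caught up
  }
}
```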

On Thu, May 28, 2020 at 9:12 PM Reuven Lax <re...@google.com> wrote:

> This is per-partition, right? In that case I assume it will match the
> current Kafka watermark.
>
> On Thu, May 28, 2020 at 9:03 PM Boyuan Zhang <bo...@google.com> wrote:
>
>> Hi Reuven,
>>
>> I'm going to use MonotonicallyIncreasing
>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java#L105> by
>> default and in the future, we may want to support custom kind if there is a
>> request.
>>
>> On Thu, May 28, 2020 at 8:54 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Which WatermarkEstimator do you think should be used?
>>>

Re: [Discuss] Build Kafka read transform on top of SplittableDoFn

Posted by Reuven Lax <re...@google.com>.
This is per-partition, right? In that case I assume it will match the
current Kafka watermark.

On Thu, May 28, 2020 at 9:03 PM Boyuan Zhang <bo...@google.com> wrote:

> Hi Reuven,
>
> I'm going to use MonotonicallyIncreasing
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java#L105> by
> default and in the future, we may want to support custom kind if there is a
> request.
>
> On Thu, May 28, 2020 at 8:54 PM Reuven Lax <re...@google.com> wrote:
>
>> Which WatermarkEstimator do you think should be used?
>>

Re: [Discuss] Build Kafka read transform on top of SplittableDoFn

Posted by Boyuan Zhang <bo...@google.com>.
Hi Reuven,

I'm going to use MonotonicallyIncreasing
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java#L105>
by default. In the future, we may want to support custom estimators if
there is a request.
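
As a rough illustration of the idea (not Beam's actual implementation, which
may differ in details), a monotonically increasing estimator can be modeled
as "the largest output timestamp observed so far", kept separately for each
partition. The class names and the String partition key below are
hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

/** Demo entry point. A simplified model, not Beam's WatermarkEstimators class. */
final class WatermarkSketch {
  public static void main(String[] args) {
    PerPartitionWatermarks watermarks = new PerPartitionWatermarks();
    watermarks.observe("topic-0", 10L);
    watermarks.observe("topic-0", 5L); // an older timestamp never moves the watermark back
    watermarks.observe("topic-1", 7L);
    System.out.println("topic-0: " + watermarks.watermarkOf("topic-0"));
    System.out.println("topic-1: " + watermarks.watermarkOf("topic-1"));
  }
}

/** Tracks the maximum observed timestamp; the estimate never decreases. */
final class MonotonicWatermark {
  private long watermark = Long.MIN_VALUE;

  void observeTimestamp(long timestampMillis) {
    if (timestampMillis > watermark) {
      watermark = timestampMillis;
    }
  }

  long currentWatermark() {
    return watermark;
  }
}

/** One independent estimator per partition, keyed by a hypothetical partition name. */
final class PerPartitionWatermarks {
  private final Map<String, MonotonicWatermark> byPartition = new HashMap<>();

  void observe(String partition, long timestampMillis) {
    byPartition
        .computeIfAbsent(partition, key -> new MonotonicWatermark())
        .observeTimestamp(timestampMillis);
  }

  /** Returns Long.MIN_VALUE for a partition that has produced no output yet. */
  long watermarkOf(String partition) {
    MonotonicWatermark estimator = byPartition.get(partition);
    return estimator == null ? Long.MIN_VALUE : estimator.currentWatermark();
  }
}
```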

On Thu, May 28, 2020 at 8:54 PM Reuven Lax <re...@google.com> wrote:

> Which WatermarkEstimator do you think should be used?
>

Re: [Discuss] Build Kafka read transform on top of SplittableDoFn

Posted by Reuven Lax <re...@google.com>.
Which WatermarkEstimator do you think should be used?


Re: [Discuss] Build Kafka read transform on top of SplittableDoFn

Posted by Alexey Romanenko <ar...@gmail.com>.
Great news, thank you for working on this! 

1. One frequently requested feature is discovering new partitions and topics at runtime (and probably removing ones that no longer exist) [1].
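
A rough sketch of what such discovery boils down to, assuming partitions are listed periodically and compared with the previous listing. The class name, the String partition names and the polling approach are hypothetical and not part of KafkaIO:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: compares two partition listings taken at different times. */
final class PartitionDiffSketch {
  /** Partitions present now but not in the previous listing. */
  static Set<String> added(Set<String> previous, Set<String> current) {
    Set<String> result = new TreeSet<>(current);
    result.removeAll(previous);
    return result;
  }

  /** Partitions present in the previous listing but gone now. */
  static Set<String> removed(Set<String> previous, Set<String> current) {
    Set<String> result = new TreeSet<>(previous);
    result.removeAll(current);
    return result;
  }

  public static void main(String[] args) {
    Set<String> before = new TreeSet<>(Arrays.asList("orders-0", "orders-1"));
    Set<String> after = new TreeSet<>(Arrays.asList("orders-1", "orders-2"));
    System.out.println("added: " + added(before, after));
    System.out.println("removed: " + removed(before, after));
  }
}
```

With a read transform that takes TopicPartition as an element, each newly found partition could simply be emitted as a new input element.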


Do you expect any incompatible API changes from adding the SDF Read version, or is it going to be just another user API that can be used in parallel with the old one?


[1] https://issues.apache.org/jira/browse/BEAM-5786

