Posted to dev@beam.apache.org by Chamikara Jayalath <ch...@google.com> on 2018/04/28 05:06:26 UTC

Kafka connector for Beam Python SDK

Hi All,

I'm looking into adding a Kafka connector to the Beam Python SDK. I think this
will benefit many Python SDK users and will serve as a good example for the
recently added Splittable DoFn API (Fn API support, which will allow all
runners to use Python Splittable DoFn, is in active development). I created
a document [1] that makes the case for adding this connector and compares
the performance of available Python Kafka client libraries. I also created
a POC [2] that illustrates the API and how the Python SDF API can be used to
implement a Kafka source. I would greatly appreciate any feedback related to
this.

[1]
https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
[2]
https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9

Thanks,
Cham

Re: Kafka connector for Beam Python SDK

Posted by Raghu Angadi <ra...@google.com>.
On Mon, Apr 30, 2018 at 8:05 AM Chamikara Jayalath <ch...@google.com>
wrote:

> Hi Aljoscha,
>
> I tried to cover this in the doc. Once we have full support for
> cross-language IO, we can decide this on a case-by-case basis. But I don't
> think we should cease defining new sources/sinks for Beam Python SDK till
> we get to that point. I think there are good reasons for adding Kafka
> support for Python today and many Beam users have requested this. Also, note
> that the proposed Python Kafka source will be based on the Splittable DoFn
> framework while the current Java version is based on the UnboundedSource
> framework. Here are the reasons that are currently listed in the doc.
>
>
>    -
>
>    Users might find it useful to have at least one unbounded source and
>    sink combination implemented in Python SDK and Kafka is the streaming
>    system that makes most sense to support if we just want to add support for
>    only one such system in Python SDK.
>    -
>
>    Not all runners might support cross-language IO. Also some
>    user/runner/deployment combinations might require an unbounded source/sink
>    implemented in Python SDK.
>    -
>
>    We recently added Splittable DoFn support to Python SDK. It will be
>    good to have at least one production-quality Splittable DoFn that will
>    serve as a good example for any users who wish to implement new Splittable
>    DoFn implementations on top of Beam Python SDK.
>
>
+1


>
>    -
>
>    The cross-language transform feature is currently in the initial
>    discussion phase and it could be some time before we can offer the existing
>    Java implementation of Kafka to Python SDK users.
>    -
>
>    Cross-language IO might take even longer to reach the point where it's
>    fully equivalent in expressive power to a transform written in the host
>    language - e.g. supporting host-language lambdas as part of the transform
>    configuration is likely to take a lot longer than "first-order"
>    cross-language IO. KafkaIO in Java uses lambdas as part of transform
>    configuration, e.g. timestamp functions.
>
>
FYI, note that the old withTimestampFn() methods are deprecated and will be
removed soon. The built-in 'withLogAppendTime()', 'withCreateTime()', etc. cover
most use cases, and the replacement for custom timestamps,
'withTimestampPolicyFactory()', takes a factory class.
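
One way to picture why built-in, named timestamp policies are easier to
carry across SDKs than lambdas is the minimal Python sketch below. It is
not the KafkaIO API: Record, TIMESTAMP_POLICIES, encode_config and
timestamp_for are hypothetical names chosen for illustration, and JSON
stands in for whatever configuration encoding a cross-language transform
would actually use.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class Record:
    """Hypothetical stand-in for a Kafka record with two broker-side times."""
    value: bytes
    create_time_ms: int
    log_append_time_ms: int


def _create_time(record: Record) -> int:
    return record.create_time_ms


def _log_append_time(record: Record) -> int:
    return record.log_append_time_ms


# Named, built-in policies: the name is plain data, so any SDK can send it.
TIMESTAMP_POLICIES = {
    "create_time": _create_time,
    "log_append_time": _log_append_time,
}


def encode_config(policy_name: str) -> str:
    """Serialize the policy choice; an arbitrary lambda could not be sent this way."""
    if policy_name not in TIMESTAMP_POLICIES:
        raise ValueError("unknown timestamp policy: %s" % policy_name)
    return json.dumps({"timestamp_policy": policy_name})


def timestamp_for(config_json: str, record: Record) -> int:
    """Resolve the policy by name on the receiving side and apply it."""
    policy_name = json.loads(config_json)["timestamp_policy"]
    return TIMESTAMP_POLICIES[policy_name](record)
```

A caller would build the configuration with
encode_config("log_append_time") and the connector side would resolve it
with timestamp_for(config, record). A custom timestamp function has no
such name to send, which is where the factory-class approach comes in.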

 Raghu.


> Thanks,
> Cham
>
> On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Is this what we want to do in the long run, i.e. implement copies of
>> connectors for different SDKs? I thought the plan was to enable using
>> connectors written in different languages, i.e. use the Java Kafka I/O from
>> Python. This way we wouldn't duplicate bugs for three different languages
>> (Java, Python, and Go for now).
>>
>> Best,
>> Aljoscha
>>
>>
>> On 29. Apr 2018, at 20:46, Eugene Kirpichov <ki...@google.com> wrote:
>>
>> Thanks Cham, this is great! I left just a couple of comments on the doc.
>>
>> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> I'm looking into adding a Kafka connector to Beam Python SDK. I think
>>> this will benefit many Python SDK users and will serve as a good example
>>> for recently added Splittable DoFn API (Fn API support which will allow all
>>> runners to use Python Splittable DoFn is in active development).  I created
>>> a document [1] that makes the case for adding this connector and compares
>>> the performance of available Python Kafka client libraries. Also I created
>>> a POC [2] that illustrates the API and how Python SDF API can be used to
>>> implement a Kafka source. I would greatly appreciate any feedback related to
>>> this.
>>>
>>> [1]
>>> https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>>> [2]
>>> https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>>
>>> Thanks,
>>> Cham
>>>
>>
>>

Re: Kafka connector for Beam Python SDK

Posted by Henning Rohde <he...@google.com>.
Although I suspect/hope that sharing IO connectors across SDKs will
adequately cover the lion's share of implementations (especially the long
tail), I also think it's a case-by-case decision to make. Native IO might
be preferable for some uses and each SDK will want IO implementations where
they shine or at least for reference. I think of these options as
complementary.

For cross-language IO connectors that use user functions in an intimate
way, to Reuven's point, the IO connector will have to be implemented in a
way that makes each user function a transform so that it can be supplied
and executed in the user's SDK. The current practice of embedding user
functions in DoFns won't work. This will require more fusion breaks (and
more coding of data) than otherwise needed and could incur a performance
penalty, unless the IO connector can be written in a way that avoids the
user function, as Kenn suggests.
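
To make the "fusion breaks and coding of data" point concrete, here is a
rough Python sketch. It only simulates the idea and does not use Beam:
destination_for, write_embedded and write_with_fusion_break are
hypothetical names, and JSON stands in for the coders used between SDK
harnesses.

```python
import json
from typing import Dict, List, Tuple

Row = Dict[str, int]


def destination_for(row: Row) -> str:
    """User function: picks a destination for each record."""
    return "purchases_%d" % row["store_id"]


def write_embedded(rows: List[Row]) -> Tuple[List[Tuple[str, Row]], int]:
    """User function embedded in the connector: no coding between steps."""
    routed = [(destination_for(row), row) for row in rows]
    return routed, 0


def write_with_fusion_break(rows: List[Row]) -> Tuple[List[Tuple[str, Row]], int]:
    """User function as its own step in another harness: each record is
    encoded on the way out and encoded again on the way back."""
    codings = 0
    routed = []
    for row in rows:
        wire_out = json.dumps(row)  # connector harness -> user harness
        codings += 1
        user_side_row = json.loads(wire_out)
        dest = destination_for(user_side_row)
        wire_back = json.dumps({"destination": dest, "row": user_side_row})
        codings += 1  # user harness -> connector harness
        decoded = json.loads(wire_back)
        routed.append((decoded["destination"], decoded["row"]))
    return routed, codings
```

Both functions route the records identically. The second one pays two
encode/decode round trips per record, which is the overhead a connector
would avoid if it did not need the user function at all.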

Small +1 to Kenn's idea of auditing the existing IO connectors to get a
sense of which IO might be problematic. However, it might be a tad
premature to do too much until the cross-language transform feature is
fleshed out further.

Henning


On Mon, Apr 30, 2018 at 9:54 AM Kenneth Knowles <kl...@google.com> wrote:

> I agree with Cham's motivations as far as "we need it now" and getting
> Python SDF up and running and exercised on a real connector.
>
> But I do find the current API of BigQueryIO to be a poor example. That
> particular functionality on BigQueryIO seems extraneous and goes against
> our own style guide [1]. The recommended way to write it would be for
> BigQueryIO to output a natural concrete type (like TableRow) and allow the
> following step to do conversions. This is a broader programming best
> practice - unless there is a compelling reason, you should just return the
> value rather than accept a higher-order function to operate on the value.
> Is there a compelling reason in this case? I just dug through the code and
> just see that it bottoms out in AvroSource where it does not seem to add
> functionality.
>
> Considering cross-language pipelines as a primary use case for all
> connectors, perhaps we should audit them and bring them into alignment now,
> deprecating paths using higher-order functions. We can still consider
> host-language convenience composites.
>
> For an unbounded source like KafkaIO the compelling reason is the
> timestamp extracting function to be able to maintain a watermark. Notably,
> PubsubIO does not accept such a function, but requires the timestamp to be
> in a metadata field that any language can describe (versus having to parse
> the message to pull out the timestamp).
>
> Kenn
>
> [1]
> https://beam.apache.org/contribute/ptransform-style-guide/#choosing-types-of-input-and-output-pcollections
>
> On Mon, Apr 30, 2018 at 9:27 AM Reuven Lax <re...@google.com> wrote:
>
>> Another point: cross-language IOs might add a performance penalty in many
>> cases. For an example of this look at BigQueryIO. The user can register a
>> SerializableFunction that is evaluated on every record, and determines
>> which destination to write the record to. Now a Python user would want to
>> register a Python function for this, of course. This means that the Java IO
>> would have to invoke Python code for each record it sees, which will likely
>> be a big performance hit.
>>
>> Of course the downside of duplicating IOs is exactly as you say -
>> multiple versions to maintain, and potentially duplicate bugs. I think the
>> right answer will need to be on a case-by-case basis.
>>
>> Reuven
>>
>> On Mon, Apr 30, 2018 at 8:05 AM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> Hi Aljoscha,
>>>
>>> I tried to cover this in the doc. Once we have full support for
>>> cross-language IO, we can decide this on a case-by-case basis. But I don't
>>> think we should cease defining new sources/sinks for Beam Python SDK till
>>> we get to that point. I think there are good reasons for adding Kafka
>>> support for Python today and many Beam users have requested this. Also, note
>>> that the proposed Python Kafka source will be based on the Splittable DoFn
>>> framework while the current Java version is based on the UnboundedSource
>>> framework. Here are the reasons that are currently listed in the doc.
>>>
>>>
>>>    -
>>>
>>>    Users might find it useful to have at least one unbounded source and
>>>    sink combination implemented in Python SDK and Kafka is the streaming
>>>    system that makes most sense to support if we just want to add support for
>>>    only one such system in Python SDK.
>>>    -
>>>
>>>    Not all runners might support cross-language IO. Also some
>>>    user/runner/deployment combinations might require an unbounded source/sink
>>>    implemented in Python SDK.
>>>    -
>>>
>>>    We recently added Splittable DoFn support to Python SDK. It will be
>>>    good to have at least one production quality Splittable DoFn that
>>>    will serve as a good example for any users who wish to implement new
>>>    Splittable DoFn implementations on top of Beam Python SDK.
>>>    -
>>>
>>>    The cross-language transform feature is currently in the initial
>>>    discussion phase and it could be some time before we can offer the existing
>>>    Java implementation of Kafka to Python SDK users.
>>>    -
>>>
>>>    Cross-language IO might take even longer to reach the point where
>>>    it's fully equivalent in expressive power to a transform written in the
>>>    host language - e.g. supporting host-language lambdas as part of the
>>>    transform configuration is likely to take a lot longer than "first-order"
>>>    cross-language IO. KafkaIO in Java uses lambdas as part of transform
>>>    configuration, e.g. timestamp functions.
>>>
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>
>>>> Is this what we want to do in the long run, i.e. implement copies of
>>>> connectors for different SDKs? I thought the plan was to enable using
>>>> connectors written in different languages, i.e. use the Java Kafka I/O from
>>>> Python. This way we wouldn't duplicate bugs for three different languages
>>>> (Java, Python, and Go for now).
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>> On 29. Apr 2018, at 20:46, Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>> Thanks Cham, this is great! I left just a couple of comments on the doc.
>>>>
>>>> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <
>>>> chamikara@google.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I'm looking into adding a Kafka connector to Beam Python SDK. I think
>>>>> this will benefit many Python SDK users and will serve as a good example
>>>>> for recently added Splittable DoFn API (Fn API support which will allow all
>>>>> runners to use Python Splittable DoFn is in active development).  I created
>>>>> a document [1] that makes the case for adding this connector and compares
>>>>> the performance of available Python Kafka client libraries. Also I created
>>>>> a POC [2] that illustrates the API and how Python SDF API can be used to
>>>>> implement a Kafka source. I would greatly appreciate any feedback related to
>>>>> this.
>>>>>
>>>>> [1]
>>>>> https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>>>>> [2]
>>>>> https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>
>>>>

Re: Kafka connector for Beam Python SDK

Posted by Chamikara Jayalath <ch...@google.com>.
Thanks all for the comments. Based on the discussion so far, it looks like we
have to flesh out the cross-language transforms feature quite a bit before
we can utilize some of the existing Java IOs in other SDKs. This might
involve redesigning some of the existing Java IOs to allow expressing
second-order APIs in other languages without significantly affecting
execution performance. Also, several people agreed that adding an SDF-based
Kafka source to the Python SDK will allow us to better iron out the SDF API.
I'll start prototyping the Kafka connector. Please follow
https://issues.apache.org/jira/browse/BEAM-3788 for details.
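
For readers new to Splittable DoFn, the core idea behind an SDF-based
Kafka source can be sketched in plain Python as below. This is not the
POC and not Beam's exact interface: OffsetRange, OffsetRangeTracker and
read_partition are simplified, hypothetical stand-ins, and a list of
strings plays the role of one Kafka partition.

```python
from dataclasses import dataclass
from typing import Iterator, List, Optional, Tuple


@dataclass(frozen=True)
class OffsetRange:
    """Half-open range [start, stop) of offsets in one partition."""
    start: int
    stop: int


class OffsetRangeTracker:
    """Tracks which offsets of a restriction have been claimed so far."""

    def __init__(self, restriction: OffsetRange) -> None:
        self._range = restriction
        self._last_claimed: Optional[int] = None

    def try_claim(self, offset: int) -> bool:
        """Claim an offset; returns False once it is outside the range."""
        if offset >= self._range.stop:
            return False
        self._last_claimed = offset
        return True

    def try_split(self) -> Optional[Tuple[OffsetRange, OffsetRange]]:
        """Split into (primary, residual) right after the last claimed offset."""
        if self._last_claimed is None:
            split_at = self._range.start
        else:
            split_at = self._last_claimed + 1
        if split_at >= self._range.stop:
            return None  # nothing left to hand off
        primary = OffsetRange(self._range.start, split_at)
        residual = OffsetRange(split_at, self._range.stop)
        self._range = primary  # this reader now stops at the split point
        return primary, residual


def read_partition(records: List[str], tracker: OffsetRangeTracker,
                   start: int) -> Iterator[Tuple[int, str]]:
    """Emit (offset, record) pairs for every offset the tracker lets us claim."""
    offset = start
    while offset < len(records) and tracker.try_claim(offset):
        yield offset, records[offset]
        offset += 1
```

The reader only emits records whose offsets it managed to claim, so a
runner can call try_split at any time and hand the residual range to
another worker without records being read twice.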

Thanks,
Cham

On Mon, Apr 30, 2018 at 4:08 PM Kenneth Knowles <kl...@google.com> wrote:

> The numbers on that PR are not really what end-to-end means to me - it
> normally means you have a fully represented productionized use case and the
> metric you are looking at is the actual impact on the full system (like
> latency from a tap on mobile to a dashboard being updated, or monthly
> compute cost for a system). Also FWIW when I say "TableRow" I don't mean
> the JSON wire format for them. I also believe that Luke's proposal has
> never been measured.
>
> But it is an obvious & fair point that encoding the table repeatedly will
> bloat compared to a second-order transform. However, in an end-to-end test
> you'd want to compare against ways of shuffling compact proxies and joining
> back to the full value, depending on the portability overhead. But, I
> agree, as you say, there's probably a decent design space of blending a
> portable first-order transform with little second-order helpers. Or perhaps
> with a shared memory layout we can have reasonable performance with a
> cross-language higher-order situation.
>
> Kenn
>
>
>
> On Mon, Apr 30, 2018 at 10:54 AM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> I think we've discussed this before... It is true that all of our
>> second-order APIs can be re-expressed as first-order APIs, but that would
>> come at a very serious performance cost - e.g. significant increase in
>> amount of data shuffled / materialized. The second-order APIs (most
>> importantly, Dynamic Destinations in BigQueryIO and FileIO.write) were
>> deliberately designed this way to minimize amount of data shuffled (grouped
>> by destination) and postpone the redundant representation (record,
>> destination) as late as possible in the graph.
>>
>> Experience from https://github.com/apache/beam/pull/3894 shows that
>> getting rid of TableRow in the representation alone gives 2x-3x end-to-end
>> performance gains. Gains from avoiding redundancy are likely still higher.
>> Some back-of-the-envelope calculations:
>> - Imagine we're writing purchases to BigQuery tables (store_id,
>> customer_id, product_id, quantity), one table per store, e.g.
>> "stores_dataset:purchases_$storeid".
>> - With the second-order representation, we group records (store_id,
>> customer_id, product_id, quantity) by store_id - both key and value are a
>> handful of bytes.
>> - With the first-order representation, we group records of the form
>> {table_name:"stores_dataset:purchases_$storeid", row: {"store_id":"...",
>> "customer_id":"...", "product_id":"...", "quantity":"..."}} which is many
>> times larger in encoded form, both key and value. Schemas, perhaps, might
>> help with encoding the TableRow less redundantly, but a) I don't know if
>> that's actually the case b) the schemas work will also take more time to
>> land in non-Java SDKs c) this doesn't get rid of the redundancy in keys (at
>> a minimum, in a cross-language world, we'll definitely need to *encode* all
>> such keys redundantly)
>>
>> I.e., yes, first-order APIs are possible, and likely even desirable for
>> the first version of cross-language IO, but I do not believe that staying
>> first-order-only is an acceptable long-term state.
>>
>> Note also that second-order doesn't necessarily require ability to invoke
>> cross-language lambdas - there are other approaches, e.g. rewriting in the
>> SDK language only the sub-parts of the transforms that actually use the
>> lambdas.
>>
>> On Mon, Apr 30, 2018 at 10:34 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> I believe that most (all?) of these cases of executing a lambda could be
>>> avoided if we passed along structured records like:
>>> {
>>>   table_name:
>>>   row: { ... }
>>> }
>>>
>>>
>>> On Mon, Apr 30, 2018 at 10:24 AM Chamikara Jayalath <
>>> chamikara@google.com> wrote:
>>>
>>>>
>>>>
>>>> On Mon, Apr 30, 2018 at 9:54 AM Kenneth Knowles <kl...@google.com> wrote:
>>>>
>>>>> I agree with Cham's motivations as far as "we need it now" and getting
>>>>> Python SDF up and running and exercised on a real connector.
>>>>>
>>>>> But I do find the current API of BigQueryIO to be a poor example. That
>>>>> particular functionality on BigQueryIO seems extraneous and goes against
>>>>> our own style guide [1]. The recommended way to write it would be for
>>>>> BigQueryIO to output a natural concrete type (like TableRow) and allow the
>>>>> following step to do conversions. This is a broader programming best
>>>>> practice - unless there is a compelling reason, you should just return the
>>>>> value rather than accept a higher-order function to operate on the value.
>>>>> Is there a compelling reason in this case? I just dug through the code and
>>>>> just see that it bottoms out in AvroSource where it does not seem to add
>>>>> functionality.
>>>>>
>>>>
>>>> I think what Reuven was referring to was a feature of the BQ sink,
>>>> not the source. Even if we set the destination in an additional step, this
>>>> will still have to cross SDK harness boundary just to set the property
>>>> (invoking BQ destination lambda in this case), won't it? Java will be able
>>>> to fuse steps in this case but Python/Go will involve "some work in Java
>>>> SDK harness" + "invoking Python/Go harness to invoke the BQ destination
>>>> lambda" + "invoke Java harness to continue processing", including
>>>> serialization/deserialization in between. This could still result in a
>>>> significant amount of overhead for SDKs other than the one where the IO is
>>>> implemented.
>>>>
>>>>
>>>>>
>>>>> Considering cross-language pipelines as a primary use case for all
>>>>> connectors, perhaps we should audit them and bring them into alignment now,
>>>>> deprecating paths using higher-order functions. We can still consider
>>>>> host-language convenience composites.
>>>>>
>>>>> For an unbounded source like KafkaIO the compelling reason is the
>>>>> timestamp extracting function to be able to maintain a watermark. Notably,
>>>>> PubsubIO does not accept such a function, but requires the timestamp to be
>>>>> in a metadata field that any language can describe (versus having to parse
>>>>> the message to pull out the timestamp).
>>>>>
>>>>> Kenn
>>>>>
>>>>> [1]
>>>>> https://beam.apache.org/contribute/ptransform-style-guide/#choosing-types-of-input-and-output-pcollections
>>>>>
>>>>> On Mon, Apr 30, 2018 at 9:27 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Another point: cross-language IOs might add a performance penalty in
>>>>>> many cases. For an example of this look at BigQueryIO. The user can
>>>>>> register a SerializableFunction that is evaluated on every record, and
>>>>>> determines which destination to write the record to. Now a Python user
>>>>>> would want to register a Python function for this, of course. This means
>>>>>> that the Java IO would have to invoke Python code for each record it sees,
>>>>>> which will likely be a big performance hit.
>>>>>>
>>>>>> Of course the downside of duplicating IOs is exactly as you say -
>>>>>> multiple versions to maintain, and potentially duplicate bugs. I think the
>>>>>> right answer will need to be on a case-by-case basis.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Mon, Apr 30, 2018 at 8:05 AM Chamikara Jayalath <
>>>>>> chamikara@google.com> wrote:
>>>>>>
>>>>>>> Hi Aljoscha,
>>>>>>>
>>>>>>> I tried to cover this in the doc. Once we have full support for
>>>>>>> cross-language IO, we can decide this on a case-by-case basis. But I don't
>>>>>>> think we should cease defining new sources/sinks for Beam Python SDK till
>>>>>>> we get to that point. I think there are good reasons for adding Kafka
>>>>>>> support for Python today and many Beam users have requested this. Also, note
>>>>>>> that the proposed Python Kafka source will be based on the Splittable DoFn
>>>>>>> framework while the current Java version is based on the UnboundedSource
>>>>>>> framework. Here are the reasons that are currently listed in the doc.
>>>>>>>
>>>>>>>
>>>>>>>    -
>>>>>>>
>>>>>>>    Users might find it useful to have at least one unbounded source
>>>>>>>    and sink combination implemented in Python SDK and Kafka is the streaming
>>>>>>>    system that makes most sense to support if we just want to add support for
>>>>>>>    only one such system in Python SDK.
>>>>>>>    -
>>>>>>>
>>>>>>>    Not all runners might support cross-language IO. Also some
>>>>>>>    user/runner/deployment combinations might require an unbounded source/sink
>>>>>>>    implemented in Python SDK.
>>>>>>>    -
>>>>>>>
>>>>>>>    We recently added Splittable DoFn support to Python SDK. It will
>>>>>>>    be good to have at least one production quality Splittable DoFn
>>>>>>>    that will serve as a good example for any users who wish to implement new
>>>>>>>    Splittable DoFn implementations on top of Beam Python SDK.
>>>>>>>    -
>>>>>>>
>>>>>>>    The cross-language transform feature is currently in the initial
>>>>>>>    discussion phase and it could be some time before we can offer the existing
>>>>>>>    Java implementation of Kafka to Python SDK users.
>>>>>>>    -
>>>>>>>
>>>>>>>    Cross-language IO might take even longer to reach the point
>>>>>>>    where it's fully equivalent in expressive power to a transform written in
>>>>>>>    the host language - e.g. supporting host-language lambdas as part of the
>>>>>>>    transform configuration is likely to take a lot longer than "first-order"
>>>>>>>    cross-language IO. KafkaIO in Java uses lambdas as part of transform
>>>>>>>    configuration, e.g. timestamp functions.
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>> On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek <
>>>>>>> aljoscha@apache.org> wrote:
>>>>>>>
>>>>>>>> Is this what we want to do in the long run, i.e. implement copies
>>>>>>>> of connectors for different SDKs? I thought the plan was to enable using
>>>>>>>> connectors written in different languages, i.e. use the Java Kafka I/O from
>>>>>>>> Python. This way we wouldn't duplicate bugs for three different languages
>>>>>>>> (Java, Python, and Go for now).
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>>
>>>>>>>> On 29. Apr 2018, at 20:46, Eugene Kirpichov <ki...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Thanks Cham, this is great! I left just a couple of comments on the
>>>>>>>> doc.
>>>>>>>>
>>>>>>>> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <
>>>>>>>> chamikara@google.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I'm looking into adding a Kafka connector to Beam Python SDK. I
>>>>>>>>> think this will benefit many Python SDK users and will serve as a good
>>>>>>>>> example for recently added Splittable DoFn API (Fn API support which will
>>>>>>>>> allow all runners to use Python Splittable DoFn is in active development).
>>>>>>>>> I created a document [1] that makes the case for adding this connector and
>>>>>>>>> compares the performance of available Python Kafka client libraries. Also I
>>>>>>>>> created a POC [2] that illustrates the API and how Python SDF API can be
>>>>>>>>> used to implement a Kafka source. I would greatly appreciate any feedback
>>>>>>>>> related to this.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>>>>>>>>> [2]
>>>>>>>>> https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Cham
>>>>>>>>>
>>>>>>>>
>>>>>>>>

Re: Kafka connector for Beam Python SDK

Posted by Kenneth Knowles <kl...@google.com>.
The numbers on that PR are not really what end-to-end means to me - it
normally means you have a fully represented productionized use case and the
metric you are looking at is the actual impact on the full system (like
latency from a tap on mobile to a dashboard being updated, or monthly
compute cost for a system). Also FWIW when I say "TableRow" I don't mean
the JSON wire format for them. I also believe that Luke's proposal has
never been measured.

But it is an obvious & fair point that encoding the table repeatedly will
bloat compared to a second-order transform. However, in an end-to-end test
you'd want to compare against ways of shuffling compact proxies and joining
back to the full value, depending on the portability overhead. But, I
agree, as you say, there's probably a decent design space of blending a
portable first-order transform with little second-order helpers. Or perhaps
with a shared memory layout we can have reasonable performance with a
cross-language higher-order situation.
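
As a rough way to see the bloat in question, the Python sketch below
compares the encoded size of the two representations from Eugene's
back-of-the-envelope example. It is an illustration only: JSON is used as
a stand-in encoding (not the actual TableRow wire format or any Beam
coder), the sample purchases are made up, and the function names are
hypothetical.

```python
import json
from typing import Dict, List

Row = Dict[str, int]

# Made-up sample data for illustration.
PURCHASES: List[Row] = [
    {"store_id": 17, "customer_id": 1001, "product_id": 501, "quantity": 2},
    {"store_id": 17, "customer_id": 1002, "product_id": 502, "quantity": 1},
    {"store_id": 42, "customer_id": 1003, "product_id": 501, "quantity": 5},
]


def table_for(row: Row) -> str:
    """Destination table name, one table per store."""
    return "stores_dataset:purchases_%d" % row["store_id"]


def _size(obj) -> int:
    """Encoded size in bytes under the stand-in JSON encoding."""
    return len(json.dumps(obj).encode("utf-8"))


def second_order_bytes(rows: List[Row]) -> int:
    """Group compact tuples by store_id; the table name is derived later."""
    total = 0
    for row in rows:
        key = row["store_id"]
        value = [row["store_id"], row["customer_id"],
                 row["product_id"], row["quantity"]]
        total += _size(key) + _size(value)
    return total


def first_order_bytes(rows: List[Row]) -> int:
    """Group {table_name, row} records by the full table name."""
    total = 0
    for row in rows:
        key = table_for(row)
        value = {"table_name": key, "row": row}
        total += _size(key) + _size(value)
    return total
```

Comparing first_order_bytes(PURCHASES) with second_order_bytes(PURCHASES)
shows the first-order form carrying the table name and field names with
every record. The exact ratio depends entirely on the encoding, so this
says nothing about end-to-end cost.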

Kenn



On Mon, Apr 30, 2018 at 10:54 AM Eugene Kirpichov <ki...@google.com>
wrote:

> I think we've discussed this before... It is true that all of our
> second-order APIs can be re-expressed as first-order APIs, but that would
> come at a very serious performance cost - e.g. significant increase in
> amount of data shuffled / materialized. The second-order APIs (most
> importantly, Dynamic Destinations in BigQueryIO and FileIO.write) were
> deliberately designed this way to minimize amount of data shuffled (grouped
> by destination) and postpone the redundant representation (record,
> destination) as late as possible in the graph.
>
> Experience from https://github.com/apache/beam/pull/3894 shows that
> getting rid of TableRow in the representation alone gives 2x-3x end-to-end
> performance gains. Gains from avoiding redundancy are likely still higher.
> Some back-of-the-envelope calculations:
> - Imagine we're writing purchases to BigQuery tables (store_id,
> customer_id, product_id, quantity), one table per store, e.g.
> "stores_dataset:purchases_$storeid".
> - With the second-order representation, we group records (store_id,
> customer_id, product_id, quantity) by store_id - both key and value are a
> handful of bytes.
> - With the first-order representation, we group records of the form
> {table_name:"stores_dataset:purchases_$storeid", row: {"store_id":"...",
> "customer_id":"...", "product_id":"...", "quantity":"..."}} which is many
> times larger in encoded form, both key and value. Schemas, perhaps, might
> help with encoding the TableRow less redundantly, but a) I don't know if
> that's actually the case b) the schemas work will also take more time to
> land in non-Java SDKs c) this doesn't get rid of the redundancy in keys (at
> a minimum, in a cross-language world, we'll definitely need to *encode* all
> such keys redundantly)
>
> I.e., yes, first-order APIs are possible, and likely even desirable for
> the first version of cross-language IO, but I do not believe that staying
> first-order-only is an acceptable long-term state.
>
> Note also that second-order doesn't necessarily require ability to invoke
> cross-language lambdas - there are other approaches, e.g. rewriting in the
> SDK language only the sub-parts of the transforms that actually use the
> lambdas.
>
> On Mon, Apr 30, 2018 at 10:34 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> I believe that most (all?) of these cases of executing a lambda could be
>> avoided if we passed along structured records like:
>> {
>>   table_name:
>>   row: { ... }
>> }
>>
>>
>> On Mon, Apr 30, 2018 at 10:24 AM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Mon, Apr 30, 2018 at 9:54 AM Kenneth Knowles <kl...@google.com> wrote:
>>>
>>>> I agree with Cham's motivations as far as "we need it now" and getting
>>>> Python SDF up and running and exercised on a real connector.
>>>>
>>>> But I do find the current API of BigQueryIO to be a poor example. That
>>>> particular functionality on BigQueryIO seems extraneous and goes against
>>>> our own style guide [1]. The recommended way to write it would be for
>>>> BigQueryIO to output a natural concrete type (like TableRow) and allow the
>>>> following step to do conversions. This is a broader programming best
>>>> practice - unless there is a compelling reason, you should just return the
>>>> value rather than accept a higher-order function to operate on the value.
>>>> Is there a compelling reason in this case? I just dug through the code and
>>>> just see that it bottoms out in AvroSource where it does not seem to add
>>>> functionality.
>>>>
>>>
>>> I think what Reuven was referring to was a feature of the BQ sink,
>>> not the source. Even if we set the destination in an additional step, this
>>> will still have to cross SDK harness boundary just to set the property
>>> (invoking BQ destination lambda in this case), won't it? Java will be able
>>> to fuse steps in this case but Python/Go will involve "some work in Java
>>> SDK harness" + "invoking Python/Go harness to invoke the BQ destination
>>> lambda" + "invoke Java harness to continue processing", including
>>> serialization/deserialization in between. This could still result in a
>>> significant amount of overhead for SDKs other than the one where the IO is
>>> implemented.
>>>
>>>
>>>>
>>>> Considering cross-language pipelines as a primary use case for all
>>>> connectors, perhaps we should audit them and bring them into alignment now,
>>>> deprecating paths using higher-order functions. We can still consider
>>>> host-language convenience composites.
>>>>
>>>> For an unbounded source like KafkaIO the compelling reason is the
>>>> timestamp extracting function to be able to maintain a watermark. Notably,
>>>> PubsubIO does not accept such a function, but requires the timestamp to be
>>>> in a metadata field that any language can describe (versus having to parse
>>>> the message to pull out the timestamp).
>>>>
>>>> Kenn
>>>>
>>>> [1]
>>>> https://beam.apache.org/contribute/ptransform-style-guide/#choosing-types-of-input-and-output-pcollections
>>>>
>>>> On Mon, Apr 30, 2018 at 9:27 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Another point: cross-language IOs might add a performance penalty in
>>>>> many cases. For an example of this look at BigQueryIO. The user can
>>>>> register a SerializableFunction that is evaluated on every record, and
>>>>> determines which destination to write the record to. Now a Python user
>>>>> would want to register a Python function for this, of course. This means
>>>>> that the Java IO would have to invoke Python code for each record it sees,
>>>>> which will likely be a big performance hit.
>>>>>
>>>>> Of course the downside of duplicating IOs is exactly as you say -
>>>>> multiple versions to maintain, and potentially duplicate bugs. I think the
>>>>> right answer will need to be on a case-by-case basis.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Mon, Apr 30, 2018 at 8:05 AM Chamikara Jayalath <
>>>>> chamikara@google.com> wrote:
>>>>>
>>>>>> Hi Aljoscha,
>>>>>>
>>>>>> I tried to cover this in the doc. Once we have full support for
>>>>>> cross-language IO, we can decide this on a case-by-case basis. But I don't
>>>>>> think we should cease defining new sources/sinks for Beam Python SDK till
>>>>>> we get to that point. I think there are good reasons for adding Kafka
>>>>>> support for Python today and many Beam users have requested this. Also, note
>>>>>> that proposed Python Kafka source will be based on the Splittable DoFn
>>>>>> framework while the current Java version is based on the UnboundedSource
>>>>>> framework. Here are the reasons that are currently listed in the doc.
>>>>>>
>>>>>>
>>>>>>    -
>>>>>>
>>>>>>    Users might find it useful to have at least one unbounded source
>>>>>>    and sink combination implemented in Python SDK and Kafka is the streaming
>>>>>>    system that makes most sense to support if we just want to add support for
>>>>>>    only one such system in Python SDK.
>>>>>>    -
>>>>>>
>>>>>>    Not all runners might support cross-language IO. Also some
>>>>>>    user/runner/deployment combinations might require an unbounded source/sink
>>>>>>    implemented in Python SDK.
>>>>>>    -
>>>>>>
>>>>>>    We recently added Splittable DoFn support to Python SDK. It will
>>>>>>    be good to have at least one production quality Splittable DoFn
>>>>>>    that will serve as a good example for any users who wish to implement new
>>>>>>    Splittable DoFn implementations on top of Beam Python SDK.
>>>>>>    -
>>>>>>
>>>>>>    The cross-language transform feature is currently in the initial
>>>>>>    discussion phase and it could be some time before we can offer existing
>>>>>>    Java implementation of Kafka for Python SDK users.
>>>>>>    -
>>>>>>
>>>>>>    Cross-language IO might take even longer to reach the point where
>>>>>>    it's fully equivalent in expressive power to a transform written in the
>>>>>>    host language - e.g. supporting host-language lambdas as part of the
>>>>>>    transform configuration is likely to take a lot longer than "first-order"
>>>>>>    cross-language IO. KafkaIO in Java uses lambdas as part of transform
>>>>>>    configuration, e.g. timestamp functions.
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek <al...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Is this what we want to do in the long run, i.e. implement copies of
>>>>>>> connectors for different SDKs? I thought the plan was to enable using
>>>>>>> connectors written in different languages, i.e. use the Java Kafka I/O from
>>>>>>> Python. This way we wouldn't duplicate bugs for three different languages
>>>>>>> (Java, Python, and Go for now).
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>>
>>>>>>> On 29. Apr 2018, at 20:46, Eugene Kirpichov <ki...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Thanks Cham, this is great! I left just a couple of comments on the
>>>>>>> doc.
>>>>>>>
>>>>>>> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <
>>>>>>> chamikara@google.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I'm looking into adding a Kafka connector to Beam Python SDK. I
>>>>>>>> think this will benefit many Python SDK users and will serve as a good
>>>>>>>> example for recently added Splittable DoFn API (Fn API support which will
>>>>>>>> allow all runners to use Python Splittable DoFn is in active development).
>>>>>>>> I created a document [1] that makes the case for adding this connector and
>>>>>>>> compares the performance of available Python Kafka client libraries. Also I
>>>>>>>> created a POC [2] that illustrates the API and how Python SDF API can be
>>>>>>>> used to implement a Kafka source. I would greatly appreciate any feedback
>>>>>>>> related to this.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>>>>>>>> [2]
>>>>>>>> https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cham
>>>>>>>>
>>>>>>>
>>>>>>>

Re: Kafka connector for Beam Python SDK

Posted by Eugene Kirpichov <ki...@google.com>.
I think we've discussed this before... It is true that all of our
second-order APIs can be re-expressed as first-order APIs, but that would
come at a very serious performance cost - e.g. a significant increase in the
amount of data shuffled / materialized. The second-order APIs (most
importantly, Dynamic Destinations in BigQueryIO and FileIO.write) were
deliberately designed this way to minimize the amount of data shuffled
(grouped by destination) and to postpone the redundant representation
(record, destination) until as late as possible in the graph.

Experience from https://github.com/apache/beam/pull/3894 shows that getting
rid of TableRow in the representation alone gives 2x-3x end-to-end
performance gains. Gains from avoiding redundancy are likely still higher.
Some back-of-the-envelope calculations:
- Imagine we're writing purchases to BigQuery tables (store_id,
customer_id, product_id, quantity), one table per store, e.g.
"stores_dataset:purchases_$storeid".
- With the second-order representation, we group records (store_id,
customer_id, product_id, quantity) by store_id - both key and value are a
handful of bytes.
- With the first-order representation, we group records of the form
{table_name:"stores_dataset:purchases_$storeid", row: {"store_id":"...",
"customer_id":"...", "product_id":"...", "quantity":"..."}}, which is many
times larger in encoded form, both key and value. Schemas, perhaps, might
help with encoding the TableRow less redundantly, but a) I don't know if
that's actually the case, b) the schemas work will also take more time to
land in non-Java SDKs, and c) this doesn't get rid of the redundancy in keys
(at a minimum, in a cross-language world, we'll definitely need to *encode*
all such keys redundantly).
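
To make the back-of-the-envelope comparison above a bit more concrete, here
is a rough sketch in plain Python (no Beam APIs involved). The fixed-width
integer packing and the JSON encoding are assumptions picked purely for
illustration; they are not what BigQueryIO actually puts on the wire, and
the record values are made up.

```python
import json
import struct

# Hypothetical purchase record; field names follow the example above.
purchase = {"store_id": 17, "customer_id": 90210, "product_id": 4242, "quantity": 3}


def encode_second_order(rec):
    """Key is the bare store_id, value is the compact record (4 x int32)."""
    key = struct.pack("<i", rec["store_id"])
    value = struct.pack("<4i", rec["store_id"], rec["customer_id"],
                        rec["product_id"], rec["quantity"])
    return key, value


def encode_first_order(rec):
    """Key is the full table name, value is a self-describing JSON record."""
    table_name = "stores_dataset:purchases_%d" % rec["store_id"]
    key = table_name.encode("utf-8")
    value = json.dumps({
        "table_name": table_name,
        "row": {name: str(val) for name, val in rec.items()},
    }).encode("utf-8")
    return key, value


def shuffled_bytes(pair):
    """Total bytes that would go through a group-by for one record."""
    key, value = pair
    return len(key) + len(value)


second = shuffled_bytes(encode_second_order(purchase))
first = shuffled_bytes(encode_first_order(purchase))
print("second-order bytes per record:", second)
print("first-order bytes per record:", first)
print("ratio: %.1fx" % (first / second))
```

The exact ratio depends entirely on the encodings assumed here; the point is
only that the table name and field names get repeated for every record in
the first-order form.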

I.e., yes, first-order APIs are possible, and likely even desirable for the
first version of cross-language IO, but I do not believe that staying
first-order-only is an acceptable long-term state.

Note also that second-order doesn't necessarily require the ability to
invoke cross-language lambdas - there are other approaches, e.g. rewriting
in the SDK language only the sub-parts of the transforms that actually use
the lambdas.

On Mon, Apr 30, 2018 at 10:34 AM Lukasz Cwik <lc...@google.com> wrote:

> I believe that most (all?) of these cases of executing a lambda could be
> avoided if we passed along structured records like:
> {
>   table_name:
>   row: { ... }
> }
>
>
> On Mon, Apr 30, 2018 at 10:24 AM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>>
>>
>> On Mon, Apr 30, 2018 at 9:54 AM Kenneth Knowles <kl...@google.com> wrote:
>>
>>> I agree with Cham's motivations as far as "we need it now" and getting
>>> Python SDF up and running and exercised on a real connector.
>>>
>>> But I do find the current API of BigQueryIO to be a poor example. That
>>> particular functionality on BigQueryIO seems extraneous and goes against
>>> our own style guide [1]. The recommended way to write it would be for
>>> BigQueryIO to output a natural concrete type (like TableRow) and allow the
>>> following step to do conversions. This is a broader programming best
>>> practice - unless there is a compelling reason, you should just return the
>>> value rather than accept a higher-order function to operate on the value.
>>> Is there a compelling reason in this case? I just dug through the code and
>>> just see that it bottoms out in AvroSource where it does not seem to add
>>> functionality.
>>>
>>
>> I think what Reuven was referring to was a functionality of the BQ sink
>> not the source. Even if we set the destination in an additional step, this
>> will still have to cross SDK harness boundary just to set the property
>> (invoking BQ destination lambda in this case), won't it? Java will be able
>> to fuse steps in this case but Python/Go will involve "some work in Java
>> SDK harness" + "invoking Python/Go harness to invoke the BQ destination
>> lambda" + "invoke Java harness to continue processing", including
>> serialization/deserialization in between. This could still result in a
>> significant amount of overhead for SDKs other than the one where the IO is
>> implemented.
>>
>>
>>>
>>> Considering cross-language pipelines as a primary use case for all
>>> connectors, perhaps we should audit them and bring them into alignment now,
>>> deprecating paths using higher-order functions. We can still consider
>>> host-language convenience composites.
>>>
>>> For an unbounded source like KafkaIO the compelling reason is the
>>> timestamp extracting function to be able to maintain a watermark. Notably,
>>> PubsubIO does not accept such a function, but requires the timestamp to be
>>> in a metadata field that any language can describe (versus having to parse
>>> the message to pull out the timestamp).
>>>
>>> Kenn
>>>
>>> [1]
>>> https://beam.apache.org/contribute/ptransform-style-guide/#choosing-types-of-input-and-output-pcollections
>>>
>>> On Mon, Apr 30, 2018 at 9:27 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Another point: cross-language IOs might add a performance penalty in
>>>> many cases. For an example of this look at BigQueryIO. The user can
>>>> register a SerializableFunction that is evaluated on every record, and
>>>> determines which destination to write the record to. Now a Python user
>>>> would want to register a Python function for this of course. This means
>>>> that the Java IO would have to invoke Python code for each record it sees,
>>>> which will likely be a big performance hit.
>>>>
>>>> Of course the downside of duplicating IOs is exactly as you say -
>>>> multiple versions to maintain, and potentially duplicate bugs. I think the
>>>> right answer will need to be on a case-by-case basis.
>>>>
>>>> Reuven
>>>>
>>>> On Mon, Apr 30, 2018 at 8:05 AM Chamikara Jayalath <
>>>> chamikara@google.com> wrote:
>>>>
>>>>> Hi Aljoscha,
>>>>>
>>>>> I tried to cover this in the doc. Once we have full support for
>>>>> cross-language IO, we can decide this on a case-by-case basis. But I don't
>>>>> think we should cease defining new sources/sinks for Beam Python SDK till
>>>>> we get to that point. I think there are good reasons for adding Kafka
>>>>> support for Python today and many Beam users have requested this. Also, note
>>>>> that proposed Python Kafka source will be based on the Splittable DoFn
>>>>> framework while the current Java version is based on the UnboundedSource
>>>>> framework. Here are the reasons that are currently listed in the doc.
>>>>>
>>>>>
>>>>>    -
>>>>>
>>>>>    Users might find it useful to have at least one unbounded source
>>>>>    and sink combination implemented in Python SDK and Kafka is the streaming
>>>>>    system that makes most sense to support if we just want to add support for
>>>>>    only one such system in Python SDK.
>>>>>    -
>>>>>
>>>>>    Not all runners might support cross-language IO. Also some
>>>>>    user/runner/deployment combinations might require an unbounded source/sink
>>>>>    implemented in Python SDK.
>>>>>    -
>>>>>
>>>>>    We recently added Splittable DoFn support to Python SDK. It will
>>>>>    be good to have at least one production quality Splittable DoFn
>>>>>    that will serve as a good example for any users who wish to implement new
>>>>>    Splittable DoFn implementations on top of Beam Python SDK.
>>>>>    -
>>>>>
>>>>>    The cross-language transform feature is currently in the initial
>>>>>    discussion phase and it could be some time before we can offer existing
>>>>>    Java implementation of Kafka for Python SDK users.
>>>>>    -
>>>>>
>>>>>    Cross-language IO might take even longer to reach the point where
>>>>>    it's fully equivalent in expressive power to a transform written in the
>>>>>    host language - e.g. supporting host-language lambdas as part of the
>>>>>    transform configuration is likely to take a lot longer than "first-order"
>>>>>    cross-language IO. KafkaIO in Java uses lambdas as part of transform
>>>>>    configuration, e.g. timestamp functions.
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>> On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek <al...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Is this what we want to do in the long run, i.e. implement copies of
>>>>>> connectors for different SDKs? I thought the plan was to enable using
>>>>>> connectors written in different languages, i.e. use the Java Kafka I/O from
>>>>>> Python. This way we wouldn't duplicate bugs for three different languages
>>>>>> (Java, Python, and Go for now).
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>>
>>>>>> On 29. Apr 2018, at 20:46, Eugene Kirpichov <ki...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>> Thanks Cham, this is great! I left just a couple of comments on the
>>>>>> doc.
>>>>>>
>>>>>> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <
>>>>>> chamikara@google.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I'm looking into adding a Kafka connector to Beam Python SDK. I
>>>>>>> think this will benefit many Python SDK users and will serve as a good
>>>>>>> example for recently added Splittable DoFn API (Fn API support which will
>>>>>>> allow all runners to use Python Splittable DoFn is in active development).
>>>>>>> I created a document [1] that makes the case for adding this connector and
>>>>>>> compares the performance of available Python Kafka client libraries. Also I
>>>>>>> created a POC [2] that illustrates the API and how Python SDF API can be
>>>>>>> used to implement a Kafka source. I would greatly appreciate any feedback
>>>>>>> related to this.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>>>>>>> [2]
>>>>>>> https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>
>>>>>>

Re: Kafka connector for Beam Python SDK

Posted by Lukasz Cwik <lc...@google.com>.
I believe that most (all?) of these cases of executing a lambda could be
avoided if we passed along structured records like:
{
  table_name:
  row: { ... }
}
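
A minimal sketch of that idea in plain Python, under the assumption that the
destination is computed in the user's own SDK before the record reaches the
sink. The function names and the in-memory "sink" are hypothetical stand-ins
and are not part of any Beam API.

```python
def purchases_table(purchase):
    # Hypothetical user logic, run in the user's SDK, that picks the table.
    return "stores_dataset:purchases_%s" % purchase["store_id"]


def to_structured_record(purchase):
    # The destination travels with the row as plain data, not as a lambda.
    return {"table_name": purchases_table(purchase), "row": dict(purchase)}


def write_all(records):
    """Stand-in for a language-neutral sink: it only reads table_name."""
    tables = {}
    for record in records:
        tables.setdefault(record["table_name"], []).append(record["row"])
    return tables


purchases = [
    {"store_id": 1, "quantity": 2},
    {"store_id": 2, "quantity": 5},
    {"store_id": 1, "quantity": 7},
]
tables = write_all([to_structured_record(p) for p in purchases])
for name in sorted(tables):
    print(name, len(tables[name]))
```

With this shape the sink never has to call back into user code, at the cost
of carrying the table name on every record.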


On Mon, Apr 30, 2018 at 10:24 AM Chamikara Jayalath <ch...@google.com>
wrote:

>
>
> On Mon, Apr 30, 2018 at 9:54 AM Kenneth Knowles <kl...@google.com> wrote:
>
>> I agree with Cham's motivations as far as "we need it now" and getting
>> Python SDF up and running and exercised on a real connector.
>>
>> But I do find the current API of BigQueryIO to be a poor example. That
>> particular functionality on BigQueryIO seems extraneous and goes against
>> our own style guide [1]. The recommended way to write it would be for
>> BigQueryIO to output a natural concrete type (like TableRow) and allow the
>> following step to do conversions. This is a broader programming best
>> practice - unless there is a compelling reason, you should just return the
>> value rather than accept a higher-order function to operate on the value.
>> Is there a compelling reason in this case? I just dug through the code and
>> just see that it bottoms out in AvroSource where it does not seem to add
>> functionality.
>>
>
> I think what Reuven was referring to was a functionality of the BQ sink
> not the source. Even if we set the destination in an additional step, this
> will still have to cross SDK harness boundary just to set the property
> (invoking BQ destination lambda in this case), won't it? Java will be able
> to fuse steps in this case but Python/Go will involve "some work in Java
> SDK harness" + "invoking Python/Go harness to invoke the BQ destination
> lambda" + "invoke Java harness to continue processing", including
> serialization/deserialization in between. This could still result in a
> significant amount of overhead for SDKs other than the one where the IO is
> implemented.
>
>
>>
>> Considering cross-language pipelines as a primary use case for all
>> connectors, perhaps we should audit them and bring them into alignment now,
>> deprecating paths using higher-order functions. We can still consider
>> host-language convenience composites.
>>
>> For an unbounded source like KafkaIO the compelling reason is the
>> timestamp extracting function to be able to maintain a watermark. Notably,
>> PubsubIO does not accept such a function, but requires the timestamp to be
>> in a metadata field that any language can describe (versus having to parse
>> the message to pull out the timestamp).
>>
>> Kenn
>>
>> [1]
>> https://beam.apache.org/contribute/ptransform-style-guide/#choosing-types-of-input-and-output-pcollections
>>
>> On Mon, Apr 30, 2018 at 9:27 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Another point: cross-language IOs might add a performance penalty in
>>> many cases. For an example of this look at BigQueryIO. The user can
>>> register a SerializableFunction that is evaluated on every record, and
>>> determines which destination to write the record to. Now a Python user
>>> would want to register a Python function for this of course. This means
>>> that the Java IO would have to invoke Python code for each record it sees,
>>> which will likely be a big performance hit.
>>>
>>> Of course the downside of duplicating IOs is exactly as you say -
>>> multiple versions to maintain, and potentially duplicate bugs. I think the
>>> right answer will need to be on a case-by-case basis.
>>>
>>> Reuven
>>>
>>> On Mon, Apr 30, 2018 at 8:05 AM Chamikara Jayalath <ch...@google.com>
>>> wrote:
>>>
>>>> Hi Aljoscha,
>>>>
>>>> I tried to cover this in the doc. Once we have full support for
>>>> cross-language IO, we can decide this on a case-by-case basis. But I don't
>>>> think we should cease defining new sources/sinks for Beam Python SDK till
>>>> we get to that point. I think there are good reasons for adding Kafka
>>>> support for Python today and many Beam users have requested this. Also, note
>>>> that proposed Python Kafka source will be based on the Splittable DoFn
>>>> framework while the current Java version is based on the UnboundedSource
>>>> framework. Here are the reasons that are currently listed in the doc.
>>>>
>>>>
>>>>    -
>>>>
>>>>    Users might find it useful to have at least one unbounded source
>>>>    and sink combination implemented in Python SDK and Kafka is the streaming
>>>>    system that makes most sense to support if we just want to add support for
>>>>    only one such system in Python SDK.
>>>>    -
>>>>
>>>>    Not all runners might support cross-language IO. Also some
>>>>    user/runner/deployment combinations might require an unbounded source/sink
>>>>    implemented in Python SDK.
>>>>    -
>>>>
>>>>    We recently added Splittable DoFn support to Python SDK. It will be
>>>>    good to have at least one production quality Splittable DoFn that
>>>>    will serve as a good example for any users who wish to implement new
>>>>    Splittable DoFn implementations on top of Beam Python SDK.
>>>>    -
>>>>
>>>>    The cross-language transform feature is currently in the initial
>>>>    discussion phase and it could be some time before we can offer existing
>>>>    Java implementation of Kafka for Python SDK users.
>>>>    -
>>>>
>>>>    Cross-language IO might take even longer to reach the point where
>>>>    it's fully equivalent in expressive power to a transform written in the
>>>>    host language - e.g. supporting host-language lambdas as part of the
>>>>    transform configuration is likely to take a lot longer than "first-order"
>>>>    cross-language IO. KafkaIO in Java uses lambdas as part of transform
>>>>    configuration, e.g. timestamp functions.
>>>>
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek <al...@apache.org>
>>>> wrote:
>>>>
>>>>> Is this what we want to do in the long run, i.e. implement copies of
>>>>> connectors for different SDKs? I thought the plan was to enable using
>>>>> connectors written in different languages, i.e. use the Java Kafka I/O from
>>>>> Python. This way we wouldn't duplicate bugs for three different languages
>>>>> (Java, Python, and Go for now).
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>>
>>>>> On 29. Apr 2018, at 20:46, Eugene Kirpichov <ki...@google.com>
>>>>> wrote:
>>>>>
>>>>> Thanks Cham, this is great! I left just a couple of comments on the
>>>>> doc.
>>>>>
>>>>> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <
>>>>> chamikara@google.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I'm looking into adding a Kafka connector to Beam Python SDK. I think
>>>>>> this will benefit many Python SDK users and will serve as a good example
>>>>>> for recently added Splittable DoFn API (Fn API support which will allow all
>>>>>> runners to use Python Splittable DoFn is in active development).  I created
>>>>>> a document [1] that makes the case for adding this connector and compares
>>>>>> the performance of available Python Kafka client libraries. Also I created
>>>>>> a POC [2] that illustrates the API and how Python SDF API can be used to
>>>>>> implement a Kafka source. I would greatly appreciate any feedback related to
>>>>>> this.
>>>>>>
>>>>>> [1]
>>>>>> https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>>>>>> [2]
>>>>>> https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>
>>>>>

Re: Kafka connector for Beam Python SDK

Posted by Chamikara Jayalath <ch...@google.com>.
On Mon, Apr 30, 2018 at 9:54 AM Kenneth Knowles <kl...@google.com> wrote:

> I agree with Cham's motivations as far as "we need it now" and getting
> Python SDF up and running and exercised on a real connector.
>
> But I do find the current API of BigQueryIO to be a poor example. That
> particular functionality on BigQueryIO seems extraneous and goes against
> our own style guide [1]. The recommended way to write it would be for
> BigQueryIO to output a natural concrete type (like TableRow) and allow the
> following step to do conversions. This is a broader programming best
> practice - unless there is a compelling reason, you should just return the
> value rather than accept a higher-order function to operate on the value.
> Is there a compelling reason in this case? I just dug through the code and
> just see that it bottoms out in AvroSource where it does not seem to add
> functionality.
>

I think what Reuven was referring to was functionality of the BQ sink, not
the source. Even if we set the destination in an additional step, this will
still have to cross the SDK harness boundary just to set the property
(invoking the BQ destination lambda in this case), won't it? Java will be
able to fuse steps in this case, but Python/Go will involve "some work in
the Java SDK harness" + "invoking the Python/Go harness to invoke the BQ
destination lambda" + "invoking the Java harness to continue processing",
including serialization/deserialization in between. This could still result
in a significant amount of overhead for SDKs other than the one where the
IO is implemented.
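
As a toy illustration of the extra hops (plain Python, no Beam or Fn API
calls), the sketch below counts encode/decode round trips per record. The
Boundary class and the JSON encoding are assumptions standing in for the
real harness-to-harness transport, which works differently in detail.

```python
import json


def destination_fn(row):
    # Hypothetical user lambda that picks the destination table.
    return "stores_dataset:purchases_%s" % row["store_id"]


class Boundary(object):
    """Stand-in for a hop between SDK harnesses: one encode plus one decode."""

    def __init__(self):
        self.crossings = 0

    def cross(self, value):
        self.crossings += 1
        return json.loads(json.dumps(value))


def fused(rows):
    # Same-language case: the lambda runs in-process, no serialization.
    return [(destination_fn(row), row) for row in rows]


def cross_language(rows, boundary):
    results = []
    for row in rows:
        remote_row = boundary.cross(row)           # IO harness -> user harness
        dest = destination_fn(remote_row)          # lambda runs in user's SDK
        dest, row_back = boundary.cross([dest, remote_row])  # and back again
        results.append((dest, row_back))
    return results


rows = [{"store_id": 1, "quantity": 2}, {"store_id": 2, "quantity": 5}]
boundary = Boundary()
assert cross_language(rows, boundary) == fused(rows)
print("boundary crossings for %d rows: %d" % (len(rows), boundary.crossings))
```

Both paths produce the same output; the cross-language one simply pays for
two serialization round trips per record in this simplified model.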


>
> Considering cross-language pipelines as a primary use case for all
> connectors, perhaps we should audit them and bring them into alignment now,
> deprecating paths using higher-order functions. We can still consider
> host-language convenience composites.
>
> For an unbounded source like KafkaIO the compelling reason is the
> timestamp extracting function to be able to maintain a watermark. Notably,
> PubsubIO does not accept such a function, but requires the timestamp to be
> in a metadata field that any language can describe (versus having to parse
> the message to pull out the timestamp).
>
> Kenn
>
> [1]
> https://beam.apache.org/contribute/ptransform-style-guide/#choosing-types-of-input-and-output-pcollections
>
> On Mon, Apr 30, 2018 at 9:27 AM Reuven Lax <re...@google.com> wrote:
>
>> Another point: cross-language IOs might add a performance penalty in many
>> cases. For an example of this look at BigQueryIO. The user can register a
>> SerializableFunction that is evaluated on every record, and determines
>> which destination to write the record to. Now a Python user would want to
>> register a Python function for this of course. This means that the Java IO
>> would have to invoke Python code for each record it sees, which will likely
>> be a big performance hit.
>>
>> Of course the downside of duplicating IOs is exactly as you say -
>> multiple versions to maintain, and potentially duplicate bugs. I think the
>> right answer will need to be on a case-by-case basis.
>>
>> Reuven
>>
>> On Mon, Apr 30, 2018 at 8:05 AM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> Hi Aljoscha,
>>>
>>> I tried to cover this in the doc. Once we have full support for
>>> cross-language IO, we can decide this on a case-by-case basis. But I don't
>>> think we should cease defining new sources/sinks for Beam Python SDK till
>>> we get to that point. I think there are good reasons for adding Kafka
>>> support for Python today and many Beam users have requested this. Also, note
>>> that proposed Python Kafka source will be based on the Splittable DoFn
>>> framework while the current Java version is based on the UnboundedSource
>>> framework. Here are the reasons that are currently listed in the doc.
>>>
>>>
>>>    -
>>>
>>>    Users might find it useful to have at least one unbounded source and
>>>    sink combination implemented in Python SDK and Kafka is the streaming
>>>    system that makes most sense to support if we just want to add support for
>>>    only one such system in Python SDK.
>>>    -
>>>
>>>    Not all runners might support cross-language IO. Also some
>>>    user/runner/deployment combinations might require an unbounded source/sink
>>>    implemented in Python SDK.
>>>    -
>>>
>>>    We recently added Splittable DoFn support to Python SDK. It will be
>>>    good to have at least one production quality Splittable DoFn that
>>>    will serve as a good example for any users who wish to implement new
>>>    Splittable DoFn implementations on top of Beam Python SDK.
>>>    -
>>>
>>>    The cross-language transform feature is currently in the initial
>>>    discussion phase and it could be some time before we can offer existing
>>>    Java implementation of Kafka for Python SDK users.
>>>    -
>>>
>>>    Cross-language IO might take even longer to reach the point where
>>>    it's fully equivalent in expressive power to a transform written in the
>>>    host language - e.g. supporting host-language lambdas as part of the
>>>    transform configuration is likely to take a lot longer than "first-order"
>>>    cross-language IO. KafkaIO in Java uses lambdas as part of transform
>>>    configuration, e.g. timestamp functions.
>>>
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>
>>>> Is this what we want to do in the long run, i.e. implement copies of
>>>> connectors for different SDKs? I thought the plan was to enable using
>>>> connectors written in different languages, i.e. use the Java Kafka I/O from
>>>> Python. This way we wouldn't duplicate bugs for three different languages
>>>> (Java, Python, and Go for now).
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>> On 29. Apr 2018, at 20:46, Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>> Thanks Cham, this is great! I left just a couple of comments on the doc.
>>>>
>>>> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <
>>>> chamikara@google.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I'm looking into adding a Kafka connector to Beam Python SDK. I think
>>>>> this will benefit many Python SDK users and will serve as a good example
>>>>> for recently added Splittable DoFn API (Fn API support which will allow all
>>>>> runners to use Python Splittable DoFn is in active development).  I created
>>>>> a document [1] that makes the case for adding this connector and compares
>>>>> the performance of available Python Kafka client libraries. Also I created
>>>>> a POC [2] that illustrates the API and how Python SDF API can be used to
>>>>> implement a Kafka source. I would greatly appreciate any feedback related to
>>>>> this.
>>>>>
>>>>> [1]
>>>>> https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>>>>> [2]
>>>>> https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>
>>>>

Re: Kafka connector for Beam Python SDK

Posted by Reuven Lax <re...@google.com>.
On Mon, Apr 30, 2018 at 9:54 AM Kenneth Knowles <kl...@google.com> wrote:

> I agree with Cham's motivations as far as "we need it now" and getting
> Python SDF up and running and exercised on a real connector.
>
> But I do find the current API of BigQueryIO to be a poor example. That
> particular functionality on BigQueryIO seems extraneous and goes against
> our own style guide [1]. The recommended way to write it would be for
> BigQueryIO to output a natural concrete type (like TableRow) and allow the
> following step to do conversions.
>

There were several concrete reasons not to do it that way. One is that very
often the information needed is in the original type and no longer exists
in the concrete type being written (TableRow).
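
A small sketch of that situation in plain Python. The Event type, its
"tenant" field and the table naming are all made up for illustration; the
only point is that a field used for routing may be dropped by the conversion
to the row that actually gets written.

```python
from collections import namedtuple

# Hypothetical original element type; "tenant" is used only for routing.
Event = namedtuple("Event", ["tenant", "user", "amount"])


def to_row(event):
    # The written row deliberately omits the routing field.
    return {"user": event.user, "amount": event.amount}


def destination_from_original(event):
    # Works: the original element still carries the tenant.
    return "dataset.events_%s" % event.tenant


def destination_from_row(row):
    # Fails: the tenant is gone once the element has been converted.
    return "dataset.events_%s" % row["tenant"]


event = Event(tenant="acme", user="u1", amount=5)
print(destination_from_original(event))
try:
    destination_from_row(to_row(event))
except KeyError as missing:
    print("cannot route from the row, missing field:", missing)
```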


> This is a broader programming best practice - unless there is a compelling
> reason, you should just return the value rather than accept a higher-order
> function to operate on the value. Is there a compelling reason in this
> case? I just dug through the code and just see that it bottoms out in
> AvroSource where it does not seem to add functionality.
>
> Considering cross-language pipelines as a primary use case for all
> connectors, perhaps we should audit them and bring them into alignment now,
> deprecating paths using higher-order functions. We can still consider
> host-language convenience composites.
>
> For an unbounded source like KafkaIO the compelling reason is the
> timestamp extracting function to be able to maintain a watermark. Notably,
> PubsubIO does not accept such a function, but requires the timestamp to be
> in a metadata field that any language can describe (versus having to parse
> the message to pull out the timestamp).
>
> Kenn
>
> [1]
> https://beam.apache.org/contribute/ptransform-style-guide/#choosing-types-of-input-and-output-pcollections
>
> On Mon, Apr 30, 2018 at 9:27 AM Reuven Lax <re...@google.com> wrote:
>
>> Another point: cross-language IOs might add a performance penalty in many
>> cases. For an example of this look at BigQueryIO. The user can register a
>> SerializableFunction that is evaluated on every record, and determines
>> which destination to write the record to. Now a Python user would want to
>> register a Python function for this of course. This means that the Java IO
>> would have to invoke Python code for each record it sees, which will likely
>> be a big performance hit.
>>
>> Of course the downside of duplicating IOs is exactly as you say -
>> multiple versions to maintain, and potentially duplicate bugs. I think the
>> right answer will need to be on a case-by-case basis.
>>
>> Reuven
>>
>> On Mon, Apr 30, 2018 at 8:05 AM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> Hi Aljoscha,
>>>
>>> I tried to cover this in the doc. Once we have full support for
>>> cross-language IO, we can decide this on a case-by-case basis. But I don't
>>> think we should cease defining new sources/sinks for Beam Python SDK till
>>> we get to that point. I think there are good reasons for adding Kafka
>>> support for Python today and many Beam users have requested this. Also, note
>>> that proposed Python Kafka source will be based on the Splittable DoFn
>>> framework while the current Java version is based on the UnboundedSource
>>> framework. Here are the reasons that are currently listed in the doc.
>>>
>>>
>>>    -
>>>
>>>    Users might find it useful to have at least one unbounded source and
>>>    sink combination implemented in Python SDK and Kafka is the streaming
>>>    system that makes most sense to support if we just want to add support for
>>>    only one such system in Python SDK.
>>>    -
>>>
>>>    Not all runners might support cross-language IO. Also some
>>>    user/runner/deployment combinations might require an unbounded source/sink
>>>    implemented in Python SDK.
>>>    -
>>>
>>>    We recently added Splittable DoFn support to Python SDK. It will be
>>>    good to have at least one production quality Splittable DoFn that
>>>    will serve as a good example for any users who wish to implement new
>>>    Splittable DoFn implementations on top of Beam Python SDK.
>>>    -
>>>
>>>    The cross-language transform feature is currently in the initial
>>>    discussion phase and it could be some time before we can offer the existing
>>>    Java implementation of Kafka to Python SDK users.
>>>    -
>>>
>>>    Cross-language IO might take even longer to reach the point where
>>>    it's fully equivalent in expressive power to a transform written in the
>>>    host language - e.g. supporting host-language lambdas as part of the
>>>    transform configuration is likely to take a lot longer than "first-order"
>>>    cross-language IO. KafkaIO in Java uses lambdas as part of transform
>>>    configuration, e.g. timestamp functions.
>>>
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>
>>>> Is this what we want to do in the long run, i.e. implement copies of
>>>> connectors for different SDKs? I thought the plan was to enable using
>>>> connectors written in different languages, i.e. use the Java Kafka I/O from
>>>> Python. This way we wouldn't duplicate bugs for three different languages
>>>> (Java, Python, and Go for now).
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>> On 29. Apr 2018, at 20:46, Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>> Thanks Cham, this is great! I left just a couple of comments on the doc.
>>>>
>>>> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <
>>>> chamikara@google.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I'm looking into adding a Kafka connector to Beam Python SDK. I think
>>>>> this will benefit many Python SDK users and will serve as a good example
>>>>> for recently added Splittable DoFn API (Fn API support which will allow all
>>>>> runners to use Python Splittable DoFn is in active development).  I created
>>>>> a document [1] that makes the case for adding this connector and compares
>>>>> the performance of available Python Kafka client libraries. Also I created
>>>>> a POC [2] that illustrates the API and how Python SDF API can be used to
>>>>> implement a Kafka source. I extremely appreciate any feedback related to
>>>>> this.
>>>>>
>>>>> [1]
>>>>> https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>>>>> [2]
>>>>> https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>
>>>>

Re: Kafka connector for Beam Python SDK

Posted by Kenneth Knowles <kl...@google.com>.
I agree with Cham's motivations as far as "we need it now" and getting
Python SDF up and running and exercised on a real connector.

But I do find the current API of BigQueryIO to be a poor example. That
particular functionality on BigQueryIO seems extraneous and goes against
our own style guide [1]. The recommended way to write it would be for
BigQueryIO to output a natural concrete type (like TableRow) and allow the
following step to do conversions. This is a broader programming best
practice - unless there is a compelling reason, you should just return the
value rather than accept a higher-order function to operate on the value.
Is there a compelling reason in this case? I dug through the code and see
only that it bottoms out in AvroSource, where it does not seem to add
functionality.
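
The two API shapes contrasted above can be sketched in plain Python. This is only an illustration under stated assumptions: TABLE, read_with_parse_fn, read_rows and to_click_count are hypothetical names, and nothing here is the BigQueryIO or Beam API.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List

Row = Dict[str, Any]

# Hypothetical in-memory stand-in for a table; not a real BigQuery API.
TABLE: List[Row] = [
    {"user": "a", "clicks": 3},
    {"user": "b", "clicks": 5},
]


def read_with_parse_fn(parse_fn: Callable[[Row], Any]) -> Iterator[Any]:
    """Higher-order shape: the source itself calls user code on every row."""
    for row in TABLE:
        yield parse_fn(row)


def read_rows() -> Iterator[Row]:
    """Plain shape: the source emits a natural concrete row type."""
    for row in TABLE:
        yield dict(row)


def to_click_count(rows: Iterable[Row]) -> Iterator[int]:
    """The conversion lives in a separate step that follows the read."""
    for row in rows:
        yield int(row["clicks"])


# Both shapes produce the same values; only the second keeps user code
# out of the source's configuration.
higher_order = list(read_with_parse_fn(lambda r: int(r["clicks"])))
two_step = list(to_click_count(read_rows()))
```

In the second shape the source needs no user-supplied function at all, which is the property that would matter if the source were implemented in a different language from the pipeline.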

Considering cross-language pipelines as a primary use case for all
connectors, perhaps we should audit them and bring them into alignment now,
deprecating paths using higher-order functions. We can still consider
host-language convenience composites.

For an unbounded source like KafkaIO, the compelling reason is the
timestamp-extracting function, which is needed to maintain a watermark.
Notably, PubsubIO does not accept such a function, but requires the timestamp
to be in a metadata field that any language can describe (versus having to
parse the message to pull out the timestamp).
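
A rough sketch of the difference between the two approaches, in plain Python. Everything here is an assumption made for illustration: the Message type, the timestamp_ms metadata field, the event_time_ms payload key, and the naive "largest timestamp seen so far" watermark are hypothetical and are not taken from KafkaIO or PubsubIO.

```python
import json
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass(frozen=True)
class Message:
    payload: bytes
    # Hypothetical metadata field: event time in epoch milliseconds.
    timestamp_ms: Optional[int] = None


def metadata_timestamp(message: Message) -> int:
    """Language-neutral: reads a declared metadata field, no user code."""
    if message.timestamp_ms is None:
        raise ValueError("message has no timestamp metadata")
    return message.timestamp_ms


def payload_timestamp(message: Message) -> int:
    """User-supplied function: must parse the payload to find the time."""
    return int(json.loads(message.payload)["event_time_ms"])


class WatermarkTracker:
    """Naive watermark: the largest event timestamp observed so far."""

    def __init__(self, timestamp_of: Callable[[Message], int]) -> None:
        self._timestamp_of = timestamp_of
        self._watermark: Optional[int] = None

    def observe_all(self, messages: Iterable[Message]) -> None:
        for message in messages:
            ts = self._timestamp_of(message)
            if self._watermark is None or ts > self._watermark:
                self._watermark = ts

    def current(self) -> Optional[int]:
        return self._watermark


messages = [
    Message(b'{"event_time_ms": 1000}', timestamp_ms=1000),
    Message(b'{"event_time_ms": 3000}', timestamp_ms=3000),
    Message(b'{"event_time_ms": 2000}', timestamp_ms=2000),
]

by_metadata = WatermarkTracker(metadata_timestamp)
by_metadata.observe_all(messages)

by_payload = WatermarkTracker(payload_timestamp)
by_payload.observe_all(messages)
```

Only payload_timestamp has to be written by the user in the pipeline's language; metadata_timestamp could be expressed as plain configuration (a field name), which is what makes that shape easier to share across SDKs.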

Kenn

[1]
https://beam.apache.org/contribute/ptransform-style-guide/#choosing-types-of-input-and-output-pcollections

On Mon, Apr 30, 2018 at 9:27 AM Reuven Lax <re...@google.com> wrote:

> Another point: cross-language IOs might add a performance penalty in many
> cases. For an example of this look at BigQueryIO. The user can register a
> SerializableFunction that is evaluated on every record, and determines
> which destination to write the record to. Now a Python user would want to
> register a Python function for this, of course. This means that the Java IO
> would have to invoke Python code for each record it sees, which will likely
> be a big performance hit.
>
> Of course the downside of duplicating IOs is exactly as you say - multiple
> versions to maintain, and potentially duplicate bugs. I think the right
> answer will need to be on a case-by-case basis.
>
> Reuven
>
> On Mon, Apr 30, 2018 at 8:05 AM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>> Hi Aljoscha,
>>
>> I tried to cover this in the doc. Once we have full support for
>> cross-language IO, we can decide this on a case-by-case basis. But I don't
>> think we should cease defining new sources/sinks for Beam Python SDK till
>> we get to that point. I think there are good reasons for adding Kafka
>> support for Python today and many Beam users have requested this. Also, note
>> that the proposed Python Kafka source will be based on the Splittable DoFn
>> framework while the current Java version is based on the UnboundedSource
>> framework. Here are the reasons that are currently listed in the doc.
>>
>>
>>    -
>>
>>    Users might find it useful to have at least one unbounded source and
>>    sink combination implemented in Python SDK and Kafka is the streaming
>>    system that makes most sense to support if we just want to add support for
>>    only one such system in Python SDK.
>>    -
>>
>>    Not all runners might support cross-language IO. Also some
>>    user/runner/deployment combinations might require an unbounded source/sink
>>    implemented in Python SDK.
>>    -
>>
>>    We recently added Splittable DoFn support to Python SDK. It will be
>>    good to have at least one production quality Splittable DoFn that
>>    will serve as a good example for any users who wish to implement new
>>    Splittable DoFn implementations on top of Beam Python SDK.
>>    -
>>
>>    The cross-language transform feature is currently in the initial
>>    discussion phase and it could be some time before we can offer the existing
>>    Java implementation of Kafka to Python SDK users.
>>    -
>>
>>    Cross-language IO might take even longer to reach the point where
>>    it's fully equivalent in expressive power to a transform written in the
>>    host language - e.g. supporting host-language lambdas as part of the
>>    transform configuration is likely to take a lot longer than "first-order"
>>    cross-language IO. KafkaIO in Java uses lambdas as part of transform
>>    configuration, e.g. timestamp functions.
>>
>>
>> Thanks,
>> Cham
>>
>> On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>>> Is this what we want to do in the long run, i.e. implement copies of
>>> connectors for different SDKs? I thought the plan was to enable using
>>> connectors written in different languages, i.e. use the Java Kafka I/O from
>>> Python. This way we wouldn't duplicate bugs for three different languages
>>> (Java, Python, and Go for now).
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 29. Apr 2018, at 20:46, Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>> Thanks Cham, this is great! I left just a couple of comments on the doc.
>>>
>>> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <
>>> chamikara@google.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I'm looking into adding a Kafka connector to Beam Python SDK. I think
>>>> this will benefit many Python SDK users and will serve as a good example
>>>> for recently added Splittable DoFn API (Fn API support which will allow all
>>>> runners to use Python Splittable DoFn is in active development).  I created
>>>> a document [1] that makes the case for adding this connector and compares
>>>> the performance of available Python Kafka client libraries. Also I created
>>>> a POC [2] that illustrates the API and how Python SDF API can be used to
>>>> implement a Kafka source. I extremely appreciate any feedback related to
>>>> this.
>>>>
>>>> [1]
>>>> https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>>>> [2]
>>>> https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>
>>>

Re: Kafka connector for Beam Python SDK

Posted by Reuven Lax <re...@google.com>.
Another point: cross-language IOs might add a performance penalty in many
cases. For an example of this, look at BigQueryIO. The user can register a
SerializableFunction that is evaluated on every record, and determines
which destination to write the record to. Now a Python user would want to
register a Python function for this, of course. This means that the Java IO
would have to invoke Python code for each record it sees, which will likely
be a big performance hit.

Of course the downside of duplicating IOs is exactly as you say - multiple
versions to maintain, and potentially duplicate bugs. I think the right
answer will need to be on a case-by-case basis.
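
The per-record callback pattern described above can be sketched as follows. This is a plain-Python illustration only: CountingBridge is a hypothetical stand-in for a cross-language call boundary that merely counts invocations, and write_with_dynamic_destinations is not the BigQueryIO API. It shows the structure of the cost (one boundary crossing per record), not a measurement of it.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List

Record = Dict[str, str]


class CountingBridge:
    """Hypothetical stand-in for a cross-language boundary; counts calls."""

    def __init__(self, fn: Callable[[Record], str]) -> None:
        self._fn = fn
        self.calls = 0

    def __call__(self, record: Record) -> str:
        self.calls += 1
        return self._fn(record)


def write_with_dynamic_destinations(
    records: Iterable[Record],
    destination_fn: Callable[[Record], str],
) -> Dict[str, List[Record]]:
    """Route each record to a table chosen by a user function."""
    tables: Dict[str, List[Record]] = defaultdict(list)
    for record in records:
        # The user function runs once per record; with a cross-language
        # IO, each of these calls would cross the language boundary.
        tables[destination_fn(record)].append(record)
    return dict(tables)


records = [
    {"country": "de", "event": "click"},
    {"country": "us", "event": "view"},
    {"country": "de", "event": "view"},
]

bridge = CountingBridge(lambda r: "events_" + r["country"])
tables = write_with_dynamic_destinations(records, bridge)
```

After the run, bridge.calls equals the number of records written, which is the quantity that grows with the input when the routing function lives in another language.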

Reuven

On Mon, Apr 30, 2018 at 8:05 AM Chamikara Jayalath <ch...@google.com>
wrote:

> Hi Aljoscha,
>
> I tried to cover this in the doc. Once we have full support for
> cross-language IO, we can decide this on a case-by-case basis. But I don't
> think we should cease defining new sources/sinks for Beam Python SDK till
> we get to that point. I think there are good reasons for adding Kafka
> support for Python today and many Beam users have requested this. Also, note
> that the proposed Python Kafka source will be based on the Splittable DoFn
> framework while the current Java version is based on the UnboundedSource
> framework. Here are the reasons that are currently listed in the doc.
>
>
>    -
>
>    Users might find it useful to have at least one unbounded source and
>    sink combination implemented in Python SDK and Kafka is the streaming
>    system that makes most sense to support if we just want to add support for
>    only one such system in Python SDK.
>    -
>
>    Not all runners might support cross-language IO. Also some
>    user/runner/deployment combinations might require an unbounded source/sink
>    implemented in Python SDK.
>    -
>
>    We recently added Splittable DoFn support to Python SDK. It will be
>    good to have at least one production quality Splittable DoFn that will
>    serve as a good example for any users who wish to implement new Splittable
>    DoFn implementations on top of Beam Python SDK.
>    -
>
>    The cross-language transform feature is currently in the initial
>    discussion phase and it could be some time before we can offer the existing
>    Java implementation of Kafka to Python SDK users.
>    -
>
>    Cross-language IO might take even longer to reach the point where it's
>    fully equivalent in expressive power to a transform written in the host
>    language - e.g. supporting host-language lambdas as part of the transform
>    configuration is likely to take a lot longer than "first-order"
>    cross-language IO. KafkaIO in Java uses lambdas as part of transform
>    configuration, e.g. timestamp functions.
>
>
> Thanks,
> Cham
>
> On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Is this what we want to do in the long run, i.e. implement copies of
>> connectors for different SDKs? I thought the plan was to enable using
>> connectors written in different languages, i.e. use the Java Kafka I/O from
>> Python. This way we wouldn't duplicate bugs for three different languages
>> (Java, Python, and Go for now).
>>
>> Best,
>> Aljoscha
>>
>>
>> On 29. Apr 2018, at 20:46, Eugene Kirpichov <ki...@google.com> wrote:
>>
>> Thanks Cham, this is great! I left just a couple of comments on the doc.
>>
>> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> I'm looking into adding a Kafka connector to Beam Python SDK. I think
>>> this will benefit many Python SDK users and will serve as a good example
>>> for recently added Splittable DoFn API (Fn API support which will allow all
>>> runners to use Python Splittable DoFn is in active development).  I created
>>> a document [1] that makes the case for adding this connector and compares
>>> the performance of available Python Kafka client libraries. Also I created
>>> a POC [2] that illustrates the API and how Python SDF API can be used to
>>> implement a Kafka source. I extremely appreciate any feedback related to
>>> this.
>>>
>>> [1]
>>> https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>>> [2]
>>> https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>>
>>> Thanks,
>>> Cham
>>>
>>
>>

Re: Kafka connector for Beam Python SDK

Posted by Chamikara Jayalath <ch...@google.com>.
Hi Aljoscha,

I tried to cover this in the doc. Once we have full support for
cross-language IO, we can decide this on a case-by-case basis. But I don't
think we should cease defining new sources/sinks for Beam Python SDK till
we get to that point. I think there are good reasons for adding Kafka
support for Python today and many Beam users have requested this. Also, note
that the proposed Python Kafka source will be based on the Splittable DoFn
framework while the current Java version is based on the UnboundedSource
framework. Here are the reasons that are currently listed in the doc.


   -

   Users might find it useful to have at least one unbounded source and
   sink combination implemented in Python SDK and Kafka is the streaming
   system that makes most sense to support if we just want to add support for
   only one such system in Python SDK.
   -

   Not all runners might support cross-language IO. Also some
   user/runner/deployment combinations might require an unbounded source/sink
   implemented in Python SDK.
   -

   We recently added Splittable DoFn support to Python SDK. It will be good
   to have at least one production quality Splittable DoFn that will serve
   as a good example for any users who wish to implement new Splittable DoFn
   implementations on top of Beam Python SDK.
   -

   The cross-language transform feature is currently in the initial
   discussion phase and it could be some time before we can offer the existing
   Java implementation of Kafka to Python SDK users.
   -

   Cross-language IO might take even longer to reach the point where it's
   fully equivalent in expressive power to a transform written in the host
   language - e.g. supporting host-language lambdas as part of the transform
   configuration is likely to take a lot longer than "first-order"
   cross-language IO. KafkaIO in Java uses lambdas as part of transform
   configuration, e.g. timestamp functions.
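
To make the Splittable DoFn point in the list above more concrete, here is a minimal plain-Python sketch of the idea behind reading a partition under an offset-range restriction. All names (OffsetRange, OffsetRangeTracker, read_partition) are hypothetical and written for this illustration; this is not the Beam SDF API and not the POC's code, and a Python list stands in for a Kafka partition.

```python
from dataclasses import dataclass
from typing import Iterator, List, Optional, Tuple


@dataclass(frozen=True)
class OffsetRange:
    start: int  # inclusive
    stop: int   # exclusive


class OffsetRangeTracker:
    """Tracks which offsets of a restriction have been claimed."""

    def __init__(self, restriction: OffsetRange) -> None:
        self._range = restriction
        self._last_claimed: Optional[int] = None

    def try_claim(self, offset: int) -> bool:
        """Claim an offset; False means it lies outside the restriction."""
        if offset < self._range.start:
            raise ValueError("offset precedes the restriction")
        if self._last_claimed is not None and offset <= self._last_claimed:
            raise ValueError("offsets must be claimed in increasing order")
        if offset >= self._range.stop:
            return False
        self._last_claimed = offset
        return True

    def try_split(self) -> Optional[OffsetRange]:
        """Keep the claimed prefix; hand back the unclaimed rest, if any."""
        if self._last_claimed is None:
            split_at = self._range.start
        else:
            split_at = self._last_claimed + 1
        if split_at >= self._range.stop:
            return None
        residual = OffsetRange(split_at, self._range.stop)
        self._range = OffsetRange(self._range.start, split_at)
        return residual


def read_partition(
    log: List[str], tracker: OffsetRangeTracker, start: int
) -> Iterator[Tuple[int, str]]:
    """Emit (offset, record) pairs for as long as offsets can be claimed."""
    offset = start
    while offset < len(log) and tracker.try_claim(offset):
        yield offset, log[offset]
        offset += 1


log = ["a", "b", "c", "d", "e"]  # stand-in for one Kafka partition
tracker = OffsetRangeTracker(OffsetRange(0, 5))
reader = read_partition(log, tracker, start=0)

first = [next(reader), next(reader)]  # process two records
residual = tracker.try_split()        # runner asks for a checkpoint/split
rest_of_primary = list(reader)        # the primary can claim nothing more

# The residual can be resumed later, possibly on another worker.
resumed = list(
    read_partition(log, OffsetRangeTracker(residual), residual.start)
)
```

The point of the sketch is that the reader never emits a record it has not claimed, so a split leaves the primary and the residual covering disjoint offsets with nothing lost or duplicated.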


Thanks,
Cham

On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek <al...@apache.org>
wrote:

> Is this what we want to do in the long run, i.e. implement copies of
> connectors for different SDKs? I thought the plan was to enable using
> connectors written in different languages, i.e. use the Java Kafka I/O from
> Python. This way we wouldn't duplicate bugs for three different languages
> (Java, Python, and Go for now).
>
> Best,
> Aljoscha
>
>
> On 29. Apr 2018, at 20:46, Eugene Kirpichov <ki...@google.com> wrote:
>
> Thanks Cham, this is great! I left just a couple of comments on the doc.
>
> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>> Hi All,
>>
>> I'm looking into adding a Kafka connector to Beam Python SDK. I think
>> this will benefit many Python SDK users and will serve as a good example
>> for recently added Splittable DoFn API (Fn API support which will allow all
>> runners to use Python Splittable DoFn is in active development).  I created
>> a document [1] that makes the case for adding this connector and compares
>> the performance of available Python Kafka client libraries. Also I created
>> a POC [2] that illustrates the API and how Python SDF API can be used to
>> implement a Kafka source. I extremely appreciate any feedback related to
>> this.
>>
>> [1]
>> https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>> [2]
>> https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>
>> Thanks,
>> Cham
>>
>
>

Re: Kafka connector for Beam Python SDK

Posted by Aljoscha Krettek <al...@apache.org>.
Is this what we want to do in the long run, i.e. implement copies of connectors for different SDKs? I thought the plan was to enable using connectors written in different languages, i.e. use the Java Kafka I/O from Python. This way we wouldn't duplicate bugs for three different languages (Java, Python, and Go for now).

Best,
Aljoscha

> On 29. Apr 2018, at 20:46, Eugene Kirpichov <ki...@google.com> wrote:
> 
> Thanks Cham, this is great! I left just a couple of comments on the doc.
> 
> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <chamikara@google.com> wrote:
> Hi All,
> 
> I'm looking into adding a Kafka connector to Beam Python SDK. I think this will benefit many Python SDK users and will serve as a good example for recently added Splittable DoFn API (Fn API support which will allow all runners to use Python Splittable DoFn is in active development).  I created a document [1] that makes the case for adding this connector and compares the performance of available Python Kafka client libraries. Also I created a POC [2] that illustrates the API and how Python SDF API can be used to implement a Kafka source. I extremely appreciate any feedback related to this.
> 
> [1] https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
> [2] https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
> 
> Thanks,
> Cham


Re: Kafka connector for Beam Python SDK

Posted by Eugene Kirpichov <ki...@google.com>.
Thanks Cham, this is great! I left just a couple of comments on the doc.

On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <ch...@google.com>
wrote:

> Hi All,
>
> I'm looking into adding a Kafka connector to Beam Python SDK. I think this
> will benefit many Python SDK users and will serve as a good example for
> recently added Splittable DoFn API (Fn API support which will allow all
> runners to use Python Splittable DoFn is in active development).  I created
> a document [1] that makes the case for adding this connector and compares
> the performance of available Python Kafka client libraries. Also I created
> a POC [2] that illustrates the API and how Python SDF API can be used to
> implement a Kafka source. I extremely appreciate any feedback related to
> this.
>
> [1]
> https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
> [2]
> https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>
> Thanks,
> Cham
>