Posted to dev@beam.apache.org by Chamikara Jayalath <ch...@google.com> on 2018/05/01 07:01:45 UTC

Re: Kafka connector for Beam Python SDK

Thanks all for the comments. Based on the discussion so far, looks like we
have to flesh out the cross-language transforms feature quite a bit before
we can utilize some of the existing Java IOs in other SDKs. This might
involve redesigning some of the existing Java IOs to allow expressing
second-order APIs in other languages without significantly affecting
execution performance. Also, several people agreed that adding an SDF-based
Kafka source to the Python SDK will allow us to better iron out the SDF API.
I'll start prototyping the Kafka connector. Please follow
https://issues.apache.org/jira/browse/BEAM-3788 for details.

Thanks,
Cham

On Mon, Apr 30, 2018 at 4:08 PM Kenneth Knowles <kl...@google.com> wrote:

> The numbers on that PR are not really what end-to-end means to me - it
> normally means you have a fully represented productionized use case and the
> metric you are looking at is the actual impact on the full system (like
> latency from a tap on mobile to a dashboard being updated, or monthly
> compute cost for a system). Also FWIW when I say "TableRow" I don't mean
> the JSON wire format for them. I also believe that Luke's proposal has
> never been measured.
>
> But it is an obvious & fair point that encoding the table repeatedly will
> bloat compared to a second-order transform. However, in an end-to-end test
> you'd want to compare against ways of shuffling compact proxies and joining
> back to the full value, depending on the portability overhead. But, I
> agree, as you say, there's probably a decent design space of blending a
> portable first-order transform with little second-order helpers. Or perhaps
> with a shared memory layout we can have reasonable performance with a
> cross-language higher-order situation.
>
> Kenn
>
>
>
> On Mon, Apr 30, 2018 at 10:54 AM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> I think we've discussed this before... It is true that all of our
>> second-order APIs can be re-expressed as first-order APIs, but that would
>> come at a very serious performance cost - e.g. significant increase in
>> amount of data shuffled / materialized. The second-order APIs (most
>> importantly, Dynamic Destinations in BigQueryIO and FileIO.write) were
>> deliberately designed this way to minimize the amount of data shuffled (by
>> grouping by destination) and to postpone the redundant (record, destination)
>> representation until as late as possible in the graph.
>>
>> Experience from https://github.com/apache/beam/pull/3894 shows that
>> getting rid of TableRow in the representation alone gives 2x-3x end-to-end
>> performance gains. Gains from avoiding redundancy are likely still higher.
>> Some back-of-the-envelope calculations:
>> - Imagine we're writing purchases to BigQuery tables (store_id,
>> customer_id, product_id, quantity), one table per store, e.g.
>> "stores_dataset:purchases_$storeid".
>> - With the second-order representation, we group records (store_id,
>> customer_id, product_id, quantity) by store_id - both key and value are a
>> handful of bytes.
>> - With the first-order representation, we group records of the form
>> {table_name:"stores_dataset:purchases_$storeid", row: {"store_id":"...",
>> "customer_id":"...", "product_id":"...", "quantity":"..."}} which is many
>> times more in encoded form, both key and value. Schemas, perhaps, might
>> help with encoding the TableRow less redundantly, but a) I don't know if
>> that's actually the case b) the schemas work will also take more time to
>> land in non-Java SDKs c) this doesn't get rid of the redundancy in keys (at
>> a minimum, in a cross-language world, we'll definitely need to *encode* all
>> such keys redundantly)
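The back-of-the-envelope comparison above can be sketched in a few lines. This is purely illustrative plain Python, not Beam code: the field names, packing choices, and table-name pattern are assumptions made for the sake of the size comparison.

```python
import json
import struct

# One purchase record: (store_id, customer_id, product_id, quantity).
record = {"store_id": 42, "customer_id": 1001, "product_id": 7, "quantity": 3}

# Second-order representation: shuffle a compact (store_id, packed fields)
# pair - both key and value are a handful of bytes.
second_order_key = struct.pack(">i", record["store_id"])
second_order_value = struct.pack(
    ">iii", record["customer_id"], record["product_id"], record["quantity"])
second_order_size = len(second_order_key) + len(second_order_value)

# First-order representation: every element redundantly carries the full
# table name plus a self-describing row.
first_order = json.dumps({
    "table_name": "stores_dataset:purchases_%d" % record["store_id"],
    "row": {k: str(v) for k, v in record.items()},
})
first_order_size = len(first_order.encode("utf-8"))

print(second_order_size, first_order_size)
```

Even with this tiny record, the first-order encoding is several times larger, and the gap grows with the length of the destination string repeated on every element.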
>>
>> I.e., yes, first-order APIs are possible, and likely even desirable for
>> the first version of cross-language IO, but I do not believe that staying
>> first-order-only is an acceptable long-term state.
>>
>> Note also that second-order doesn't necessarily require ability to invoke
>> cross-language lambdas - there are other approaches, e.g. rewriting in the
>> SDK language only the sub-parts of the transforms that actually use the
>> lambdas.
>>
>> On Mon, Apr 30, 2018 at 10:34 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> I believe that most (all?) of these cases of executing a lambda could be
>>> avoided if we passed along structured records like:
>>> {
>>>   table_name:
>>>   row: { ... }
>>> }
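Luke's structured-record idea can be sketched as a plain pre-processing step that runs entirely in the pipeline's host language, so the downstream sink routes records by reading a field rather than calling back into a user lambda. The field names here are assumptions taken from the snippet above, not an actual Beam API.

```python
def to_structured_record(purchase):
    """Attach the destination to each element up front, in pure Python,
    so a cross-language sink never needs to invoke a Python function."""
    return {
        # Hypothetical destination pattern, for illustration only.
        "table_name": "stores_dataset:purchases_%s" % purchase["store_id"],
        "row": purchase,
    }

records = [to_structured_record(p) for p in [
    {"store_id": "1", "product_id": "a", "quantity": 2},
    {"store_id": "2", "product_id": "b", "quantity": 5},
]]
```

The trade-off is exactly the one Eugene raises above: the destination string is now materialized on every record.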
>>>
>>>
>>> On Mon, Apr 30, 2018 at 10:24 AM Chamikara Jayalath <
>>> chamikara@google.com> wrote:
>>>
>>>>
>>>>
>>>> On Mon, Apr 30, 2018 at 9:54 AM Kenneth Knowles <kl...@google.com> wrote:
>>>>
>>>>> I agree with Cham's motivations as far as "we need it now" and getting
>>>>> Python SDF up and running and exercised on a real connector.
>>>>>
>>>>> But I do find the current API of BigQueryIO to be a poor example. That
>>>>> particular functionality on BigQueryIO seems extraneous and goes against
>>>>> our own style guide [1]. The recommended way to write it would be for
>>>>> BigQueryIO to output a natural concrete type (like TableRow) and allow the
>>>>> following step to do conversions. This is a broader programming best
>>>>> practice - unless there is a compelling reason, you should just return the
>>>>> value rather than accept a higher-order function to operate on the value.
>>>>> Is there a compelling reason in this case? I dug through the code and saw
>>>>> that it bottoms out in AvroSource, where it does not seem to add
>>>>> functionality.
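The style-guide pattern described above can be illustrated with a minimal, purely hypothetical sketch. Neither function is a real Beam API; they stand in for the two transform shapes being contrasted.

```python
# Discouraged shape: the transform accepts a per-record conversion function,
# which a caller in another language cannot easily supply.
def read_with_parse_fn(raw_rows, parse_fn):
    return [parse_fn(r) for r in raw_rows]

# Recommended shape: the transform returns a natural concrete type, and the
# conversion is a separate host-language step that fuses with the read anyway.
def read_rows(raw_rows):
    return list(raw_rows)

raw = [{"store_id": "1"}, {"store_id": "2"}]
converted = [r["store_id"] for r in read_rows(raw)]
```

Both shapes compute the same result; the second keeps the user-supplied logic out of the connector's configuration surface.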
>>>>>
>>>>
>>>> I think what Reuven was referring to was a functionality of the BQ sink,
>>>> not the source. Even if we set the destination in an additional step, this
>>>> will still have to cross the SDK harness boundary just to set the property
>>>> (invoking the BQ destination lambda in this case), won't it? Java will be able
>>>> to fuse steps in this case but Python/Go will involve "some work in Java
>>>> SDK harness" + "invoking Python/Go harness to invoke the BQ destination
>>>> lambda" + "invoke Java harness to continue processing", including
>>>> serialization/deserialization in between. This could still result in a
>>>> significant amount of overhead for SDKs other than the one where the IO is
>>>> implemented.
>>>>
>>>>
>>>>>
>>>>> Considering cross-language pipelines as a primary use case for all
>>>>> connectors, perhaps we should audit them and bring them into alignment now,
>>>>> deprecating paths using higher-order functions. We can still consider
>>>>> host-language convenience composites.
>>>>>
>>>>> For an unbounded source like KafkaIO the compelling reason is the
>>>>> timestamp extracting function to be able to maintain a watermark. Notably,
>>>>> PubsubIO does not accept such a function, but requires the timestamp to be
>>>>> in a metadata field that any language can describe (versus having to parse
>>>>> the message to pull out the timestamp).
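The PubsubIO approach mentioned above can be sketched as follows. The message layout and attribute name here are assumptions for illustration, not the actual Pubsub message format: the point is that the timestamp lives in a declaratively named metadata field, so any SDK can extract it without invoking user code.

```python
def extract_timestamp(message, timestamp_attribute="ts"):
    # No user lambda runs here: the connector just reads a named attribute,
    # so a watermark can be maintained from any language.
    return float(message["attributes"][timestamp_attribute])

msg = {"payload": b"\x00\x01", "attributes": {"ts": "1525046400.0"}}
ts = extract_timestamp(msg)
```

Contrast this with KafkaIO's timestamp function, which must parse the record payload and therefore has to run in the language the function was written in.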
>>>>>
>>>>> Kenn
>>>>>
>>>>> [1]
>>>>> https://beam.apache.org/contribute/ptransform-style-guide/#choosing-types-of-input-and-output-pcollections
>>>>>
>>>>> On Mon, Apr 30, 2018 at 9:27 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Another point: cross-language IOs might add a performance penalty in
>>>>>> many cases. For an example of this look at BigQueryIO. The user can
>>>>>> register a SerializableFunction that is evaluated on every record, and
>>>>>> determines which destination to write the record to. Now a Python user
>>>>>> would want to register a Python function for this, of course. This means
>>>>>> that the Java IO would have to invoke Python code for each record it sees,
>>>>>> which will likely be a big performance hit.
>>>>>>
>>>>>> Of course the downside of duplicating IOs is exactly as you say -
>>>>>> multiple versions to maintain, and potentially duplicate bugs. I think the
>>>>>> right answer will need to be on a case-by-case basis.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Mon, Apr 30, 2018 at 8:05 AM Chamikara Jayalath <
>>>>>> chamikara@google.com> wrote:
>>>>>>
>>>>>>> Hi Aljoscha,
>>>>>>>
>>>>>>> I tried to cover this in the doc. Once we have full support for
>>>>>>> cross-language IO, we can decide this on a case-by-case basis. But I don't
>>>>>>> think we should cease defining new sources/sinks for the Beam Python SDK until
>>>>>>> we get to that point. I think there are good reasons for adding Kafka
>>>>>>> support for Python today, and many Beam users have requested this. Also, note
>>>>>>> that the proposed Python Kafka source will be based on the Splittable DoFn
>>>>>>> framework while the current Java version is based on the UnboundedSource
>>>>>>> framework. Here are the reasons that are currently listed in the doc.
>>>>>>>
>>>>>>>
>>>>>>>    - Users might find it useful to have at least one unbounded source
>>>>>>>    and sink combination implemented in the Python SDK, and Kafka is the
>>>>>>>    streaming system that makes the most sense to support if we want to
>>>>>>>    add support for only one such system in the Python SDK.
>>>>>>>
>>>>>>>    - Not all runners might support cross-language IO. Also, some
>>>>>>>    user/runner/deployment combinations might require an unbounded
>>>>>>>    source/sink implemented in the Python SDK.
>>>>>>>
>>>>>>>    - We recently added Splittable DoFn support to the Python SDK. It
>>>>>>>    will be good to have at least one production-quality Splittable DoFn
>>>>>>>    that can serve as a good example for any users who wish to implement
>>>>>>>    new Splittable DoFns on top of the Beam Python SDK.
>>>>>>>
>>>>>>>    - The cross-language transform feature is currently in the initial
>>>>>>>    discussion phase, and it could be some time before we can offer the
>>>>>>>    existing Java implementation of Kafka to Python SDK users.
>>>>>>>
>>>>>>>    - Cross-language IO might take even longer to reach the point where
>>>>>>>    it's fully equivalent in expressive power to a transform written in
>>>>>>>    the host language - e.g. supporting host-language lambdas as part of
>>>>>>>    the transform configuration is likely to take a lot longer than
>>>>>>>    "first-order" cross-language IO. KafkaIO in Java uses lambdas as part
>>>>>>>    of its transform configuration, e.g. timestamp functions.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>> On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek <
>>>>>>> aljoscha@apache.org> wrote:
>>>>>>>
>>>>>>>> Is this what we want to do in the long run, i.e. implement copies
>>>>>>>> of connectors for different SDKs? I thought the plan was to enable using
>>>>>>>> connectors written in different languages, i.e. use the Java Kafka I/O from
>>>>>>>> python. This way we wouldn't duplicate bugs for three different languages
>>>>>>>> (Java, Python, and Go for now).
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>>
>>>>>>>> On 29. Apr 2018, at 20:46, Eugene Kirpichov <ki...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Thanks Cham, this is great! I left just a couple of comments on the
>>>>>>>> doc.
>>>>>>>>
>>>>>>>> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <
>>>>>>>> chamikara@google.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I'm looking into adding a Kafka connector to Beam Python SDK. I
>>>>>>>>> think this will benefit many Python SDK users and will serve as a good
>>>>>>>>> example for the recently added Splittable DoFn API (Fn API support, which will
>>>>>>>>> allow all runners to use Python Splittable DoFn, is in active development).
>>>>>>>>> I created a document [1] that makes the case for adding this connector and
>>>>>>>>> compares the performance of available Python Kafka client libraries. I also
>>>>>>>>> created a POC [2] that illustrates the API and shows how the Python SDF API
>>>>>>>>> can be used to implement a Kafka source. I would greatly appreciate any
>>>>>>>>> feedback related to this.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>>>>>>>>> [2]
>>>>>>>>> https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Cham
>>>>>>>>>
>>>>>>>>
>>>>>>>>