Posted to dev@beam.apache.org by Thomas Weise <th...@apache.org> on 2018/04/24 16:03:27 UTC

Custom URNs and runner translation

I'm working on a mini POC to enable Kafka as a custom streaming source for a
Python pipeline executing on the (in-progress) portable Flink runner.

We eventually want to use the same native Flink connectors for sources and
sinks that we also use in other Flink jobs.

I got a simple example to work with the FlinkKafkaConsumer010 reading from
Kafka and a Python lambda logging the value. The code is here:

https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9

I'm looking for feedback/opinions on the following items in particular:

* Enabling custom translation on the Flink portable runner (custom
translator could be loaded with ServiceLoader, additional translations
could also be specified as job server configuration, pipeline option, ...)

* For the Python side, is what's shown in the commit the recommended way to
define a custom transform (it would eventually live in a reusable custom
module that pipeline authors can import)? Also, the example does not
cover the configuration part yet.

* Cross-language coders: In this example the Kafka source only considers
the message value and uses the byte coder that both sides understand. If I
wanted to pass on the key and possibly other metadata to the Python
transform (similar to KafkaRecord from Java KafkaIO), then a specific coder
is needed. Such a coder could be written using protobuf, Avro, etc., but it
would also need to be registered.
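
To make the first item above more concrete, here is a minimal sketch of a
URN-keyed translator lookup. It is written in Python purely for illustration
(the Flink runner itself is Java, where ServiceLoader would play the role of
the decorator). The URN string, the payload format (a UTF-8 topic name), and
all function names are hypothetical; none of them come from Beam or from the
commit.

```python
from typing import Callable, Dict

# Hypothetical URN for a runner-native Kafka read; not a URN defined by Beam.
KAFKA_READ_URN = "example:flink:transform:kafka_read:v1"

# A translator takes the transform's payload bytes and returns a
# runner-specific description of the source (a plain dict in this sketch).
Translator = Callable[[bytes], dict]

_TRANSLATORS: Dict[str, Translator] = {}


def register_translator(urn: str) -> Callable[[Translator], Translator]:
    """Decorator that records a translator function under its URN."""
    def decorator(fn: Translator) -> Translator:
        if urn in _TRANSLATORS:
            raise ValueError(f"duplicate translator for URN {urn!r}")
        _TRANSLATORS[urn] = fn
        return fn
    return decorator


@register_translator(KAFKA_READ_URN)
def translate_kafka_read(payload: bytes) -> dict:
    # Assumed payload format: the topic name encoded as UTF-8.
    return {"source": "kafka", "topic": payload.decode("utf-8")}


def translate(urn: str, payload: bytes) -> dict:
    """Look up the translator for a URN and fail clearly if none exists."""
    try:
        translator = _TRANSLATORS[urn]
    except KeyError:
        raise LookupError(f"no translator registered for URN {urn!r}") from None
    return translator(payload)
```

Keying the lookup on the URN alone keeps the registration mechanism
(ServiceLoader, job server configuration, pipeline option) separate from the
translation logic.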

Thanks,
Thomas

Re: Custom URNs and runner translation

Posted by Robert Bradshaw <ro...@google.com>.
On Fri, Apr 27, 2018 at 12:18 PM Thomas Weise <th...@apache.org> wrote:

> Thanks for all the feedback! I agree that the desirable state is to have
solid connector implementations for all common integration scenarios as
part of Beam. And it seems that the path there would be cross-language IO.

> The ability to specify with URN and implement custom transforms is also
important. Such transforms may not qualify for inclusion in Beam for a
variety of reasons (only relevant for a specific environment or use case,
dependencies/licensing, ...).

> For my specific experiment, I prefer the custom URN over trying to bend
the implementation to mimic an SDF-based KafkaIO that it wouldn't (and
doesn't need to) be semantically equivalent to. At this point Beam doesn't
have the spec and implementation for said KafkaIO, but it would be great to
see an example of what it would look like. Following a Beam spec would
absolutely make sense if the custom implementation is purely for
optimization or a similar purpose.

> I wanted to circle back to the coder related question. I see that we now
have a proto definition for the standard transforms and coders, which is
really nice:


https://github.com/apache/beam/blob/42fac771814b119c162d40e9300f5a0d3afe0f48/model/pipeline/src/main/proto/beam_runner_api.proto#L521

> This enables interoperability between languages with some standard types
(KV, ITERABLE etc.), but for a structure like KafkaRecord a custom coder
would be required, implemented in both Java and Python. Any thoughts on
providing a generic tuple/record coder as part of the spec?

Yes, we definitely should, and this would be right in line with the work
that's being done on defining schemas.
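
As a rough illustration of what a generic record coder could do on the wire,
the sketch below length-prefixes each field so that any SDK can split a
record back into its parts. The layout (big-endian 32-bit field count and
field lengths) and the function names are assumptions made for this example
only; this is not Beam's encoding and not the schema work mentioned above.

```python
import struct
from typing import List, Sequence


def encode_record(fields: Sequence[bytes]) -> bytes:
    """Encode byte-string fields as: count, then (length, bytes) per field."""
    out = [struct.pack(">I", len(fields))]
    for field in fields:
        out.append(struct.pack(">I", len(field)))
        out.append(field)
    return b"".join(out)


def decode_record(data: bytes) -> List[bytes]:
    """Inverse of encode_record; raises ValueError on malformed input."""
    if len(data) < 4:
        raise ValueError("missing field count")
    (count,) = struct.unpack_from(">I", data, 0)
    offset = 4
    fields = []
    for _ in range(count):
        if offset + 4 > len(data):
            raise ValueError("truncated field length")
        (length,) = struct.unpack_from(">I", data, offset)
        offset += 4
        end = offset + length
        if end > len(data):
            raise ValueError("truncated field")
        fields.append(data[offset:end])
        offset = end
    if offset != len(data):
        raise ValueError("trailing bytes after last field")
    return fields
```

A Kafka-style record could then travel as, say, `[key, value, topic]`, with
each field encoded by whatever component coder both sides agree on.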

> On Fri, Apr 27, 2018 at 8:53 AM, Lukasz Cwik <lc...@google.com> wrote:



>> On Thu, Apr 26, 2018 at 8:38 PM Chamikara Jayalath <ch...@google.com>
wrote:



>>> On Thu, Apr 26, 2018 at 5:59 PM Eugene Kirpichov <ki...@google.com>
wrote:

>>>> I agree with Thomas' sentiment that cross-language IO is very
important because of how much work it takes to produce a mature connector
implementation in a language. Looking at implementations of BigQueryIO,
PubSubIO, KafkaIO, FileIO in Java, only a very daring soul would be tempted
to reimplement them entirely in Python and Go.

>>>> I'm imagining pretty much what Kenn is describing: a pipeline would
specify some transforms by URN + payload, and rely on the runner to do
whatever it takes to run this - either by expanding it into a Beam
implementation of this transform that the runner chooses to use (could be
in the same language or in a different language; either way, the runner
would indeed need to invoke the respective SDK to expand it given the
parameters), or by doing something entirely runner-specific (e.g. using the
built-in Flink Kafka connector).

>>>> I don't see a reason to require that there *must* exist a Beam
implementation of this transform. There only, ideally, must be a runner-
and language-agnostic spec for the URN and payload; of course, then the
transform is only as portable as the set of runners that implement this URN.


>>> For a transform in general it's true that we don't need a Beam
implementation, but more specifically for IOs I think there are many
benefits to having the implementation in Beam. For example,

>>> * IO connector will offer same behavior and feature set across various
runners/SDKs.
>>> * Beam community will be able to view/modify/improve the IO connector.
>>> * Existing IO connectors will serve as examples for users who wish to
develop new IO connectors.


>> More runners will be able to execute the user's pipeline.


>>>> I actually really like the idea that the transform can be implemented
in a completely runner-specific way without a Beam expansion to back it up
- it would let us unblock a lot of the work earlier than full-blown
cross-language IO is delivered or even than SDFs work in all
languages/runners.


>>> If there are existing established connectors (for example, Kafka for
Flink in this case) I agree. But for anybody developing a new IO connector,
I think we should encourage developing that in Beam (in some SDK) given
that the connector will be available to all runners (and to all SDKs once
we have cross-language transforms).

>>> Thanks,
>>> Cham



>>>> On Wed, Apr 25, 2018 at 10:02 PM Kenneth Knowles <kl...@google.com>
wrote:

>>>>> It doesn't have to be 1:1 swapping KafkaIO for a Flink Kafka
connector, right? I was imagining: Python SDK submits pipeline with a
KafkaIO (with URN + payload) maybe bogus contents. It is replaced with a
small Flink subgraph, including the native Flink Kafka connector and some
compensating transforms to match the required semantics. To me, this is
preferable to making single-runner transform URNs, since that breaks runner
portability by definition.

>>>>> Kenn

>>>>> On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <
chamikara@google.com> wrote:



>>>>>> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <re...@google.com> wrote:

>>>>>>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <kl...@google.com>
wrote:

>>>>>>>> The premise of URN + payload is that you can establish a spec. A
native override still needs to meet the spec - it may still require some
compensating code. Worrying about weird differences between runners seems
more about worrying that an adequate spec cannot be determined.


>>>>>>> My point exactly. An SDF-based KafkaIO can run in the middle of a
pipeline. E.g. we could have TextIO producing a list of topics, and the SDF
KafkaIO run after that on this dynamic (not known until runtime) list of
topics. If the native Flink source doesn't work this way, then it doesn't
share the same spec and should have a different URN.


>>>>>> Agree that if they cannot share the same spec, SDF and native
transforms warrant different URNs. Native Kafka might be able to support a
PCollection of topics/partitions as an input, though, by utilizing the
underlying native Flink Kafka connector as a library. On the other hand, we
might decide to expand SDF-based ParDos into other transforms before a runner
gets a chance to override, in which case this kind of replacement will not
be possible.

>>>>>> Thanks,
>>>>>> Cham



>>>>>>>> Runners will already invoke the SDF differently, so users treating
every detail of some implementation as the spec are doomed.

>>>>>>>> Kenn

>>>>>>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:



>>>>>>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <
chamikara@google.com> wrote:



>>>>>>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde
>>>>>>>>>> <he...@google.com>
wrote:

>>>>>>>>>>> > Note that a KafkaDoFn still needs to be provided, but could
be a DoFn that
>>>>>>>>>>> > fails loudly if it's actually called in the short term rather
than a full
>>>>>>>>>>> > Python implementation.

>>>>>>>>>>> For configurable runner-native IO, for now, I think it is
reasonable to use a URN + special data payload directly without a KafkaDoFn
-- assuming it's a portable pipeline. That's what we do in Go for
PubSub-on-Dataflow and something similar would work for Kafka-on-Flink as
well. I agree that a non-native alternative implementation is desirable, but
if one is not present we should IMO rather fail at job submission instead
of at runtime. I could imagine connectors intrinsic to an execution engine
where non-native implementations are not possible.
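
The "fail at job submission" idea above could be sketched as a check that
runs before anything is deployed. This is only an illustration in plain
Python: the URNs, the exception type, and the function name are hypothetical,
and a real runner would read the URNs from the pipeline proto.

```python
from typing import Iterable, List, Set


class UnsupportedTransformError(Exception):
    """Raised at submission time when the runner cannot translate a URN."""

    def __init__(self, urns: List[str]):
        super().__init__("runner cannot translate: " + ", ".join(urns))
        self.urns = urns


def check_translatable(pipeline_urns: Iterable[str], supported: Set[str]) -> None:
    """Reject the job up front if any transform URN has no translation."""
    missing = sorted({urn for urn in pipeline_urns if urn not in supported})
    if missing:
        raise UnsupportedTransformError(missing)
```

Rejecting the job here means the user sees every unsupported URN at once,
rather than a failure from inside a running worker.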


>>>>>>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand
similar to any other SDF by default (initial splitting, GBK, and a map-task
equivalent, for example) but a runner (Flink in this case) will be free to
override it with a runner-native implementation if desired. I assume the
runner will have a chance to perform this override before the SDF expansion
(which is not fully designed yet). Providing separate source/sink
transforms for Flink native Kafka will be an option as well, but that will
be less desirable from a Python user API perspective.


>>>>>>>>> Are we sure that the internal SDF will provide the same
functionality as the native one? What if the Kafka SDF is in the middle of
a pipeline - can Flink support that? Having a separate transform for the
Flink native source might be a better user experience than having one that
changes its behavior in strange ways depending on the runner.







>>>>>>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <
robertwb@google.com> wrote:

>>>>>>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <th...@apache.org>
wrote:

>>>>>>>>>>>> > Hi Cham,

>>>>>>>>>>>> > Thanks for the feedback!

>>>>>>>>>>>> > I should have probably clarified that my POC and questions
aren't
>>>>>>>>>>>> specific to Kafka as source, but pretty much any other
source/sink that we
>>>>>>>>>>>> internally use as well. We have existing Flink pipelines that
are written
>>>>>>>>>>>> in Java and we want to use the same connectors with the Python
SDK on top
>>>>>>>>>>>> of the already operationalized Flink stack. Therefore,
portability isn't a
>>>>>>>>>>>> concern as much as the ability to integrate is.


>>>>>>>>>> Thanks for the clarification. Agree that providing runner-native
implementations of established sources/sinks can be desirable in some
cases.


>>>>>>>>>>>> > -->

>>>>>>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>>>>>>>>>> > <ch...@google.com>
>>>>>>>>>>>> wrote:

>>>>>>>>>>>> >> Hi Thomas,

>>>>>>>>>>>> >> Seems like we are working on similar (partially) things :).

>>>>>>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise
>>>>>>>>>>>> <th...@apache.org>
wrote:

>>>>>>>>>>>> >>> I'm working on a mini POC to enable Kafka as custom
streaming source
>>>>>>>>>>>> for a Python pipeline executing on the (in-progress) portable
Flink runner.

>>>>>>>>>>>> >>> We eventually want to use the same native Flink connectors
for sources
>>>>>>>>>>>> and sinks that we also use in other Flink jobs.


>>>>>>>>>>>> >> Could you clarify what you mean by same Flink connector? Do you
>>>>>>>>>>>> >> mean that Beam-based and non-Beam-based versions of Flink will
>>>>>>>>>>>> >> use the same Kafka connector implementation?


>>>>>>>>>>>> > The native Flink sources as shown in the example below, not
the Beam
>>>>>>>>>>>> KafkaIO or other Beam sources.



>>>>>>>>>>>> >>> I got a simple example to work with the
FlinkKafkaConsumer010 reading
>>>>>>>>>>>> from Kafka and a Python lambda logging the value. The code is
here:



https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9



>>>>>>>>>>>> >>> I'm looking for feedback/opinions on the following items
in particular:

>>>>>>>>>>>> >>> * Enabling custom translation on the Flink portable runner
(custom
>>>>>>>>>>>> translator could be loaded with ServiceLoader, additional
translations
>>>>>>>>>>>> could also be specified as job server configuration, pipeline
option, ...)

>>>>>>>>>>>> >>> * For the Python side, is what's shown in the commit the
recommended
>>>>>>>>>>>> way to define a custom transform (it would eventually live in
a reusable
>>>>>>>>>>>> custom module that pipeline authors can import)? Also, the
example does not
>>>>>>>>>>>> have the configuration part covered yet..


>>>>>>>>>>>> >> The only standard unbounded source API offered by Python
SDK is the
>>>>>>>>>>>> Splittable DoFn API. This is the part I'm working on. I'm
trying to add a
>>>>>>>>>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
comparing
>>>>>>>>>>>> different Kafka Python client libraries. Will share more
information on
>>>>>>>>>>>> this soon.

>>>>>>>>>>>> >> I understand this might not be possible in all cases and we
might want
>>>>>>>>>>>> to consider adding native source/sink implementations. But
this will
>>>>>>>>>>>> result in the implementation being runner-specific (each
runner will have
>>>>>>>>>>>> to have its own source/sink implementation). So I think we
should try to
>>>>>>>>>>>> add connector implementations to Beam using the standard API
whenever
>>>>>>>>>>>> possible. We also have plans to implement support for cross
SDK transforms
>>>>>>>>>>>> in the future (so that we can utilize Java implementation from
Python for
>>>>>>>>>>>> example) but we are not there yet and we might still want to
implement a
>>>>>>>>>>>> connector for a given SDK if there's good client library
support.


>>>>>>>>>>>> > It is great that the Python SDK will have connectors that
are written in
>>>>>>>>>>>> Python in the future, but I think it is equally if not more
important to be
>>>>>>>>>>>> able to use at least the Java Beam connectors with Python SDK
(and any
>>>>>>>>>>>> other non-Java SDK). Especially in a fully managed environment
it should be
>>>>>>>>>>>> possible to offer this to users in a way that is largely
transparent. It
>>>>>>>>>>>> takes significant time and effort to mature connectors and I'm
not sure it
>>>>>>>>>>>> is realistic to repeat that for all external systems in
multiple languages.
>>>>>>>>>>>> Or, to put it in another way, it is likely that instead of one
over time
>>>>>>>>>>>> rock solid connector per external system there will be
multiple less mature
>>>>>>>>>>>> implementations. That's also the reason we internally want to
use the Flink
>>>>>>>>>>>> native connectors - we know what they can and cannot do and
want to
>>>>>>>>>>>> leverage the existing investment.

>>>>>>>>>>>> There are two related issues here: how to specify transforms (such as
>>>>>>>>>>>> sources) in a language-independent manner, and how specific runners can
>>>>>>>>>>>> recognize and run them. URNs solve both: the composite ReadFromKafka
>>>>>>>>>>>> PTransform (which consists of an Impulse + SDF(KafkaDoFn)) encodes to a
>>>>>>>>>>>> URN with an attached payload that fully specifies this read. (The
>>>>>>>>>>>> KafkaDoFn could similarly have a URN and payload.) A runner that
>>>>>>>>>>>> understands these URNs is free to make any (semantically-equivalent)
>>>>>>>>>>>> substitutions it wants for this transform.

>>>>>>>>>>>> Note that a KafkaDoFn still needs to be provided, but could be
a DoFn that
>>>>>>>>>>>> fails loudly if it's actually called in the short term rather
than a full
>>>>>>>>>>>> Python implementation. Eventually, we would like to be able to
call out to
>>>>>>>>>>>> another SDK to expand full transforms (e.g. more complicated
ones like
>>>>>>>>>>>> BigQueryIO).

>>>>>>>>>>>> >>> * Cross-language coders: In this example the Kafka source
only
>>>>>>>>>>>> considers the message value and uses the byte coder that both
sides
>>>>>>>>>>>> understand. If I wanted to pass on the key and possibly other
metadata to
>>>>>>>>>>>> the Python transform (similar to KafkaRecord from Java
KafkaIO), then a
>>>>>>>>>>>> specific coder is needed. Such coder could be written using
protobuf, Avro
>>>>>>>>>>>> etc, but it would also need to be registered.


>>>>>>>>>>>> >> I think this requirement goes away if we implement Kafka in
Python SDK.

>>>>>>>>>>>> > Wouldn't this be needed for any cross language pipeline? Or
rather any
>>>>>>>>>>>> that isn't only using PCollection<byte[]>? Is there a language
agnostic
>>>>>>>>>>>> encoding for KV<?,?>, for example?

>>>>>>>>>>>> Yes, Coders are also specified by URN (+components and/or
payload), and
>>>>>>>>>>>> there are a couple of standard ones, including KV. See

https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>>>>>>>> This is not a closed set.

>>>>>>>>>>>> - Robert

Re: Custom URNs and runner translation

Posted by Robert Bradshaw <ro...@google.com>.
On Fri, Apr 27, 2018 at 12:34 PM Kenneth Knowles <kl...@google.com> wrote:

> On Fri, Apr 27, 2018 at 12:18 PM Thomas Weise <th...@apache.org> wrote:


>> The ability to specify with URN and implement custom transforms is also
important. Such transforms may not qualify for inclusion in Beam for a
variety of reasons (only relevant for a specific environment or use case,
dependencies/licensing, ...).


> They don't need to be included in Beam - by design, a third party library
transform can specify its own URN and Payload to be put in the proto
representation. I'm not sure of the state of the code here, but I think the
current path is a shared dep on runners-core-construction and some
ServiceLoader shenanigans. Shading may be in place that breaks this.

Agreed, however I was saying that this particular record-like coder
probably would make sense as a Beam standard coder rather than having every
third party define its own (or find some other shared location).


Re: Custom URNs and runner translation

Posted by Kenneth Knowles <kl...@google.com>.
On Fri, Apr 27, 2018 at 12:18 PM Thomas Weise <th...@apache.org> wrote:

>
> The ability to specify with URN and implement custom transforms is also
> important. Such transforms may not qualify for inclusion in Beam for a
> variety of reasons (only relevant for a specific environment or use case,
> dependencies/licensing, ...).
>

They don't need to be included in Beam - by design, a third party library
transform can specify its own URN and Payload to be put in the proto
representation. I'm not sure of the state of the code here, but I think the
current path is a shared dep on runners-core-construction and some
ServiceLoader shenanigans. Shading may be in place that breaks this.
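To make the URN + payload idea concrete, here is a minimal, language-neutral sketch in plain Python. It is not Beam's API: the URN string, the JSON payload format, and the names TransformSpec, TranslatorRegistry and translate_kafka_read are all assumptions made up for illustration. The registry stands in for whatever discovery mechanism a runner uses (ServiceLoader on the Java side), and an unknown URN is rejected when the pipeline is translated rather than at runtime.

```python
import json
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass(frozen=True)
class TransformSpec:
    """Language-neutral description of a transform: a URN plus an opaque payload."""
    urn: str
    payload: bytes


# Hypothetical URN owned by a third-party library, not one defined by Beam.
KAFKA_READ_URN = "example:transform:kafka_read:v1"


def kafka_read_spec(topic: str, bootstrap_servers: str) -> TransformSpec:
    """SDK side: serialize the configuration into the payload (JSON is an assumption)."""
    config = {"topic": topic, "bootstrap_servers": bootstrap_servers}
    payload = json.dumps(config, sort_keys=True).encode("utf-8")
    return TransformSpec(KAFKA_READ_URN, payload)


class TranslatorRegistry:
    """Runner side: maps URNs to translation functions (hypothetical stand-in)."""

    def __init__(self) -> None:
        self._translators: Dict[str, Callable[[bytes], str]] = {}

    def register(self, urn: str, translator: Callable[[bytes], str]) -> None:
        self._translators[urn] = translator

    def translate(self, spec: TransformSpec) -> str:
        translator = self._translators.get(spec.urn)
        if translator is None:
            # Fail while translating the pipeline, before anything runs.
            raise ValueError(f"No translator registered for URN {spec.urn!r}")
        return translator(spec.payload)


def translate_kafka_read(payload: bytes) -> str:
    """Decode the payload and describe the native source the runner would build."""
    config = json.loads(payload.decode("utf-8"))
    return (
        f"native-kafka-source(topic={config['topic']}, "
        f"servers={config['bootstrap_servers']})"
    )
```

A runner would call register() once per URN it understands and translate() for each transform in the submitted pipeline; the payload schema is then the spec that both the SDK and the runner have to agree on.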

Kenn




> For my specific experiment, I prefer the custom URN over trying to bend
> the implementation to mimic an SDF based KafkaIO that it wouldn't (and
> doesn't need to) be semantically equivalent to. At this point Beam
> doesn't have the spec and implementation for said KafkaIO, but it would be
> great to see an example of what it would look like. Following a Beam spec
> would absolutely make sense if the custom implementation is purely for
> optimization or similar purpose.
>
> I wanted to circle back to the coder related question. I see that we now
> have a proto definition for the standard transforms and coders, which is
> really nice:
>
>
> https://github.com/apache/beam/blob/42fac771814b119c162d40e9300f5a0d3afe0f48/model/pipeline/src/main/proto/beam_runner_api.proto#L521
>
> This enables interoperability between languages with some standard types
> (KV, ITERABLE etc.), but for a structure like KafkaRecord a custom coder
> would be required, implemented in both Java and Python. Any thoughts on
> providing a generic tuple/record coder as part of the spec?
>
> Thanks,
> Thomas
>
>
>
> On Fri, Apr 27, 2018 at 8:53 AM, Lukasz Cwik <lc...@google.com> wrote:
>
>>
>>
>> On Thu, Apr 26, 2018 at 8:38 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Thu, Apr 26, 2018 at 5:59 PM Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> I agree with Thomas' sentiment that cross-language IO is very important
>>>> because of how much work it takes to produce a mature connector
>>>> implementation in a language. Looking at implementations of BigQueryIO,
>>>> PubSubIO, KafkaIO, FileIO in Java, only a very daring soul would be tempted
>>>> to reimplement them entirely in Python and Go.
>>>>
>>>> I'm imagining pretty much what Kenn is describing: a pipeline would
>>>> specify some transforms by URN + payload, and rely on the runner to do
>>>> whatever it takes to run this - either by expanding it into a Beam
>>>> implementation of this transform that the runner chooses to use (could be
>>>> in the same language or in a different language; either way, the runner
>>>> would indeed need to invoke the respective SDK to expand it given the
>>>> parameters), or by doing something entirely runner-specific (e.g. using the
>>>> built-in Flink Kafka connector).
>>>>
>>>> I don't see a reason to require that there *must* exist a Beam
>>>> implementation of this transform. There only, ideally, must be a runner-
>>>> and language-agnostic spec for the URN and payload; of course, then the
>>>> transform is only as portable as the set of runners that implement this URN.
>>>>
>>>
>>> For a transform in general it's true that we don't need a Beam
>>> implementation, but more specifically for IOs I think there are many
>>> benefits to having the implementation in Beam. For example,
>>>
>>>    - IO connector will offer same behavior and feature set across
>>>    various runners/SDKs.
>>>    - Beam community will be able to view/modify/improve the IO
>>>    connector.
>>>    - existing IO connectors will serve as examples for users who wish
>>>    to develop new IO connectors
>>>
>>>
>>>
>>    - More runners will be able to execute the user's pipeline.
>>
>>
>>>> I actually really like the idea that the transform can be implemented
>>>> in a completely runner-specific way without a Beam expansion to back it up
>>>> - it would let us unblock a lot of the work earlier than full-blown
>>>> cross-language IO is delivered or even than SDFs work in all
>>>> languages/runners.
>>>>
>>>
>>> If there are existing established connectors (for example, Kafka for
>>> Flink in this case) I agree. But for anybody developing a new IO connector,
>>> I think we should encourage developing that in Beam (in some SDK) given
>>> that the connector will be available to all runners (and to all SDKs once
>>> we have cross-language transforms).
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>>
>>>> On Wed, Apr 25, 2018 at 10:02 PM Kenneth Knowles <kl...@google.com>
>>>> wrote:
>>>>
>>>>> It doesn't have to be 1:1 swapping KafkaIO for a Flink Kafka
>>>>> connector, right? I was imagining: Python SDK submits pipeline with a
>>>>> KafkaIO (with URN + payload), maybe with bogus contents. It is replaced with a
>>>>> small Flink subgraph, including the native Flink Kafka connector and some
>>>>> compensating transforms to match the required semantics. To me, this is
>>>>> preferable to making single-runner transform URNs, since that breaks runner
>>>>> portability by definition.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <
>>>>> chamikara@google.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <kl...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> The premise of URN + payload is that you can establish a spec. A
>>>>>>>> native override still needs to meet the spec - it may still require some
>>>>>>>> compensating code. Worrying about weird differences between runners seems
>>>>>>>> more about worrying that an adequate spec cannot be determined.
>>>>>>>>
>>>>>>>
>>>>>>> My point exactly. An SDF-based KafkaIO can run in the middle of a
>>>>>>> pipeline. E.g. we could have TextIO producing a list of topics, and the SDF
>>>>>>> KafkaIO run after that on this dynamic (not known until runtime) list of
>>>>>>> topics. If the native Flink source doesn't work this way, then it doesn't
>>>>>>> share the same spec and should have a different URN.
>>>>>>>
>>>>>>
>>>>>> Agree that if they cannot share the same spec, SDF and native
>>>>>> transforms warrant different URNs. Native Kafka might be able to support a
>>>>>> PCollection of topics/partitions as an input though by utilizing underlying
>>>>>> native Flink Kafka connector as a library. On the other hand, we might
>>>>>> decide to expand SDF-based ParDos into other transforms before a runner
>>>>>> gets a chance to override, in which case this kind of replacement will not
>>>>>> be possible.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>> Runners will already invoke the SDF differently, so users treating
>>>>>>>> every detail of some implementation as the spec are doomed.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <
>>>>>>>>> chamikara@google.com> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <he...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> > Note that a KafkaDoFn still needs to be provided, but could
>>>>>>>>>>> be a DoFn that
>>>>>>>>>>> > fails loudly if it's actually called in the short term rather
>>>>>>>>>>> than a full
>>>>>>>>>>> > Python implementation.
>>>>>>>>>>>
>>>>>>>>>>> For configurable runner-native IO, for now, I think it is
>>>>>>>>>>> reasonable to use a URN + special data payload directly without a KafkaDoFn
>>>>>>>>>>> -- assuming it's a portable pipeline. That's what we do in Go for
>>>>>>>>>>> PubSub-on-Dataflow and something similar would work for Kafka-on-Flink as
>>>>>>>>>>> well. I agree that non-native alternative implementation is desirable, but
>>>>>>>>>>> if one is not present we should IMO rather fail at job submission instead
>>>>>>>>>>> of at runtime. I could imagine connectors intrinsic to an execution engine
>>>>>>>>>>> where non-native implementations are not possible.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand
>>>>>>>>>> similar to any other SDF by default (initial splitting, GBK, and a map-task
>>>>>>>>>> equivalent, for example) but a runner (Flink in this case) will be free to
>>>>>>>>>> override it with a runner-native implementation if desired. I assume the
>>>>>>>>>> runner will have a chance to perform this override before the SDF expansion
>>>>>>>>>> (which is not fully designed yet). Providing separate source/sink
>>>>>>>>>> transforms for Flink native Kafka will be an option as well, but that will
>>>>>>>>>> be less desirable from a Python user API perspective.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Are we sure that the internal SDF will provide the same
>>>>>>>>> functionality as the native one? What if the Kafka SDF is in the middle of
>>>>>>>>> a pipeline - can Flink support that? Having a separate transform for the
>>>>>>>>> Flink native source might be a better user experience than having one that
>>>>>>>>> changes its behavior in strange ways depending on the runner.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <
>>>>>>>>>>> robertwb@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <th...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> > Hi Cham,
>>>>>>>>>>>>
>>>>>>>>>>>> > Thanks for the feedback!
>>>>>>>>>>>>
>>>>>>>>>>>> > I should have probably clarified that my POC and questions
>>>>>>>>>>>> aren't
>>>>>>>>>>>> specific to Kafka as source, but pretty much any other
>>>>>>>>>>>> source/sink that we
>>>>>>>>>>>> internally use as well. We have existing Flink pipelines that
>>>>>>>>>>>> are written
>>>>>>>>>>>> in Java and we want to use the same connectors with the Python
>>>>>>>>>>>> SDK on top
>>>>>>>>>>>> of the already operationalized Flink stack. Therefore,
>>>>>>>>>>>> portability isn't a
>>>>>>>>>>>> concern as much as the ability to integrate is.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> Thanks for the clarification. Agree that providing runner-native
>>>>>>>>>> implementations of established source/sinks can be desirable in some
>>>>>>>>>> cases.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>> > -->
>>>>>>>>>>>>
>>>>>>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>>>>>>>>>> > <ch...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> >> Hi Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> >> Seems like we are working on similar (partially) things :).
>>>>>>>>>>>>
>>>>>>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <th...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> >>> I'm working on a mini POC to enable Kafka as custom
>>>>>>>>>>>> streaming source
>>>>>>>>>>>> for a Python pipeline executing on the (in-progress) portable
>>>>>>>>>>>> Flink runner.
>>>>>>>>>>>>
>>>>>>>>>>>> >>> We eventually want to use the same native Flink connectors
>>>>>>>>>>>> for sources
>>>>>>>>>>>> and sinks that we also use in other Flink jobs.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> >> Could you clarify what you mean by same Flink connector ? Do
>>>>>>>>>>>> you mean
>>>>>>>>>>>> that Beam-based and non-Beam-based versions of Flink will use
>>>>>>>>>>>> the same
>>>>>>>>>>>> Kafka connector implementation ?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> > The native Flink sources as shown in the example below, not
>>>>>>>>>>>> the Beam
>>>>>>>>>>>> KafkaIO or other Beam sources.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> >>> I got a simple example to work with the
>>>>>>>>>>>> FlinkKafkaConsumer010 reading
>>>>>>>>>>>> from Kafka and a Python lambda logging the value. The code is
>>>>>>>>>>>> here:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> >>> I'm looking for feedback/opinions on the following items in
>>>>>>>>>>>> particular:
>>>>>>>>>>>>
>>>>>>>>>>>> >>> * Enabling custom translation on the Flink portable runner
>>>>>>>>>>>> (custom
>>>>>>>>>>>> translator could be loaded with ServiceLoader, additional
>>>>>>>>>>>> translations
>>>>>>>>>>>> could also be specified as job server configuration, pipeline
>>>>>>>>>>>> option, ...)
>>>>>>>>>>>>
>>>>>>>>>>>> >>> * For the Python side, is what's shown in the commit the
>>>>>>>>>>>> recommended
>>>>>>>>>>>> way to define a custom transform (it would eventually live in a
>>>>>>>>>>>> reusable
>>>>>>>>>>>> custom module that pipeline authors can import)? Also, the
>>>>>>>>>>>> example does not
>>>>>>>>>>>> have the configuration part covered yet..
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> >> The only standard unbounded source API offered by Python SDK
>>>>>>>>>>>> is the
>>>>>>>>>>>> Splittable DoFn API. This is the part I'm working on. I'm
>>>>>>>>>>>> trying to add a
>>>>>>>>>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>>>>>>>>>> comparing
>>>>>>>>>>>> different Kafka Python client libraries. Will share more
>>>>>>>>>>>> information on
>>>>>>>>>>>> this soon.
>>>>>>>>>>>>
>>>>>>>>>>>> >> I understand this might not be possible in all cases and we
>>>>>>>>>>>> might want
>>>>>>>>>>>> to consider adding native source/sink implementations. But
>>>>>>>>>>>> this will
>>>>>>>>>>>> result in the implementation being runner-specific (each runner
>>>>>>>>>>>> will have
>>>>>>>>>>>> to have its own source/sink implementation). So I think we
>>>>>>>>>>>> should try to
>>>>>>>>>>>> add connector implementations to Beam using the standard API
>>>>>>>>>>>> whenever
>>>>>>>>>>>> possible. We also have plans to implement support for cross SDK
>>>>>>>>>>>> transforms
>>>>>>>>>>>> in the future (so that we can utilize Java implementation from
>>>>>>>>>>>> Python for
>>>>>>>>>>>> example) but we are not there yet and we might still want to
>>>>>>>>>>>> implement a
>>>>>>>>>>>> connector for a given SDK if there's good client library
>>>>>>>>>>>> support.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> > It is great that the Python SDK will have connectors that are
>>>>>>>>>>>> written in
>>>>>>>>>>>> Python in the future, but I think it is equally if not more
>>>>>>>>>>>> important to be
>>>>>>>>>>>> able to use at least the Java Beam connectors with Python SDK
>>>>>>>>>>>> (and any
>>>>>>>>>>>> other non-Java SDK). Especially in a fully managed environment
>>>>>>>>>>>> it should be
>>>>>>>>>>>> possible to offer this to users in a way that is largely
>>>>>>>>>>>> transparent. It
>>>>>>>>>>>> takes significant time and effort to mature connectors and I'm
>>>>>>>>>>>> not sure it
>>>>>>>>>>>> is realistic to repeat that for all external systems in
>>>>>>>>>>>> multiple languages.
>>>>>>>>>>>> Or, to put it in another way, it is likely that instead of one
>>>>>>>>>>>> over time
>>>>>>>>>>>> rock solid connector per external system there will be multiple
>>>>>>>>>>>> less mature
>>>>>>>>>>>> implementations. That's also the reason we internally want to
>>>>>>>>>>>> use the Flink
>>>>>>>>>>>> native connectors - we know what they can and cannot do and
>>>>>>>>>>>> want to
>>>>>>>>>>>> leverage the existing investment.
>>>>>>>>>>>>
>>>>>>>>>>>> There are two related issues here: how to specify transforms
>>>>>>>>>>>> (such as sources) in a language-independent manner, and how
>>>>>>>>>>>> specific runners can recognize and run them. For this we use
>>>>>>>>>>>> URNs: the composite ReadFromKafka PTransform (that consists of
>>>>>>>>>>>> an Impulse + SDF(KafkaDoFn)) encodes to a URN with an attached
>>>>>>>>>>>> payload that fully specifies this read. (The KafkaDoFn could
>>>>>>>>>>>> similarly have a URN and payload.) A runner that understands
>>>>>>>>>>>> these URNs is free to make any (semantically-equivalent)
>>>>>>>>>>>> substitutions it wants for this transform.
>>>>>>>>>>>>
>>>>>>>>>>>> Note that a KafkaDoFn still needs to be provided, but could be
>>>>>>>>>>>> a DoFn that
>>>>>>>>>>>> fails loudly if it's actually called in the short term rather
>>>>>>>>>>>> than a full
>>>>>>>>>>>> Python implementation. Eventually, we would like to be able to
>>>>>>>>>>>> call out to
>>>>>>>>>>>> another SDK to expand full transforms (e.g. more complicated
>>>>>>>>>>>> ones like
>>>>>>>>>>>> BigQueryIO).
>>>>>>>>>>>>
>>>>>>>>>>>> >>> * Cross-language coders: In this example the Kafka source
>>>>>>>>>>>> only
>>>>>>>>>>>> considers the message value and uses the byte coder that both
>>>>>>>>>>>> sides
>>>>>>>>>>>> understand. If I wanted to pass on the key and possibly other
>>>>>>>>>>>> metadata to
>>>>>>>>>>>> the Python transform (similar to KafkaRecord from Java
>>>>>>>>>>>> KafkaIO), then a
>>>>>>>>>>>> specific coder is needed. Such coder could be written using
>>>>>>>>>>>> protobuf, Avro
>>>>>>>>>>>> etc, but it would also need to be registered.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> >> I think this requirement goes away if we implement Kafka in
>>>>>>>>>>>> Python SDK.
>>>>>>>>>>>>
>>>>>>>>>>>> > Wouldn't this be needed for any cross language pipeline? Or
>>>>>>>>>>>> rather any
>>>>>>>>>>>> that isn't only using PCollection<byte[]>? Is there a language
>>>>>>>>>>>> agnostic
>>>>>>>>>>>> encoding for KV<?,?>, for example?
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, Coders are also specified by URN (+components and/or
>>>>>>>>>>>> payload), and
>>>>>>>>>>>> there are a couple of standard ones, including KV. See
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>>>>>>>> This is not a closed set.
>>>>>>>>>>>>
>>>>>>>>>>>> - Robert
>>>>>>>>>>>>
>>>>>>>>>>>
>

Re: Custom URNs and runner translation

Posted by Thomas Weise <th...@apache.org>.
Thanks for all the feedback! I agree that the desirable state is to have
solid connector implementations for all common integration scenarios as
part of Beam. And it seems that the path there would be cross-language IO.

The ability to specify with URN and implement custom transforms is also
important. Such transforms may not qualify for inclusion in Beam for a
variety of reasons (only relevant for a specific environment or use case,
dependencies/licensing, ...).

For my specific experiment, I prefer a custom URN over bending the
implementation to mimic an SDF-based KafkaIO that it wouldn't be (and doesn't
need to be) semantically equivalent to. At this point Beam doesn't have the
spec and implementation for said KafkaIO, but it would be great to see an
example of what it would look like. Following a Beam spec would absolutely
make sense if the custom implementation is purely for optimization or a
similar purpose.

I wanted to circle back to the coder related question. I see that we now
have a proto definition for the standard transforms and coders, which is
really nice:

https://github.com/apache/beam/blob/42fac771814b119c162d40e9300f5a0d3afe0f48/model/pipeline/src/main/proto/beam_runner_api.proto#L521

This enables interoperability between languages with some standard types
(KV, ITERABLE etc.), but for a structure like KafkaRecord a custom coder
would be required, implemented in both Java and Python. Any thoughts on
providing a generic tuple/record coder as part of the spec?
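As a rough illustration of what a generic tuple/record coder could look like, here is a minimal sketch in plain Python. It is not Beam's coder API or wire format: the class names, the big-endian length prefix, and the KafkaRecord-like field layout are all assumptions chosen for the example. The point is only that a record coder can be composed from component coders, so both sides need just the component encodings plus the field order.

```python
import struct
from typing import Any, List, Sequence, Tuple


class BytesCoder:
    """Length-prefixed bytes: 4-byte big-endian length, then the raw bytes."""

    def encode(self, value: bytes) -> bytes:
        return struct.pack(">I", len(value)) + value

    def decode(self, data: bytes, offset: int) -> Tuple[bytes, int]:
        (length,) = struct.unpack_from(">I", data, offset)
        start = offset + 4
        return data[start:start + length], start + length


class Int64Coder:
    """Fixed-width signed 64-bit integer, big-endian."""

    def encode(self, value: int) -> bytes:
        return struct.pack(">q", value)

    def decode(self, data: bytes, offset: int) -> Tuple[int, int]:
        (value,) = struct.unpack_from(">q", data, offset)
        return value, offset + 8


class TupleCoder:
    """Encodes a fixed-arity tuple by concatenating its component encodings."""

    def __init__(self, components: Sequence[Any]) -> None:
        self._components = list(components)

    def encode(self, values: Sequence[Any]) -> bytes:
        if len(values) != len(self._components):
            raise ValueError("value arity does not match the number of component coders")
        return b"".join(c.encode(v) for c, v in zip(self._components, values))

    def decode(self, data: bytes, offset: int = 0) -> Tuple[tuple, int]:
        fields: List[Any] = []
        for coder in self._components:
            value, offset = coder.decode(data, offset)
            fields.append(value)
        return tuple(fields), offset
```

For a KafkaRecord-like structure, the components might be (key bytes, value bytes, partition, offset), i.e. TupleCoder([BytesCoder(), BytesCoder(), Int64Coder(), Int64Coder()]); a Java implementation would only have to agree on the same component encodings and field order.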

Thanks,
Thomas



On Fri, Apr 27, 2018 at 8:53 AM, Lukasz Cwik <lc...@google.com> wrote:

>
>
> On Thu, Apr 26, 2018 at 8:38 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>>
>>
>> On Thu, Apr 26, 2018 at 5:59 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> I agree with Thomas' sentiment that cross-language IO is very important
>>> because of how much work it takes to produce a mature connector
>>> implementation in a language. Looking at implementations of BigQueryIO,
>>> PubSubIO, KafkaIO, FileIO in Java, only a very daring soul would be tempted
>>> to reimplement them entirely in Python and Go.
>>>
>>> I'm imagining pretty much what Kenn is describing: a pipeline would
>>> specify some transforms by URN + payload, and rely on the runner to do
>>> whatever it takes to run this - either by expanding it into a Beam
>>> implementation of this transform that the runner chooses to use (could be
>>> in the same language or in a different language; either way, the runner
>>> would indeed need to invoke the respective SDK to expand it given the
>>> parameters), or by doing something entirely runner-specific (e.g. using the
>>> built-in Flink Kafka connector).
>>>
>>> I don't see a reason to require that there *must* exist a Beam
>>> implementation of this transform. There only, ideally, must be a runner-
>>> and language-agnostic spec for the URN and payload; of course, then the
>>> transform is only as portable as the set of runners that implement this URN.
>>>
>>
>> For a transform in general it's true that we don't need a Beam
>> implementation, but more specifically for IOs I think there are many
>> benefits to having the implementation in Beam. For example,
>>
>>    - IO connector will offer same behavior and feature set across
>>    various runners/SDKs.
>>    - Beam community will be able to view/modify/improve the IO connector.
>>    - existing IO connectors will serve as examples for users who wish to
>>    develop new IO connectors
>>
>>
>>
>    - More runners will be able to execute the user's pipeline.
>
>
>>> I actually really like the idea that the transform can be implemented in
>>> a completely runner-specific way without a Beam expansion to back it up -
>>> it would let us unblock a lot of the work earlier than full-blown
>>> cross-language IO is delivered or even than SDFs work in all
>>> languages/runners.
>>>
>>
>> If there are existing established connectors (for example, Kafka for
>> Flink in this case) I agree. But for anybody developing a new IO connector,
>> I think we should encourage developing that in Beam (in some SDK) given
>> that the connector will be available to all runners (and to all SDKs once
>> we have cross-language transforms).
>>
>> Thanks,
>> Cham
>>
>>
>>>
>>> On Wed, Apr 25, 2018 at 10:02 PM Kenneth Knowles <kl...@google.com> wrote:
>>>
>>>> It doesn't have to be 1:1 swapping KafkaIO for a Flink Kafka connector,
>>>> right? I was imagining: Python SDK submits pipeline with a KafkaIO (with
>>>> URN + payload), maybe with bogus contents. It is replaced with a small Flink
>>>> subgraph, including the native Flink Kafka connector and some compensating
>>>> transforms to match the required semantics. To me, this is preferable to
>>>> making single-runner transform URNs, since that breaks runner portability
>>>> by definition.
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <
>>>> chamikara@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <kl...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> The premise of URN + payload is that you can establish a spec. A
>>>>>>> native override still needs to meet the spec - it may still require some
>>>>>>> compensating code. Worrying about weird differences between runners seems
>>>>>>> more about worrying that an adequate spec cannot be determined.
>>>>>>>
>>>>>>
>>>>>> My point exactly. An SDF-based KafkaIO can run in the middle of a
>>>>>> pipeline. E.g. we could have TextIO producing a list of topics, and the SDF
>>>>>> KafkaIO run after that on this dynamic (not known until runtime) list of
>>>>>> topics. If the native Flink source doesn't work this way, then it doesn't
>>>>>> share the same spec and should have a different URN.
>>>>>>
>>>>>
>>>>> Agree that if they cannot share the same spec, SDF and native
>>>>> transforms warrant different URNs. Native Kafka might be able to support a
>>>>> PCollection of topics/partitions as an input though by utilizing underlying
>>>>> native Flink Kafka connector as a library. On the other hand, we might
>>>>> decide to expand SDF-based ParDos into other transforms before a runner
>>>>> gets a chance to override, in which case this kind of replacement will not
>>>>> be possible.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>
>>>>>>
>>>>>>> Runners will already invoke the SDF differently, so users treating
>>>>>>> every detail of some implementation as the spec are doomed.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <
>>>>>>>> chamikara@google.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <he...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> > Note that a KafkaDoFn still needs to be provided, but could be
>>>>>>>>>> a DoFn that
>>>>>>>>>> > fails loudly if it's actually called in the short term rather
>>>>>>>>>> than a full
>>>>>>>>>> > Python implementation.
>>>>>>>>>>
>>>>>>>>>> For configurable runner-native IO, for now, I think it is
>>>>>>>>>> reasonable to use a URN + special data payload directly without a KafkaDoFn
>>>>>>>>>> -- assuming it's a portable pipeline. That's what we do in Go for
>>>>>>>>>> PubSub-on-Dataflow and something similar would work for Kafka-on-Flink as
>>>>>>>>>> well. I agree that non-native alternative implementation is desirable, but
>>>>>>>>>> if one is not present we should IMO rather fail at job submission instead
>>>>>>>>>> of at runtime. I could imagine connectors intrinsic to an execution engine
>>>>>>>>>> where non-native implementations are not possible.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand
>>>>>>>>> similar to any other SDF by default (initial splitting, GBK, and a map-task
>>>>>>>>> equivalent, for example) but a runner (Flink in this case) will be free to
>>>>>>>>> override it with a runner-native implementation if desired. I assume the
>>>>>>>>> runner will have a chance to perform this override before the SDF expansion
>>>>>>>>> (which is not fully designed yet). Providing separate source/sink
>>>>>>>>> transforms for Flink native Kafka will be an option as well, but that will
>>>>>>>>> be less desirable from a Python user API perspective.
>>>>>>>>>
>>>>>>>>
>>>>>>>> Are we sure that the internal SDF will provide the same
>>>>>>>> functionality as the native one? What if the Kafka SDF is in the middle of
>>>>>>>> a pipeline - can Flink support that? Having a separate transform for the
>>>>>>>> Flink native source might be a better user experience than having one that
>>>>>>>> changes its behavior in strange ways depending on the runner.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <
>>>>>>>>>> robertwb@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <th...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> > Hi Cham,
>>>>>>>>>>>
>>>>>>>>>>> > Thanks for the feedback!
>>>>>>>>>>>
>>>>>>>>>>> > I should have probably clarified that my POC and questions
>>>>>>>>>>> aren't
>>>>>>>>>>> specific to Kafka as source, but pretty much any other
>>>>>>>>>>> source/sink that we
>>>>>>>>>>> internally use as well. We have existing Flink pipelines that
>>>>>>>>>>> are written
>>>>>>>>>>> in Java and we want to use the same connectors with the Python
>>>>>>>>>>> SDK on top
>>>>>>>>>>> of the already operationalized Flink stack. Therefore,
>>>>>>>>>>> portability isn't a
>>>>>>>>>>> concern as much as the ability to integrate is.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>> Thanks for the clarification. Agree that providing runner-native
>>>>>>>>> implementations of established source/sinks can be desirable in some
>>>>>>>>> cases.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>> > -->
>>>>>>>>>>>
>>>>>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>>>>>>>>> > <ch...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> >> Hi Thomas,
>>>>>>>>>>>
>>>>>>>>>>> >> Seems like we are working on similar (partially) things :).
>>>>>>>>>>>
>>>>>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <th...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> >>> I'm working on a mini POC to enable Kafka as custom
>>>>>>>>>>> streaming source
>>>>>>>>>>> for a Python pipeline executing on the (in-progress) portable
>>>>>>>>>>> Flink runner.
>>>>>>>>>>>
>>>>>>>>>>> >>> We eventually want to use the same native Flink connectors
>>>>>>>>>>> for sources
>>>>>>>>>>> and sinks that we also use in other Flink jobs.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> >> Could you clarify what you mean by same Flink connector ? Do
>>>>>>>>>>> you mean
>>>>>>>>>>> that Beam-based and non-Beam-based versions of Flink will use
>>>>>>>>>>> the same
>>>>>>>>>>> Kafka connector implementation ?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> > The native Flink sources as shown in the example below, not
>>>>>>>>>>> the Beam
>>>>>>>>>>> KafkaIO or other Beam sources.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> >>> I got a simple example to work with the
>>>>>>>>>>> FlinkKafkaConsumer010 reading
>>>>>>>>>>> from Kafka and a Python lambda logging the value. The code is
>>>>>>>>>>> here:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> >>> I'm looking for feedback/opinions on the following items in
>>>>>>>>>>> particular:
>>>>>>>>>>>
>>>>>>>>>>> >>> * Enabling custom translation on the Flink portable runner
>>>>>>>>>>> (custom
>>>>>>>>>>> translator could be loaded with ServiceLoader, additional
>>>>>>>>>>> translations
>>>>>>>>>>> could also be specified as job server configuration, pipeline
>>>>>>>>>>> option, ...)
>>>>>>>>>>>
>>>>>>>>>>> >>> * For the Python side, is what's shown in the commit the
>>>>>>>>>>> recommended
>>>>>>>>>>> way to define a custom transform (it would eventually live in a
>>>>>>>>>>> reusable
>>>>>>>>>>> custom module that pipeline authors can import)? Also, the
>>>>>>>>>>> example does not
>>>>>>>>>>> have the configuration part covered yet..
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> >> The only standard unbounded source API offered by Python SDK
>>>>>>>>>>> is the
>>>>>>>>>>> Splittable DoFn API. This is the part I'm working on. I'm trying
>>>>>>>>>>> to add a
>>>>>>>>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>>>>>>>>> comparing
>>>>>>>>>>> different Kafka Python client libraries. Will share more
>>>>>>>>>>> information on
>>>>>>>>>>> this soon.
>>>>>>>>>>>
>>>>>>>>>>> >> I understand this might not be possible in all cases and we
>>>>>>>>>>> might want
>>>>>>>>>>> to consider adding native source/sink implementations. But
>>>>>>>>>>> this will
>>>>>>>>>>> result in the implementation being runner-specific (each runner
>>>>>>>>>>> will have
>>>>>>>>>>> to have its own source/sink implementation). So I think we
>>>>>>>>>>> should try to
>>>>>>>>>>> add connector implementations to Beam using the standard API
>>>>>>>>>>> whenever
>>>>>>>>>>> possible. We also have plans to implement support for cross SDK
>>>>>>>>>>> transforms
>>>>>>>>>>> in the future (so that we can utilize Java implementation from
>>>>>>>>>>> Python for
>>>>>>>>>>> example) but we are not there yet and we might still want to
>>>>>>>>>>> implement a
>>>>>>>>>>> connector for a given SDK if there's good client library support.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> > It is great that the Python SDK will have connectors that are
>>>>>>>>>>> written in
>>>>>>>>>>> Python in the future, but I think it is equally if not more
>>>>>>>>>>> important to be
>>>>>>>>>>> able to use at least the Java Beam connectors with Python SDK
>>>>>>>>>>> (and any
>>>>>>>>>>> other non-Java SDK). Especially in a fully managed environment
>>>>>>>>>>> it should be
>>>>>>>>>>> possible to offer this to users in a way that is largely
>>>>>>>>>>> transparent. It
>>>>>>>>>>> takes significant time and effort to mature connectors and I'm
>>>>>>>>>>> not sure it
>>>>>>>>>>> is realistic to repeat that for all external systems in multiple
>>>>>>>>>>> languages.
>>>>>>>>>>> Or, to put it in another way, it is likely that instead of one
>>>>>>>>>>> over time
>>>>>>>>>>> rock solid connector per external system there will be multiple
>>>>>>>>>>> less mature
>>>>>>>>>>> implementations. That's also the reason we internally want to
>>>>>>>>>>> use the Flink
>>>>>>>>>>> native connectors - we know what they can and cannot do and want
>>>>>>>>>>> to
>>>>>>>>>>> leverage the existing investment.
>>>>>>>>>>>
>>>>>>>>>>> There are two related issues here: how to specify transforms
>>>>>>>>>>> (such as
>>>>>>>>>>> sources) in a language-independent manner, and how specific
>>>>>>>>>>> runners can
>>>>>>>>>>> recognize and run them. URNs address both: the
>>>>>>>>>>> composite ReadFromKafka PTransform (which consists of an Impulse +
>>>>>>>>>>> SDF(KafkaDoFn)) encodes to a URN with an attached payload
>>>>>>>>>>> that fully
>>>>>>>>>>> specifies this read. (The KafkaDoFn could similarly have a URN
>>>>>>>>>>> and
>>>>>>>>>>> payload.) A runner that understands these URNs is free to make
>>>>>>>>>>> any
>>>>>>>>>>> (semantically-equivalent) substitutions it wants for this
>>>>>>>>>>> transform.
>>>>>>>>>>>
>>>>>>>>>>> Note that a KafkaDoFn still needs to be provided, but could be a
>>>>>>>>>>> DoFn that
>>>>>>>>>>> fails loudly if it's actually called in the short term rather
>>>>>>>>>>> than a full
>>>>>>>>>>> Python implementation. Eventually, we would like to be able to
>>>>>>>>>>> call out to
>>>>>>>>>>> another SDK to expand full transforms (e.g. more complicated
>>>>>>>>>>> ones like
>>>>>>>>>>> BigQueryIO).
>>>>>>>>>>>
>>>>>>>>>>> >>> * Cross-language coders: In this example the Kafka source
>>>>>>>>>>> only
>>>>>>>>>>> considers the message value and uses the byte coder that both
>>>>>>>>>>> sides
>>>>>>>>>>> understand. If I wanted to pass on the key and possibly other
>>>>>>>>>>> metadata to
>>>>>>>>>>> the Python transform (similar to KafkaRecord from Java KafkaIO),
>>>>>>>>>>> then a
>>>>>>>>>>> specific coder is needed. Such coder could be written using
>>>>>>>>>>> protobuf, Avro
>>>>>>>>>>> etc, but it would also need to be registered.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> >> I think this requirement goes away if we implement Kafka in
>>>>>>>>>>> Python SDK.
>>>>>>>>>>>
>>>>>>>>>>> > Wouldn't this be needed for any cross language pipeline? Or
>>>>>>>>>>> rather any
>>>>>>>>>>> that isn't only using PCollection<byte[]>? Is there a language
>>>>>>>>>>> agnostic
>>>>>>>>>>> encoding for KV<?,?>, for example?
>>>>>>>>>>>
>>>>>>>>>>> Yes, Coders are also specified by URN (+components and/or
>>>>>>>>>>> payload), and
>>>>>>>>>>> there are a couple of standard ones, including KV. See
>>>>>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>>>>>>> This is not a closed set.
>>>>>>>>>>>
>>>>>>>>>>> - Robert
>>>>>>>>>>>
>>>>>>>>>>
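
To make the coder point above concrete, here is a minimal sketch of how a coder described as URN + component coders could be interpreted identically on both sides of a language boundary. The URN strings, the dict-based spec, and the helper names are assumptions for illustration, not taken from common_urns.md, and the length-prefix scheme is only one plausible wire format.

```python
# Illustrative sketch only: URNs, spec layout and helper names are assumptions.
BYTES_URN = "beam:coder:bytes:v1"  # assumed URN string
KV_URN = "beam:coder:kv:v1"        # assumed URN string


def coder_spec(urn, components=()):
    """Describe a coder as a URN plus its component coder specs."""
    return {"urn": urn, "components": list(components)}


def encode_varint(n):
    """Encode a non-negative int as a base-128 varint."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)
        else:
            out.append(bits)
            return bytes(out)


def decode_varint(data, pos):
    """Decode a varint starting at pos; return (value, new_pos)."""
    result = 0
    shift = 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def encode(spec, value, nested=False):
    """Encode value according to spec; nested values are length-prefixed."""
    if spec["urn"] == BYTES_URN:
        prefix = encode_varint(len(value)) if nested else b""
        return prefix + value
    if spec["urn"] == KV_URN:
        key_spec, value_spec = spec["components"]
        key, val = value
        # The key is always nested so the decoder knows where it ends.
        return encode(key_spec, key, nested=True) + encode(value_spec, val, nested)
    raise ValueError("unknown coder URN: %s" % spec["urn"])


def decode(spec, data, pos=0, nested=False):
    """Decode one value from data at pos; return (value, new_pos)."""
    if spec["urn"] == BYTES_URN:
        if nested:
            length, pos = decode_varint(data, pos)
            return data[pos:pos + length], pos + length
        return data[pos:], len(data)
    if spec["urn"] == KV_URN:
        key_spec, value_spec = spec["components"]
        key, pos = decode(key_spec, data, pos, nested=True)
        val, pos = decode(value_spec, data, pos, nested)
        return (key, val), pos
    raise ValueError("unknown coder URN: %s" % spec["urn"])
```

With a spec such as coder_spec(KV_URN, [coder_spec(BYTES_URN), coder_spec(BYTES_URN)]), a Kafka key/value pair can round-trip without either side knowing the other's language, as long as both implement the same URNs.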

Re: Custom URNs and runner translation

Posted by Lukasz Cwik <lc...@google.com>.
On Thu, Apr 26, 2018 at 8:38 PM Chamikara Jayalath <ch...@google.com>
wrote:

>
>
> On Thu, Apr 26, 2018 at 5:59 PM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> I agree with Thomas' sentiment that cross-language IO is very important
>> because of how much work it takes to produce a mature connector
>> implementation in a language. Looking at implementations of BigQueryIO,
>> PubSubIO, KafkaIO, FileIO in Java, only a very daring soul would be tempted
>> to reimplement them entirely in Python and Go.
>>
>> I'm imagining pretty much what Kenn is describing: a pipeline would
>> specify some transforms by URN + payload, and rely on the runner to do
>> whatever it takes to run this - either by expanding it into a Beam
>> implementation of this transform that the runner chooses to use (could be
>> in the same language or in a different language; either way, the runner
>> would indeed need to invoke the respective SDK to expand it given the
>> parameters), or by doing something entirely runner-specific (e.g. using the
>> built-in Flink Kafka connector).
>>
>> I don't see a reason to require that there *must* exist a Beam
>> implementation of this transform. There only, ideally, must be a runner-
>> and language-agnostic spec for the URN and payload; of course, then the
>> transform is only as portable as the set of runners that implement this URN.
>>
>
> For a transform in general it's true that we don't need a Beam
> implementation, but more specifically for IOs I think there are many
> benefits to having the implementation in Beam. For example,
>
>    - IO connector will offer same behavior and feature set across various
>    runners/SDKs.
>    - Beam community will be able to view/modify/improve the IO connector.
>    - existing IO connectors will serve as examples for users who wish to
>    develop new IO connectors
>
>
>
   - More runners will be able to execute the user's pipeline.


>> I actually really like the idea that the transform can be implemented in
>> a completely runner-specific way without a Beam expansion to back it up -
>> it would let us unblock a lot of the work earlier than full-blown
>> cross-language IO is delivered or even than SDFs work in all
>> languages/runners.
>>
>
> If there are existing established connectors (for example, Kafka for Flink
> in this case) I agree. But for anybody developing a new IO connector, I
> think we should encourage developing that in Beam (in some SDK) given that
> the connector will be available to all runners (and to all SDKs once we
> have cross-language transforms).
>
> Thanks,
> Cham
>
>
>>
>> On Wed, Apr 25, 2018 at 10:02 PM Kenneth Knowles <kl...@google.com> wrote:
>>
>>> It doesn't have to be 1:1 swapping KafkaIO for a Flink Kafka connector,
>>> right? I was imagining: Python SDK submits pipeline with a KafkaIO (with
>>> URN + payload), maybe with bogus contents. It is replaced with a small Flink
>>> subgraph, including the native Flink Kafka connector and some compensating
>>> transforms to match the required semantics. To me, this is preferable to
>>> making single-runner transform URNs, since that breaks runner portability
>>> by definition.
>>>
>>> Kenn
>>>
>>> On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <ch...@google.com>
>>> wrote:
>>>
>>>>
>>>>
>>>> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <kl...@google.com>
>>>>> wrote:
>>>>>
>>>>>> The premise of URN + payload is that you can establish a spec. A
>>>>>> native override still needs to meet the spec - it may still require some
>>>>>> compensating code. Worrying about weird differences between runners seems
>>>>>> more about worrying that an adequate spec cannot be determined.
>>>>>>
>>>>>
>>>>> My point exactly. An SDF-based KafkaIO can run in the middle of a
>>>>> pipeline. E.g. we could have TextIO producing a list of topics, and the SDF
>>>>> KafkaIO run after that on this dynamic (not known until runtime) list of
>>>>> topics. If the native Flink source doesn't work this way, then it doesn't
>>>>> share the same spec and should have a different URN.
>>>>>
>>>>
>>>> Agree that if they cannot share the same spec, SDF and native
>>>> transforms warrant different URNs. Native Kafka might be able to support a
>>>> PCollection of topics/partitions as an input though by utilizing underlying
>>>> native Flink Kafka connector as a library. On the other hand, we might
>>>> decide to expand SDF-based ParDos into other transforms before a runner
>>>> gets a chance to override, in which case this kind of replacement will not
>>>> be possible.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>>
>>>>>
>>>>>> Runners will already invoke the SDF differently, so users treating
>>>>>> every detail of some implementation as the spec are doomed.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <
>>>>>>> chamikara@google.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <he...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> > Note that a KafkaDoFn still needs to be provided, but could be
>>>>>>>>> a DoFn that
>>>>>>>>> > fails loudly if it's actually called in the short term rather
>>>>>>>>> than a full
>>>>>>>>> > Python implementation.
>>>>>>>>>
>>>>>>>>> For configurable runner-native IO, for now, I think it is
>>>>>>>>> reasonable to use a URN + special data payload directly without a KafkaDoFn
>>>>>>>>> -- assuming it's a portable pipeline. That's what we do in Go for
>>>>>>>>> PubSub-on-Dataflow and something similar would work for Kafka-on-Flink as
>>>>>>>>> well. I agree that non-native alternative implementation is desirable, but
>>>>>>>>> if one is not present we should IMO rather fail at job submission instead
>>>>>>>>> of at runtime. I could imagine connectors intrinsic to an execution engine
>>>>>>>>> where non-native implementations are not possible.
>>>>>>>>>
>>>>>>>>
>>>>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand
>>>>>>>> similar to any other SDF by default (initial splitting, GBK, and a map-task
>>>>>>>> equivalent, for example) but a runner (Flink in this case) will be free to
>>>>>>>> override it with a runner-native implementation if desired. I assume the
>>>>>>>> runner will have a chance to perform this override before the SDF expansion
>>>>>>>> (which is not fully designed yet). Providing separate source/sink
>>>>>>>> transforms for Flink native Kafka will be an option as well, but that will
>>>>>>>> be less desirable from a Python user API perspective.
>>>>>>>>
>>>>>>>
>>>>>>> Are we sure that the internal SDF will provide the same
>>>>>>> functionality as the native one? What if the Kafka SDF is in the middle of
>>>>>>> a pipeline - can Flink support that? Having a separate transform for the
>>>>>>> Flink native source might be a better user experience than having one that
>>>>>>> changes its behavior in strange ways depending on the runner.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <
>>>>>>>>> robertwb@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <th...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> > Hi Cham,
>>>>>>>>>>
>>>>>>>>>> > Thanks for the feedback!
>>>>>>>>>>
>>>>>>>>>> > I should have probably clarified that my POC and questions
>>>>>>>>>> aren't
>>>>>>>>>> specific to Kafka as source, but pretty much any other
>>>>>>>>>> source/sink that we
>>>>>>>>>> internally use as well. We have existing Flink pipelines that are
>>>>>>>>>> written
>>>>>>>>>> in Java and we want to use the same connectors with the Python
>>>>>>>>>> SDK on top
>>>>>>>>>> of the already operationalized Flink stack. Therefore,
>>>>>>>>>> portability isn't a
>>>>>>>>>> concern as much as the ability to integrate is.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>> Thanks for the clarification. Agree that providing runner-native
>>>>>>>> implementations of established source/sinks can be desirable in some
>>>>>>>> cases.
>>>>>>>>
>>>>>>>>
>>>>>>>>>> > -->
>>>>>>>>>>
>>>>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>>>>>>>> > <ch...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> >> Hi Thomas,
>>>>>>>>>>
>>>>>>>>>> >> Seems like we are working on similar (partially) things :).
>>>>>>>>>>
>>>>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <th...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> >>> I'm working on a mini POC to enable Kafka as custom streaming
>>>>>>>>>> source
>>>>>>>>>> for a Python pipeline executing on the (in-progress) portable
>>>>>>>>>> Flink runner.
>>>>>>>>>>
>>>>>>>>>> >>> We eventually want to use the same native Flink connectors
>>>>>>>>>> for sources
>>>>>>>>>> and sinks that we also use in other Flink jobs.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> >> Could you clarify what you mean by same Flink connector ? Do
>>>>>>>>>> you mean
>>>>>>>>>> that Beam-based and non-Beam-based versions of Flink will use the
>>>>>>>>>> same
>>>>>>>>>> Kafka connector implementation ?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> > The native Flink sources as shown in the example below, not the
>>>>>>>>>> Beam
>>>>>>>>>> KafkaIO or other Beam sources.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010
>>>>>>>>>> reading
>>>>>>>>>> from Kafka and a Python lambda logging the value. The code is
>>>>>>>>>> here:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> >>> I'm looking for feedback/opinions on the following items in
>>>>>>>>>> particular:
>>>>>>>>>>
>>>>>>>>>> >>> * Enabling custom translation on the Flink portable runner
>>>>>>>>>> (custom
>>>>>>>>>> translator could be loaded with ServiceLoader, additional
>>>>>>>>>> translations
>>>>>>>>>> could also be specified as job server configuration, pipeline
>>>>>>>>>> option, ...)
>>>>>>>>>>
>>>>>>>>>> >>> * For the Python side, is what's shown in the commit the
>>>>>>>>>> recommended
>>>>>>>>>> way to define a custom transform (it would eventually live in a
>>>>>>>>>> reusable
>>>>>>>>>> custom module that pipeline authors can import)? Also, the
>>>>>>>>>> example does not
>>>>>>>>>> have the configuration part covered yet..
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> >> The only standard unbounded source API offered by Python SDK
>>>>>>>>>> is the
>>>>>>>>>> Splittable DoFn API. This is the part I'm working on. I'm trying
>>>>>>>>>> to add a
>>>>>>>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>>>>>>>> comparing
>>>>>>>>>> different Kafka Python client libraries. Will share more
>>>>>>>>>> information on
>>>>>>>>>> this soon.
>>>>>>>>>>
>>>>>>>>>> >> I understand this might not be possible in all cases and we
>>>>>>>>>> might want
>>>>>>>>>> to consider adding native source/sink implementations. But this
>>>>>>>>>> will
>>>>>>>>>> result in the implementation being runner-specific (each runner
>>>>>>>>>> will have
>>>>>>>>>> to have its own source/sink implementation). So I think we
>>>>>>>>>> should try to
>>>>>>>>>> add connector implementations to Beam using the standard API
>>>>>>>>>> whenever
>>>>>>>>>> possible. We also have plans to implement support for cross SDK
>>>>>>>>>> transforms
>>>>>>>>>> in the future (so that we can utilize Java implementation from
>>>>>>>>>> Python for
>>>>>>>>>> example) but we are not there yet and we might still want to
>>>>>>>>>> implement a
>>>>>>>>>> connector for a given SDK if there's good client library support.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> > It is great that the Python SDK will have connectors that are
>>>>>>>>>> written in
>>>>>>>>>> Python in the future, but I think it is equally if not more
>>>>>>>>>> important to be
>>>>>>>>>> able to use at least the Java Beam connectors with Python SDK
>>>>>>>>>> (and any
>>>>>>>>>> other non-Java SDK). Especially in a fully managed environment it
>>>>>>>>>> should be
>>>>>>>>>> possible to offer this to users in a way that is largely
>>>>>>>>>> transparent. It
>>>>>>>>>> takes significant time and effort to mature connectors and I'm
>>>>>>>>>> not sure it
>>>>>>>>>> is realistic to repeat that for all external systems in multiple
>>>>>>>>>> languages.
>>>>>>>>>> Or, to put it in another way, it is likely that instead of one
>>>>>>>>>> over time
>>>>>>>>>> rock solid connector per external system there will be multiple
>>>>>>>>>> less mature
>>>>>>>>>> implementations. That's also the reason we internally want to use
>>>>>>>>>> the Flink
>>>>>>>>>> native connectors - we know what they can and cannot do and want
>>>>>>>>>> to
>>>>>>>>>> leverage the existing investment.
>>>>>>>>>>
>>>>>>>>>> There are two related issues here: how to specify transforms
>>>>>>>>>> (such as
>>>>>>>>>> sources) in a language-independent manner, and how specific
>>>>>>>>>> runners can
>>>>>>>>>> recognize and run them. URNs address both: the
>>>>>>>>>> composite ReadFromKafka PTransform (which consists of an Impulse +
>>>>>>>>>> SDF(KafkaDoFn)) encodes to a URN with an attached payload that
>>>>>>>>>> fully
>>>>>>>>>> specifies this read. (The KafkaDoFn could similarly have a URN and
>>>>>>>>>> payload.) A runner that understands these URNs is free to make any
>>>>>>>>>> (semantically-equivalent) substitutions it wants for this
>>>>>>>>>> transform.
>>>>>>>>>>
>>>>>>>>>> Note that a KafkaDoFn still needs to be provided, but could be a
>>>>>>>>>> DoFn that
>>>>>>>>>> fails loudly if it's actually called in the short term rather
>>>>>>>>>> than a full
>>>>>>>>>> Python implementation. Eventually, we would like to be able to
>>>>>>>>>> call out to
>>>>>>>>>> another SDK to expand full transforms (e.g. more complicated ones
>>>>>>>>>> like
>>>>>>>>>> BigQueryIO).
>>>>>>>>>>
>>>>>>>>>> >>> * Cross-language coders: In this example the Kafka source only
>>>>>>>>>> considers the message value and uses the byte coder that both
>>>>>>>>>> sides
>>>>>>>>>> understand. If I wanted to pass on the key and possibly other
>>>>>>>>>> metadata to
>>>>>>>>>> the Python transform (similar to KafkaRecord from Java KafkaIO),
>>>>>>>>>> then a
>>>>>>>>>> specific coder is needed. Such coder could be written using
>>>>>>>>>> protobuf, Avro
>>>>>>>>>> etc, but it would also need to be registered.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> >> I think this requirement goes away if we implement Kafka in
>>>>>>>>>> Python SDK.
>>>>>>>>>>
>>>>>>>>>> > Wouldn't this be needed for any cross language pipeline? Or
>>>>>>>>>> rather any
>>>>>>>>>> that isn't only using PCollection<byte[]>? Is there a language
>>>>>>>>>> agnostic
>>>>>>>>>> encoding for KV<?,?>, for example?
>>>>>>>>>>
>>>>>>>>>> Yes, Coders are also specified by URN (+components and/or
>>>>>>>>>> payload), and
>>>>>>>>>> there are a couple of standard ones, including KV. See
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>>>>>> This is not a closed set.
>>>>>>>>>>
>>>>>>>>>> - Robert
>>>>>>>>>>
>>>>>>>>>

Re: Custom URNs and runner translation

Posted by Chamikara Jayalath <ch...@google.com>.
On Thu, Apr 26, 2018 at 5:59 PM Eugene Kirpichov <ki...@google.com>
wrote:

> I agree with Thomas' sentiment that cross-language IO is very important
> because of how much work it takes to produce a mature connector
> implementation in a language. Looking at implementations of BigQueryIO,
> PubSubIO, KafkaIO, FileIO in Java, only a very daring soul would be tempted
> to reimplement them entirely in Python and Go.
>
> I'm imagining pretty much what Kenn is describing: a pipeline would
> specify some transforms by URN + payload, and rely on the runner to do
> whatever it takes to run this - either by expanding it into a Beam
> implementation of this transform that the runner chooses to use (could be
> in the same language or in a different language; either way, the runner
> would indeed need to invoke the respective SDK to expand it given the
> parameters), or by doing something entirely runner-specific (e.g. using the
> built-in Flink Kafka connector).
>
> I don't see a reason to require that there *must* exist a Beam
> implementation of this transform. There only, ideally, must be a runner-
> and language-agnostic spec for the URN and payload; of course, then the
> transform is only as portable as the set of runners that implement this URN.
>

For a transform in general it's true that we don't need a Beam
implementation, but more specifically for IOs I think there are many
benefits to having the implementation in Beam. For example,

   - IO connector will offer same behavior and feature set across various
   runners/SDKs.
   - Beam community will be able to view/modify/improve the IO connector.
   - existing IO connectors will serve as examples for users who wish to
   develop new IO connectors



> I actually really like the idea that the transform can be implemented in a
> completely runner-specific way without a Beam expansion to back it up - it
> would let us unblock a lot of the work earlier than full-blown
> cross-language IO is delivered or even than SDFs work in all
> languages/runners.
>

If there are existing established connectors (for example, Kafka for Flink
in this case) I agree. But for anybody developing a new IO connector, I
think we should encourage developing that in Beam (in some SDK) given that
the connector will be available to all runners (and to all SDKs once we
have cross-language transforms).

Thanks,
Cham
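
The options discussed above (a runner-native override, a fallback to a Beam expansion, or rejecting the job at submission) can be sketched as a small URN-keyed translator lookup. This is a hedged illustration, not the Flink runner's actual translation API: the registry class, the URN string, the dict-based transform layout, and the placeholder strings standing in for native operators are all hypothetical.

```python
# Illustrative sketch only: every name below is hypothetical.
KAFKA_READ_URN = "example:transform:kafka_read:v1"  # made-up URN


class UnsupportedTransformError(Exception):
    """Raised at submission time when a URN cannot be translated."""


class TranslatorRegistry:
    """Maps transform URNs to runner-native translator functions."""

    def __init__(self):
        self._translators = {}

    def register(self, urn, translator):
        self._translators[urn] = translator

    def translate(self, transform):
        """Return a list of native operator descriptions for a transform."""
        translator = self._translators.get(transform["urn"])
        if translator is not None:
            # The runner understands this URN: substitute natively.
            return translator(transform["payload"])
        expansion = transform.get("expansion")
        if expansion is not None:
            # Fall back to the SDK-provided expansion, if there is one.
            steps = []
            for sub in expansion:
                steps.extend(self.translate(sub))
            return steps
        # No native translation and no expansion: reject before running.
        raise UnsupportedTransformError(
            "no translation for URN %s" % transform["urn"])


def translate_kafka_read(payload):
    """Stand-in for wiring up a native Flink Kafka consumer."""
    return ["FlinkKafkaConsumer(topic=%s)" % payload["topic"]]
```

A runner would register translate_kafka_read under KAFKA_READ_URN; a pipeline whose read transform carries that URN and a payload such as {"topic": "events"} is then translated natively, while an unknown URN without an expansion fails during translation rather than at runtime.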


>
> On Wed, Apr 25, 2018 at 10:02 PM Kenneth Knowles <kl...@google.com> wrote:
>
>> It doesn't have to be 1:1 swapping KafkaIO for a Flink Kafka connector,
>> right? I was imagining: Python SDK submits pipeline with a KafkaIO (with
>> URN + payload), maybe with bogus contents. It is replaced with a small Flink
>> subgraph, including the native Flink Kafka connector and some compensating
>> transforms to match the required semantics. To me, this is preferable to
>> making single-runner transform URNs, since that breaks runner portability
>> by definition.
>>
>> Kenn
>>
>> On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <kl...@google.com> wrote:
>>>>
>>>>> The premise of URN + payload is that you can establish a spec. A
>>>>> native override still needs to meet the spec - it may still require some
>>>>> compensating code. Worrying about weird differences between runners seems
>>>>> more about worrying that an adequate spec cannot be determined.
>>>>>
>>>>
>>>> My point exactly. An SDF-based KafkaIO can run in the middle of a
>>>> pipeline. E.g. we could have TextIO producing a list of topics, and the SDF
>>>> KafkaIO run after that on this dynamic (not known until runtime) list of
>>>> topics. If the native Flink source doesn't work this way, then it doesn't
>>>> share the same spec and should have a different URN.
>>>>
>>>
>>> Agree that if they cannot share the same spec, SDF and native transforms
>>> warrant different URNs. Native Kafka might be able to support a PCollection
>>> of topics/partitions as an input though by utilizing underlying native
>>> Flink Kafka connector as a library. On the other hand, we might decide to
>>> expand SDF-based ParDos into other transforms before a runner gets a
>>> chance to override, in which case this kind of replacement will not be
>>> possible.
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>>
>>>>> Runners will already invoke the SDF differently, so users treating
>>>>> every detail of some implementation as the spec are doomed.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <
>>>>>> chamikara@google.com> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <he...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a
>>>>>>>> DoFn that
>>>>>>>> > fails loudly if it's actually called in the short term rather
>>>>>>>> than a full
>>>>>>>> > Python implementation.
>>>>>>>>
>>>>>>>> For configurable runner-native IO, for now, I think it is
>>>>>>>> reasonable to use a URN + special data payload directly without a KafkaDoFn
>>>>>>>> -- assuming it's a portable pipeline. That's what we do in Go for
>>>>>>>> PubSub-on-Dataflow and something similar would work for Kafka-on-Flink as
>>>>>>>> well. I agree that non-native alternative implementation is desirable, but
>>>>>>>> if one is not present we should IMO rather fail at job submission instead
>>>>>>>> of at runtime. I could imagine connectors intrinsic to an execution engine
>>>>>>>> where non-native implementations are not possible.
>>>>>>>>
>>>>>>>
>>>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand
>>>>>>> similar to any other SDF by default (initial splitting, GBK, and a map-task
>>>>>>> equivalent, for example) but a runner (Flink in this case) will be free to
>>>>>>> override it with a runner-native implementation if desired. I assume the
>>>>>>> runner will have a chance to perform this override before the SDF expansion
>>>>>>> (which is not fully designed yet). Providing separate source/sink
>>>>>>> transforms for Flink native Kafka will be an option as well, but that will
>>>>>>> be less desirable from a Python user API perspective.
>>>>>>>
>>>>>>
>>>>>> Are we sure that the internal SDF will provide the same functionality
>>>>>> as the native one? What if the Kafka SDF is in the middle of a pipeline -
>>>>>> can Flink support that? Having a separate transform for the Flink native
>>>>>> source might be a better user experience than having one that changes its
>>>>>> behavior in strange ways depending on the runner.
>>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <
>>>>>>>> robertwb@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <th...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> > Hi Cham,
>>>>>>>>>
>>>>>>>>> > Thanks for the feedback!
>>>>>>>>>
>>>>>>>>> > I should have probably clarified that my POC and questions aren't
>>>>>>>>> specific to Kafka as source, but pretty much any other source/sink
>>>>>>>>> that we
>>>>>>>>> internally use as well. We have existing Flink pipelines that are
>>>>>>>>> written
>>>>>>>>> in Java and we want to use the same connectors with the Python SDK
>>>>>>>>> on top
>>>>>>>>> of the already operationalized Flink stack. Therefore, portability
>>>>>>>>> isn't a
>>>>>>>>> concern as much as the ability to integrate is.
>>>>>>>>>
>>>>>>>>
>>>>>>> Thanks for the clarification. Agree that providing runner-native
>>>>>>> implementations of established source/sinks can be desirable in some
>>>>>>> cases.
>>>>>>>
>>>>>>>
>>>>>>>>> > -->
>>>>>>>>>
>>>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>>>>>>> > <ch...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> >> Hi Thomas,
>>>>>>>>>
>>>>>>>>> >> Seems like we are working on similar (partially) things :).
>>>>>>>>>
>>>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <th...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> >>> I'm working on a mini POC to enable Kafka as custom streaming
>>>>>>>>> source
>>>>>>>>> for a Python pipeline executing on the (in-progress) portable
>>>>>>>>> Flink runner.
>>>>>>>>>
>>>>>>>>> >>> We eventually want to use the same native Flink connectors for
>>>>>>>>> sources
>>>>>>>>> and sinks that we also use in other Flink jobs.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >> Could you clarify what you mean by same Flink connector ? Do
>>>>>>>>> you mean
>>>>>>>>> that Beam-based and non-Beam-based versions of Flink will use the
>>>>>>>>> same
>>>>>>>>> Kafka connector implementation ?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> > The native Flink sources as shown in the example below, not the
>>>>>>>>> Beam
>>>>>>>>> KafkaIO or other Beam sources.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010
>>>>>>>>> reading
>>>>>>>>> from Kafka and a Python lambda logging the value. The code is here:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >>> I'm looking for feedback/opinions on the following items in
>>>>>>>>> particular:
>>>>>>>>>
>>>>>>>>> >>> * Enabling custom translation on the Flink portable runner
>>>>>>>>> (custom
>>>>>>>>> translator could be loaded with ServiceLoader, additional
>>>>>>>>> translations
>>>>>>>>> could also be specified as job server configuration, pipeline
>>>>>>>>> option, ...)
>>>>>>>>>
>>>>>>>>> >>> * For the Python side, is what's shown in the commit the
>>>>>>>>> recommended
>>>>>>>>> way to define a custom transform (it would eventually live in a
>>>>>>>>> reusable
>>>>>>>>> custom module that pipeline authors can import)? Also, the example
>>>>>>>>> does not
>>>>>>>>> have the configuration part covered yet..
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >> The only standard unbounded source API offered by Python SDK is
>>>>>>>>> the
>>>>>>>>> Splittable DoFn API. This is the part I'm working on. I'm trying
>>>>>>>>> to add a
>>>>>>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>>>>>>> comparing
>>>>>>>>> different Kafka Python client libraries. Will share more
>>>>>>>>> information on
>>>>>>>>> this soon.
>>>>>>>>>
>>>>>>>>> >> I understand this might not be possible in all cases and we
>>>>>>>>> might want
>>>>>>>>> to consider adding native source/sink implementations. But this
>>>>>>>>> will
>>>>>>>>> result in the implementation being runner-specific (each runner
>>>>>>>>> will have
>>>>>>>>> to have its own source/sink implementation). So I think we should
>>>>>>>>> try to
>>>>>>>>> add connector implementations to Beam using the standard API
>>>>>>>>> whenever
>>>>>>>>> possible. We also have plans to implement support for cross SDK
>>>>>>>>> transforms
>>>>>>>>> in the future (so that we can utilize Java implementation from
>>>>>>>>> Python for
>>>>>>>>> example) but we are not there yet and we might still want to
>>>>>>>>> implement a
>>>>>>>>> connector for a given SDK if there's good client library support.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> > It is great that the Python SDK will have connectors that are
>>>>>>>>> written in
>>>>>>>>> Python in the future, but I think it is equally if not more
>>>>>>>>> important to be
>>>>>>>>> able to use at least the Java Beam connectors with Python SDK (and
>>>>>>>>> any
>>>>>>>>> other non-Java SDK). Especially in a fully managed environment it
>>>>>>>>> should be
>>>>>>>>> possible to offer this to users in a way that is largely
>>>>>>>>> transparent. It
>>>>>>>>> takes significant time and effort to mature connectors and I'm not
>>>>>>>>> sure it
>>>>>>>>> is realistic to repeat that for all external systems in multiple
>>>>>>>>> languages.
>>>>>>>>> Or, to put it in another way, it is likely that instead of one
>>>>>>>>> over time
>>>>>>>>> rock solid connector per external system there will be multiple
>>>>>>>>> less mature
>>>>>>>>> implementations. That's also the reason we internally want to use
>>>>>>>>> the Flink
>>>>>>>>> native connectors - we know what they can and cannot do and want to
>>>>>>>>> leverage the existing investment.
>>>>>>>>>
>>>>>>>>> There are two related issues here: how to specify transforms (such
>>>>>>>>> as sources) in a language-independent manner, and how specific
>>>>>>>>> runners can recognize and run them. URNs address both: the
>>>>>>>>> composite ReadFromKafka PTransform (which consists of an Impulse +
>>>>>>>>> SDF(KafkaDoFn)) encodes to a URN with an attached payload that
>>>>>>>>> fully specifies this read. (The KafkaDoFn could similarly have a
>>>>>>>>> URN and payload.) A runner that understands these URNs is free to
>>>>>>>>> make any (semantically-equivalent) substitutions it wants for this
>>>>>>>>> transform.
>>>>>>>>>
>>>>>>>>> Note that a KafkaDoFn still needs to be provided, but could be a
>>>>>>>>> DoFn that
>>>>>>>>> fails loudly if it's actually called in the short term rather than
>>>>>>>>> a full
>>>>>>>>> Python implementation. Eventually, we would like to be able to
>>>>>>>>> call out to
>>>>>>>>> another SDK to expand full transforms (e.g. more complicated ones
>>>>>>>>> like
>>>>>>>>> BigQueryIO).
>>>>>>>>>
>>>>>>>>> >>> * Cross-language coders: In this example the Kafka source only
>>>>>>>>> considers the message value and uses the byte coder that both sides
>>>>>>>>> understand. If I wanted to pass on the key and possibly other
>>>>>>>>> metadata to
>>>>>>>>> the Python transform (similar to KafkaRecord from Java KafkaIO),
>>>>>>>>> then a
>>>>>>>>> specific coder is needed. Such coder could be written using
>>>>>>>>> protobuf, Avro
>>>>>>>>> etc, but it would also need to be registered.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >> I think this requirement goes away if we implement Kafka in
>>>>>>>>> Python SDK.
>>>>>>>>>
>>>>>>>>> > Wouldn't this be needed for any cross language pipeline? Or
>>>>>>>>> rather any
>>>>>>>>> that isn't only using PCollection<byte[]>? Is there a language
>>>>>>>>> agnostic
>>>>>>>>> encoding for KV<?,?>, for example?
>>>>>>>>>
>>>>>>>>> Yes, Coders are also specified by URN (+components and/or
>>>>>>>>> payload), and
>>>>>>>>> there are a couple of standard ones, including KV. See
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>>>>> This is not a closed set.
>>>>>>>>>
>>>>>>>>> - Robert
>>>>>>>>>
>>>>>>>>

Re: Custom URNs and runner translation

Posted by Eugene Kirpichov <ki...@google.com>.
I agree with Thomas' sentiment that cross-language IO is very important
because of how much work it takes to produce a mature connector
implementation in a language. Looking at implementations of BigQueryIO,
PubSubIO, KafkaIO, FileIO in Java, only a very daring soul would be tempted
to reimplement them entirely in Python and Go.

I'm imagining pretty much what Kenn is describing: a pipeline would specify
some transforms by URN + payload, and rely on the runner to do whatever it
takes to run this - either by expanding it into a Beam implementation of
this transform that the runner chooses to use (could be in the same
language or in a different language; either way, the runner would indeed
need to invoke the respective SDK to expand it given the parameters), or by
doing something entirely runner-specific (e.g. using the built-in Flink
Kafka connector).
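
To make the two options above concrete, here is a minimal Python sketch of how
a runner might resolve a transform given only its URN and payload. It is not
Beam or Flink code: TransformSpec, resolve, the two registries, and the URN
string are all hypothetical names chosen for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass(frozen=True)
class TransformSpec:
    """A transform as the runner sees it: an identifier plus opaque config."""
    urn: str
    payload: bytes


# Hypothetical URN for illustration; not one defined by Beam.
KAFKA_READ_URN = "example:transform:kafka_read:v1"

# Runner-specific translations, e.g. "use the built-in Flink Kafka connector".
native_translators: Dict[str, Callable[[bytes], str]] = {}
# Beam expansions the runner could obtain by invoking an SDK.
sdk_expanders: Dict[str, Callable[[bytes], str]] = {}


def resolve(spec: TransformSpec) -> str:
    """Prefer a native translation, then an SDK expansion, else reject."""
    if spec.urn in native_translators:
        return native_translators[spec.urn](spec.payload)
    if spec.urn in sdk_expanders:
        return sdk_expanders[spec.urn](spec.payload)
    # Rejecting here models failing at job submission rather than at runtime.
    raise ValueError(f"no translation registered for URN {spec.urn!r}")


# Register a stand-in native translation and resolve a Kafka read with it.
native_translators[KAFKA_READ_URN] = (
    lambda payload: "native:" + payload.decode("utf-8"))
print(resolve(TransformSpec(KAFKA_READ_URN, b"topic=events")))
```

In this sketch a URN with neither a native translation nor an expansion is
rejected up front, which is the "only as portable as the set of runners that
implement this URN" trade-off in code form.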

I don't see a reason to require that there *must* exist a Beam
implementation of this transform. Ideally, there only needs to be a runner-
and language-agnostic spec for the URN and payload; of course, the transform
is then only as portable as the set of runners that implement this URN.

I actually really like the idea that the transform can be implemented in a
completely runner-specific way without a Beam expansion to back it up - it
would let us unblock a lot of the work before full-blown cross-language IO
is delivered, or even before SDFs work in all languages/runners.

On Wed, Apr 25, 2018 at 10:02 PM Kenneth Knowles <kl...@google.com> wrote:

> It doesn't have to be 1:1 swapping KafkaIO for a Flink Kafka connector,
> right? I was imagining: the Python SDK submits a pipeline with a KafkaIO (with
> URN + payload), maybe with bogus contents. It is replaced with a small Flink
> subgraph, including the native Flink Kafka connector and some compensating
> transforms to match the required semantics. To me, this is preferable to
> making single-runner transform URNs, since that breaks runner portability
> by definition.
>
> Kenn
>
> On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>>
>>
>> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <re...@google.com> wrote:
>>
>>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <kl...@google.com> wrote:
>>>
>>>> The premise of URN + payload is that you can establish a spec. A native
>>>> override still needs to meet the spec - it may still require some
>>>> compensating code. Worrying about weird differences between runners seems
>>>> more about worrying that an adequate spec cannot be determined.
>>>>
>>>
>>> My point exactly. An SDF-based KafkaIO can run in the middle of a
>>> pipeline. E.g. we could have TextIO producing a list of topics, and the SDF
>>> KafkaIO run after that on this dynamic (not known until runtime) list of
>>> topics. If the native Flink source doesn't work this way, then it doesn't
>>> share the same spec and should have a different URN.
>>>
>>
>> Agree that if they cannot share the same spec, SDF and native transforms
>> warrant different URNs. Native Kafka might be able to support a PCollection
>> of topics/partitions as an input, though, by using the underlying native
>> Flink Kafka connector as a library. On the other hand, we might decide to
>> expand SDF-based ParDos into other transforms before a runner gets a
>> chance to override, in which case this kind of replacement will not be
>> possible.
>>
>> Thanks,
>> Cham
>>
>>
>>>
>>>> Runners will already invoke the SDF differently, so users treating
>>>> every detail of some implementation as the spec are doomed.
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <
>>>>> chamikara@google.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <he...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a
>>>>>>> DoFn that
>>>>>>> > fails loudly if it's actually called in the short term rather than
>>>>>>> a full
>>>>>>> > Python implementation.
>>>>>>>
>>>>>>> For configurable runner-native IO, for now, I think it is reasonable
>>>>>>> to use a URN + special data payload directly without a KafkaDoFn --
>>>>>>> assuming it's a portable pipeline. That's what we do in Go for
>>>>>>> PubSub-on-Dataflow and something similar would work for Kafka-on-Flink as
>>>>>>> well. I agree that non-native alternative implementation is desirable, but
>>>>>>> if one is not present we should IMO rather fail at job submission instead
>>>>>>> of at runtime. I could imagine connectors intrinsic to an execution engine
>>>>>>> where non-native implementations are not possible.
>>>>>>>
>>>>>>
>>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand
>>>>>> similar to any other SDF by default (initial splitting, GBK, and a map-task
>>>>>> equivalent, for example) but a runner (Flink in this case) will be free to
>>>>>> override it with a runner-native implementation if desired. I assume the
>>>>>> runner will have a chance to perform this override before the SDF expansion
>>>>>> (which is not fully designed yet). Providing separate source/sink
>>>>>> transforms for Flink native Kafka will be an option as well, but that will
>>>>>> be less desirable from a Python user API perspective.
>>>>>>
>>>>>
>>>>> Are we sure that the internal SDF will provide the same functionality
>>>>> as the native one? What if the Kafka SDF is in the middle of a pipeline -
>>>>> can Flink support that? Having a separate transform for the Flink native
>>>>> source might be a better user experience than having one that changes its
>>>>> behavior in strange ways depending on the runner.
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <ro...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <th...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> > Hi Cham,
>>>>>>>>
>>>>>>>> > Thanks for the feedback!
>>>>>>>>
>>>>>>>> > I should have probably clarified that my POC and questions aren't
>>>>>>>> specific to Kafka as source, but pretty much any other source/sink
>>>>>>>> that we
>>>>>>>> internally use as well. We have existing Flink pipelines that are
>>>>>>>> written
>>>>>>>> in Java and we want to use the same connectors with the Python SDK
>>>>>>>> on top
>>>>>>>> of the already operationalized Flink stack. Therefore, portability
>>>>>>>> isn't a
>>>>>>>> concern as much as the ability to integrate is.
>>>>>>>>
>>>>>>>
>>>>>> Thanks for the clarification. Agree that providing runner-native
>>>>>> implementations of established source/sinks can be desirable in some
>>>>>> cases.
>>>>>>
>>>>>>
>>>>>>>> > -->
>>>>>>>>
>>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>>>>>> > <ch...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> >> Hi Thomas,
>>>>>>>>
>>>>>>>> >> Seems like we are working on similar (partially) things :).
>>>>>>>>
>>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <th...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> >>> I'm working on a mini POC to enable Kafka as custom streaming
>>>>>>>> source
>>>>>>>> for a Python pipeline executing on the (in-progress) portable Flink
>>>>>>>> runner.
>>>>>>>>
>>>>>>>> >>> We eventually want to use the same native Flink connectors for
>>>>>>>> sources
>>>>>>>> and sinks that we also use in other Flink jobs.
>>>>>>>>
>>>>>>>>
>>>>>>>> >> Could you clarify what you mean by same Flink connector ? Do you
>>>>>>>> mean
>>>>>>>> that Beam-based and non-Beam-based versions of Flink will use the
>>>>>>>> same
>>>>>>>> Kafka connector implementation ?
>>>>>>>>
>>>>>>>>
>>>>>>>> > The native Flink sources as shown in the example below, not the
>>>>>>>> Beam
>>>>>>>> KafkaIO or other Beam sources.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010
>>>>>>>> reading
>>>>>>>> from Kafka and a Python lambda logging the value. The code is here:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> >>> I'm looking for feedback/opinions on the following items in
>>>>>>>> particular:
>>>>>>>>
>>>>>>>> >>> * Enabling custom translation on the Flink portable runner
>>>>>>>> (custom
>>>>>>>> translator could be loaded with ServiceLoader, additional
>>>>>>>> translations
>>>>>>>> could also be specified as job server configuration, pipeline
>>>>>>>> option, ...)
>>>>>>>>
>>>>>>>> >>> * For the Python side, is what's shown in the commit the
>>>>>>>> recommended
>>>>>>>> way to define a custom transform (it would eventually live in a
>>>>>>>> reusable
>>>>>>>> custom module that pipeline authors can import)? Also, the example
>>>>>>>> does not
>>>>>>>> have the configuration part covered yet..
>>>>>>>>
>>>>>>>>
>>>>>>>> >> The only standard unbounded source API offered by Python SDK is
>>>>>>>> the
>>>>>>>> Splittable DoFn API. This is the part I'm working on. I'm trying to
>>>>>>>> add a
>>>>>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>>>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>>>>>> comparing
>>>>>>>> different Kafka Python client libraries. Will share more
>>>>>>>> information on
>>>>>>>> this soon.
>>>>>>>>
>>>>>>>> >> I understand this might not be possible in all cases and we
>>>>>>>> might want
>>>>>>>> to consider adding native source/sink implementations. But this
>>>>>>>> will
>>>>>>>> result in the implementation being runner-specific (each runner
>>>>>>>> will have
>>>>>>>> to have its own source/sink implementation). So I think we should
>>>>>>>> try to
>>>>>>>> add connector implementations to Beam using the standard API
>>>>>>>> whenever
>>>>>>>> possible. We also have plans to implement support for cross SDK
>>>>>>>> transforms
>>>>>>>> in the future (so that we can utilize Java implementation from
>>>>>>>> Python for
>>>>>>>> example) but we are not there yet and we might still want to
>>>>>>>> implement a
>>>>>>>> connector for a given SDK if there's good client library support.
>>>>>>>>
>>>>>>>>
>>>>>>>> > It is great that the Python SDK will have connectors that are
>>>>>>>> written in
>>>>>>>> Python in the future, but I think it is equally if not more
>>>>>>>> important to be
>>>>>>>> able to use at least the Java Beam connectors with Python SDK (and
>>>>>>>> any
>>>>>>>> other non-Java SDK). Especially in a fully managed environment it
>>>>>>>> should be
>>>>>>>> possible to offer this to users in a way that is largely
>>>>>>>> transparent. It
>>>>>>>> takes significant time and effort to mature connectors and I'm not
>>>>>>>> sure it
>>>>>>>> is realistic to repeat that for all external systems in multiple
>>>>>>>> languages.
>>>>>>>> Or, to put it in another way, it is likely that instead of one over
>>>>>>>> time
>>>>>>>> rock solid connector per external system there will be multiple
>>>>>>>> less mature
>>>>>>>> implementations. That's also the reason we internally want to use
>>>>>>>> the Flink
>>>>>>>> native connectors - we know what they can and cannot do and want to
>>>>>>>> leverage the existing investment.
>>>>>>>>
>>>>>>>> There are two related issues here: how to specify transforms (such
>>>>>>>> as sources) in a language-independent manner, and how specific
>>>>>>>> runners can recognize and run them. URNs address both: the
>>>>>>>> composite ReadFromKafka PTransform (which consists of an Impulse +
>>>>>>>> SDF(KafkaDoFn)) encodes to a URN with an attached payload that
>>>>>>>> fully specifies this read. (The KafkaDoFn could similarly have a
>>>>>>>> URN and payload.) A runner that understands these URNs is free to
>>>>>>>> make any (semantically-equivalent) substitutions it wants for this
>>>>>>>> transform.
>>>>>>>>
>>>>>>>> Note that a KafkaDoFn still needs to be provided, but could be a
>>>>>>>> DoFn that
>>>>>>>> fails loudly if it's actually called in the short term rather than
>>>>>>>> a full
>>>>>>>> Python implementation. Eventually, we would like to be able to call
>>>>>>>> out to
>>>>>>>> another SDK to expand full transforms (e.g. more complicated ones
>>>>>>>> like
>>>>>>>> BigQueryIO).
>>>>>>>>
>>>>>>>> >>> * Cross-language coders: In this example the Kafka source only
>>>>>>>> considers the message value and uses the byte coder that both sides
>>>>>>>> understand. If I wanted to pass on the key and possibly other
>>>>>>>> metadata to
>>>>>>>> the Python transform (similar to KafkaRecord from Java KafkaIO),
>>>>>>>> then a
>>>>>>>> specific coder is needed. Such coder could be written using
>>>>>>>> protobuf, Avro
>>>>>>>> etc, but it would also need to be registered.
>>>>>>>>
>>>>>>>>
>>>>>>>> >> I think this requirement goes away if we implement Kafka in
>>>>>>>> Python SDK.
>>>>>>>>
>>>>>>>> > Wouldn't this be needed for any cross language pipeline? Or
>>>>>>>> rather any
>>>>>>>> that isn't only using PCollection<byte[]>? Is there a language
>>>>>>>> agnostic
>>>>>>>> encoding for KV<?,?>, for example?
>>>>>>>>
>>>>>>>> Yes, Coders are also specified by URN (+components and/or payload),
>>>>>>>> and
>>>>>>>> there are a couple of standard ones, including KV. See
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>>>> This is not a closed set.
>>>>>>>>
>>>>>>>> - Robert
>>>>>>>>
>>>>>>>

Re: Custom URNs and runner translation

Posted by Kenneth Knowles <kl...@google.com>.
It doesn't have to be 1:1 swapping KafkaIO for a Flink Kafka connector,
right? I was imagining: the Python SDK submits a pipeline with a KafkaIO (with
URN + payload), maybe with bogus contents. It is replaced with a small Flink
subgraph, including the native Flink Kafka connector and some compensating
transforms to match the required semantics. To me, this is preferable to
making single-runner transform URNs, since that breaks runner portability
by definition.
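
As a rough illustration of that replacement, the sketch below models a
submitted pipeline as a list of (URN, payload) steps and swaps one step for a
small subgraph. Everything here is hypothetical: the URN strings, the
"compensate" step, and the helper names are made up for the example and are
not part of Beam or the Flink runner.

```python
from typing import Callable, List, Tuple

Step = Tuple[str, bytes]  # (urn, payload)

# Hypothetical runner-agnostic URN for a Kafka read.
KAFKA_READ_URN = "example:transform:kafka_read:v1"


def flink_kafka_subgraph(payload: bytes) -> List[Step]:
    """Hypothetical replacement: native source plus a compensating step."""
    return [
        ("example:flink:native_kafka_source", payload),
        ("example:flink:compensate_semantics", b""),
    ]


def replace_by_urn(pipeline: List[Step], urn: str,
                   make_subgraph: Callable[[bytes], List[Step]]) -> List[Step]:
    """Return a new pipeline with each matching step swapped for a subgraph."""
    result: List[Step] = []
    for step_urn, payload in pipeline:
        if step_urn == urn:
            # One submitted transform may become several runner-side steps.
            result.extend(make_subgraph(payload))
        else:
            result.append((step_urn, payload))
    return result


submitted = [(KAFKA_READ_URN, b"topic=events"), ("example:transform:log", b"")]
translated = replace_by_urn(submitted, KAFKA_READ_URN, flink_kafka_subgraph)
for step in translated:
    print(step)
```

The submitted pipeline still carries only the portable URN; the Flink-specific
steps appear only after translation, so another runner could substitute its
own subgraph for the same URN.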

Kenn

On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <ch...@google.com>
wrote:

>
>
> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <re...@google.com> wrote:
>
>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <kl...@google.com> wrote:
>>
>>> The premise of URN + payload is that you can establish a spec. A native
>>> override still needs to meet the spec - it may still require some
>>> compensating code. Worrying about weird differences between runners seems
>>> more about worrying that an adequate spec cannot be determined.
>>>
>>
>> My point exactly. An SDF-based KafkaIO can run in the middle of a
>> pipeline. E.g. we could have TextIO producing a list of topics, and the SDF
>> KafkaIO run after that on this dynamic (not known until runtime) list of
>> topics. If the native Flink source doesn't work this way, then it doesn't
>> share the same spec and should have a different URN.
>>
>
> Agree that if they cannot share the same spec, SDF and native transforms
> warrant different URNs. Native Kafka might be able to support a PCollection
> of topics/partitions as an input, though, by using the underlying native
> Flink Kafka connector as a library. On the other hand, we might decide to
> expand SDF-based ParDos into other transforms before a runner gets a
> chance to override, in which case this kind of replacement will not be
> possible.
>
> Thanks,
> Cham
>
>
>>
>>> Runners will already invoke the SDF differently, so users treating every
>>> detail of some implementation as the spec are doomed.
>>>
>>> Kenn
>>>
>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <
>>>> chamikara@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <he...@google.com>
>>>>> wrote:
>>>>>
>>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a
>>>>>> DoFn that
>>>>>> > fails loudly if it's actually called in the short term rather than
>>>>>> a full
>>>>>> > Python implementation.
>>>>>>
>>>>>> For configurable runner-native IO, for now, I think it is reasonable
>>>>>> to use a URN + special data payload directly without a KafkaDoFn --
>>>>>> assuming it's a portable pipeline. That's what we do in Go for
>>>>>> PubSub-on-Dataflow and something similar would work for Kafka-on-Flink as
>>>>>> well. I agree that non-native alternative implementation is desirable, but
>>>>>> if one is not present we should IMO rather fail at job submission instead
>>>>>> of at runtime. I could imagine connectors intrinsic to an execution engine
>>>>>> where non-native implementations are not possible.
>>>>>>
>>>>>
>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand
>>>>> similar to any other SDF by default (initial splitting, GBK, and a map-task
>>>>> equivalent, for example) but a runner (Flink in this case) will be free to
>>>>> override it with a runner-native implementation if desired. I assume the
>>>>> runner will have a chance to perform this override before the SDF expansion
>>>>> (which is not fully designed yet). Providing separate source/sink
>>>>> transforms for Flink native Kafka will be an option as well, but that will
>>>>> be less desirable from a Python user API perspective.
>>>>>
>>>>
>>>> Are we sure that the internal SDF will provide the same functionality
>>>> as the native one? What if the Kafka SDF is in the middle of a pipeline -
>>>> can Flink support that? Having a separate transform for the Flink native
>>>> source might be a better user experience than having one that changes its
>>>> behavior in strange ways depending on the runner.
>>>>
>>>>
>>>>
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <ro...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <th...@apache.org> wrote:
>>>>>>>
>>>>>>> > Hi Cham,
>>>>>>>
>>>>>>> > Thanks for the feedback!
>>>>>>>
>>>>>>> > I should have probably clarified that my POC and questions aren't
>>>>>>> specific to Kafka as source, but pretty much any other source/sink
>>>>>>> that we
>>>>>>> internally use as well. We have existing Flink pipelines that are
>>>>>>> written
>>>>>>> in Java and we want to use the same connectors with the Python SDK
>>>>>>> on top
>>>>>>> of the already operationalized Flink stack. Therefore, portability
>>>>>>> isn't a
>>>>>>> concern as much as the ability to integrate is.
>>>>>>>
>>>>>>
>>>>> Thanks for the clarification. Agree that providing runner-native
>>>>> implementations of established source/sinks can be desirable in some
>>>>> cases.
>>>>>
>>>>>
>>>>>>> > -->
>>>>>>>
>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>>>>> > <ch...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> >> Hi Thomas,
>>>>>>>
>>>>>>> >> Seems like we are working on similar (partially) things :).
>>>>>>>
>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <th...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>> >>> I'm working on a mini POC to enable Kafka as custom streaming
>>>>>>> source
>>>>>>> for a Python pipeline executing on the (in-progress) portable Flink
>>>>>>> runner.
>>>>>>>
>>>>>>> >>> We eventually want to use the same native Flink connectors for
>>>>>>> sources
>>>>>>> and sinks that we also use in other Flink jobs.
>>>>>>>
>>>>>>>
>>>>>>> >> Could you clarify what you mean by same Flink connector ? Do you
>>>>>>> mean
>>>>>>> that Beam-based and non-Beam-based versions of Flink will use the
>>>>>>> same
>>>>>>> Kafka connector implementation ?
>>>>>>>
>>>>>>>
>>>>>>> > The native Flink sources as shown in the example below, not the
>>>>>>> Beam
>>>>>>> KafkaIO or other Beam sources.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010
>>>>>>> reading
>>>>>>> from Kafka and a Python lambda logging the value. The code is here:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> >>> I'm looking for feedback/opinions on the following items in
>>>>>>> particular:
>>>>>>>
>>>>>>> >>> * Enabling custom translation on the Flink portable runner
>>>>>>> (custom
>>>>>>> translator could be loaded with ServiceLoader, additional
>>>>>>> translations
>>>>>>> could also be specified as job server configuration, pipeline
>>>>>>> option, ...)
>>>>>>>
>>>>>>> >>> * For the Python side, is what's shown in the commit the
>>>>>>> recommended
>>>>>>> way to define a custom transform (it would eventually live in a
>>>>>>> reusable
>>>>>>> custom module that pipeline authors can import)? Also, the example
>>>>>>> does not
>>>>>>> have the configuration part covered yet..
>>>>>>>
>>>>>>>
>>>>>>> >> The only standard unbounded source API offered by Python SDK is
>>>>>>> the
>>>>>>> Splittable DoFn API. This is the part I'm working on. I'm trying to
>>>>>>> add a
>>>>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>>>>> comparing
>>>>>>> different Kafka Python client libraries. Will share more information
>>>>>>> on
>>>>>>> this soon.
>>>>>>>
>>>>>>> >> I understand this might not be possible in all cases and we might
>>>>>>> want
>>>>>>> to consider adding native source/sink implementations. But this
>>>>>>> will
>>>>>>> result in the implementation being runner-specific (each runner will
>>>>>>> have
>>>>>>> to have its own source/sink implementation). So I think we should
>>>>>>> try to
>>>>>>> add connector implementations to Beam using the standard API whenever
>>>>>>> possible. We also have plans to implement support for cross SDK
>>>>>>> transforms
>>>>>>> in the future (so that we can utilize Java implementation from
>>>>>>> Python for
>>>>>>> example) but we are not there yet and we might still want to
>>>>>>> implement a
>>>>>>> connector for a given SDK if there's good client library support.
>>>>>>>
>>>>>>>
>>>>>>> > It is great that the Python SDK will have connectors that are
>>>>>>> written in
>>>>>>> Python in the future, but I think it is equally if not more
>>>>>>> important to be
>>>>>>> able to use at least the Java Beam connectors with Python SDK (and
>>>>>>> any
>>>>>>> other non-Java SDK). Especially in a fully managed environment it
>>>>>>> should be
>>>>>>> possible to offer this to users in a way that is largely
>>>>>>> transparent. It
>>>>>>> takes significant time and effort to mature connectors and I'm not
>>>>>>> sure it
>>>>>>> is realistic to repeat that for all external systems in multiple
>>>>>>> languages.
>>>>>>> Or, to put it in another way, it is likely that instead of one over
>>>>>>> time
>>>>>>> rock solid connector per external system there will be multiple less
>>>>>>> mature
>>>>>>> implementations. That's also the reason we internally want to use
>>>>>>> the Flink
>>>>>>> native connectors - we know what they can and cannot do and want to
>>>>>>> leverage the existing investment.
>>>>>>>
>>>>>>> There are two related issues here: how to specify transforms (such
>>>>>>> as sources) in a language-independent manner, and how specific
>>>>>>> runners can recognize and run them. URNs address both: the
>>>>>>> composite ReadFromKafka PTransform (which consists of an Impulse +
>>>>>>> SDF(KafkaDoFn)) encodes to a URN with an attached payload that
>>>>>>> fully specifies this read. (The KafkaDoFn could similarly have a
>>>>>>> URN and payload.) A runner that understands these URNs is free to
>>>>>>> make any (semantically-equivalent) substitutions it wants for this
>>>>>>> transform.
>>>>>>>
>>>>>>> Note that a KafkaDoFn still needs to be provided, but could be a
>>>>>>> DoFn that
>>>>>>> fails loudly if it's actually called in the short term rather than a
>>>>>>> full
>>>>>>> Python implementation. Eventually, we would like to be able to call
>>>>>>> out to
>>>>>>> another SDK to expand full transforms (e.g. more complicated ones
>>>>>>> like
>>>>>>> BigQueryIO).
>>>>>>>
>>>>>>> >>> * Cross-language coders: In this example the Kafka source only
>>>>>>> considers the message value and uses the byte coder that both sides
>>>>>>> understand. If I wanted to pass on the key and possibly other
>>>>>>> metadata to
>>>>>>> the Python transform (similar to KafkaRecord from Java KafkaIO),
>>>>>>> then a
>>>>>>> specific coder is needed. Such coder could be written using
>>>>>>> protobuf, Avro
>>>>>>> etc, but it would also need to be registered.
>>>>>>>
>>>>>>>
>>>>>>> >> I think this requirement goes away if we implement Kafka in
>>>>>>> Python SDK.
>>>>>>>
>>>>>>> > Wouldn't this be needed for any cross language pipeline? Or rather
>>>>>>> any
>>>>>>> that isn't only using PCollection<byte[]>? Is there a language
>>>>>>> agnostic
>>>>>>> encoding for KV<?,?>, for example?
>>>>>>>
>>>>>>> Yes, Coders are also specified by URN (+components and/or payload),
>>>>>>> and
>>>>>>> there are a couple of standard ones, including KV. See
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>>> This is not a closed set.
>>>>>>>
>>>>>>> - Robert
>>>>>>>
>>>>>>

Re: Custom URNs and runner translation

Posted by Chamikara Jayalath <ch...@google.com>.
On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <re...@google.com> wrote:

> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <kl...@google.com> wrote:
>
>> The premise of URN + payload is that you can establish a spec. A native
>> override still needs to meet the spec - it may still require some
>> compensating code. Worrying about weird differences between runners seems
>> more about worrying that an adequate spec cannot be determined.
>>
>
> My point exactly. An SDF-based KafkaIO can run in the middle of a pipeline.
> E.g. we could have TextIO producing a list of topics, and the SDF KafkaIO
> run after that on this dynamic (not known until runtime) list of topics. If
> the native Flink source doesn't work this way, then it doesn't share the
> same spec and should have a different URN.
>

Agree that if they cannot share the same spec, SDF and native transforms
warrant different URNs. Native Kafka might be able to support a PCollection
of topics/partitions as an input, though, by using the underlying native
Flink Kafka connector as a library. On the other hand, we might decide to
expand SDF-based ParDos into other transforms before a runner gets a
chance to override, in which case this kind of replacement will not be
possible.
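
One way to picture the "same spec or different URN" question is as a check a
runner could make before overriding. The Python sketch below is only an
illustration under assumed names (AppliedTransform, NativeOverride,
can_override, and the URN are hypothetical): an override that can only act as
a root source must not replace a read that consumes a PCollection of topics.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class AppliedTransform:
    urn: str
    inputs: Tuple[str, ...]  # input PCollection names; empty at the root


@dataclass(frozen=True)
class NativeOverride:
    urn: str
    accepts_input: bool  # True if it can consume a PCollection of topics


def can_override(transform: AppliedTransform, override: NativeOverride) -> bool:
    """An override is valid only if it covers how the transform is used."""
    if transform.urn != override.urn:
        return False
    # A root-only native source cannot stand in for a mid-pipeline read.
    return not transform.inputs or override.accepts_input


URN = "example:transform:kafka_read:v1"  # hypothetical URN
root_read = AppliedTransform(URN, ())
mid_pipeline_read = AppliedTransform(URN, ("topics",))
source_only = NativeOverride(URN, accepts_input=False)

print(can_override(root_read, source_only))
print(can_override(mid_pipeline_read, source_only))
```

If the native connector were wrapped so that it accepts topics/partitions as
input, accepts_input would be True and the same URN could cover both uses;
otherwise the mid-pipeline case needs the SDF expansion or a separate URN.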

Thanks,
Cham


>
>> Runners will already invoke the SDF differently, so users treating every
>> detail of some implementation as the spec are doomed.
>>
>> Kenn
>>
>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:
>>
>>>
>>>
>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <ch...@google.com>
>>> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <he...@google.com>
>>>> wrote:
>>>>
>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a
>>>>> DoFn that
>>>>> > fails loudly if it's actually called in the short term rather than a
>>>>> full
>>>>> > Python implementation.
>>>>>
>>>>> For configurable runner-native IO, for now, I think it is reasonable
>>>>> to use a URN + special data payload directly without a KafkaDoFn --
>>>>> assuming it's a portable pipeline. That's what we do in Go for
>>>>> PubSub-on-Dataflow and something similar would work for Kafka-on-Flink as
>>>>> well. I agree that non-native alternative implementation is desirable, but
>>>>> if one is not present we should IMO rather fail at job submission instead
>>>>> of at runtime. I could imagine connectors intrinsic to an execution engine
>>>>> where non-native implementations are not possible.
>>>>>
>>>>
>>>> I think, in this case, KafkaDoFn can be an SDF that would expand similar
>>>> to any other SDF by default (initial splitting, GBK, and a map-task
>>>> equivalent, for example) but a runner (Flink in this case) will be free to
>>>> override it with a runner-native implementation if desired. I assume the
>>>> runner will have a chance to perform this override before the SDF expansion
>>>> (which is not fully designed yet). Providing separate source/sink
>>>> transforms for Flink native Kafka will be an option as well, but that will
>>>> be less desirable from a Python user API perspective.
>>>>
>>>
>>> Are we sure that the internal SDF will provide the same functionality as
>>> the native one? What if the Kafka SDF is in the middle of a pipeline - can
>>> Flink support that? Having a separate transform for the Flink native source
>>> might be a better user experience than having one that changes its behavior
>>> in strange ways depending on the runner.
>>>
>>>
>>>
>>>>
>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <th...@apache.org> wrote:
>>>>>>
>>>>>> > Hi Cham,
>>>>>>
>>>>>> > Thanks for the feedback!
>>>>>>
>>>>>> > I should have probably clarified that my POC and questions aren't
>>>>>> specific to Kafka as source, but pretty much any other source/sink
>>>>>> that we
>>>>>> internally use as well. We have existing Flink pipelines that are
>>>>>> written
>>>>>> in Java and we want to use the same connectors with the Python SDK on
>>>>>> top
>>>>>> of the already operationalized Flink stack. Therefore, portability
>>>>>> isn't a
>>>>>> concern as much as the ability to integrate is.
>>>>>>
>>>>>
>>>> Thanks for the clarification. Agree that providing runner-native
>>>> implementations of established source/sinks can be desirable in some
>>>> cases.
>>>>
>>>>
>>>>>> > -->
>>>>>>
>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>>>> > <ch...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>> >> Hi Thomas,
>>>>>>
>>>>>> >> Seems like we are working on similar (partially) things :).
>>>>>>
>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <th...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>> >>> I'm working on a mini POC to enable Kafka as custom streaming
>>>>>> source
>>>>>> for a Python pipeline executing on the (in-progress) portable Flink
>>>>>> runner.
>>>>>>
>>>>>> >>> We eventually want to use the same native Flink connectors for
>>>>>> sources
>>>>>> and sinks that we also use in other Flink jobs.
>>>>>>
>>>>>>
>>>>>> >> Could you clarify what you mean by same Flink connector ? Do you
>>>>>> mean
>>>>>> that Beam-based and non-Beam-based versions of Flink will use the same
>>>>>> Kafka connector implementation ?
>>>>>>
>>>>>>
>>>>>> > The native Flink sources as shown in the example below, not the Beam
>>>>>> KafkaIO or other Beam sources.
>>>>>>
>>>>>>
>>>>>>
>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010
>>>>>> reading
>>>>>> from Kafka and a Python lambda logging the value. The code is here:
>>>>>>
>>>>>>
>>>>>>
>>>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>
>>>>>>
>>>>>>
>>>>>> >>> I'm looking for feedback/opinions on the following items in
>>>>>> particular:
>>>>>>
>>>>>> >>> * Enabling custom translation on the Flink portable runner (custom
>>>>>> translator could be loaded with ServiceLoader, additional translations
>>>>>> could also be specified as job server configuration, pipeline option,
>>>>>> ...)
>>>>>>
>>>>>> >>> * For the Python side, is what's shown in the commit the
>>>>>> recommended
>>>>>> way to define a custom transform (it would eventually live in a
>>>>>> reusable
>>>>>> custom module that pipeline authors can import)? Also, the example
>>>>>> does not
>>>>>> have the configuration part covered yet..
>>>>>>
>>>>>>
>>>>>> >> The only standard unbounded source API offered by Python SDK is the
>>>>>> Splittable DoFn API. This is the part I'm working on. I'm trying to
>>>>>> add a
>>>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>>>> comparing
>>>>>> different Kafka Python client libraries. Will share more information
>>>>>> on
>>>>>> this soon.
>>>>>>
>>>>>> >> I understand this might not be possible in all cases and we might
>>>>>> want
>>>>>> to consider adding native source/sink implementations. But this will
>>>>>> result in the implementation being runner-specific (each runner will
>>>>>> have
>>>>>> to have its own source/sink implementation). So I think we should
>>>>>> try to
>>>>>> add connector implementations to Beam using the standard API whenever
>>>>>> possible. We also have plans to implement support for cross SDK
>>>>>> transforms
>>>>>> in the future (so that we can utilize Java implementation from Python
>>>>>> for
>>>>>> example) but we are not there yet and we might still want to
>>>>>> implement a
>>>>>> connector for a given SDK if there's good client library support.
>>>>>>
>>>>>>
>>>>>> > It is great that the Python SDK will have connectors that are
>>>>>> written in
>>>>>> Python in the future, but I think it is equally if not more important
>>>>>> to be
>>>>>> able to use at least the Java Beam connectors with Python SDK (and any
>>>>>> other non-Java SDK). Especially in a fully managed environment it
>>>>>> should be
>>>>>> possible to offer this to users in a way that is largely transparent.
>>>>>> It
>>>>>> takes significant time and effort to mature connectors and I'm not
>>>>>> sure it
>>>>>> is realistic to repeat that for all external systems in multiple
>>>>>> languages.
>>>>>> Or, to put it in another way, it is likely that instead of one over
>>>>>> time
>>>>>> rock solid connector per external system there will be multiple less
>>>>>> mature
>>>>>> implementations. That's also the reason we internally want to use the
>>>>>> Flink
>>>>>> native connectors - we know what they can and cannot do and want to
>>>>>> leverage the existing investment.
>>>>>>
>>>>>> There are two related issues here: how to specify transforms (such as
>>>>>> sources) in a language-independent manner, and how specific runners
>>>>>> can
>>>>>> recognize and run them. For this we use URNs:
>>>>>> the
>>>>>> composite ReadFromKafka PTransform (that consists of an Impulse +
>>>>>> SDF(KafkaDoFn)) encodes to a URN with an attached payload that
>>>>>> fully
>>>>>> specifies this read. (The KafkaDoFn could similarly have a URN and
>>>>>> payload.) A runner that understands these URNs is free to make any
>>>>>> (semantically-equivalent) substitutions it wants for this transform.
>>>>>>
>>>>>> Note that a KafkaDoFn still needs to be provided, but could be a DoFn
>>>>>> that
>>>>>> fails loudly if it's actually called in the short term rather than a
>>>>>> full
>>>>>> Python implementation. Eventually, we would like to be able to call
>>>>>> out to
>>>>>> another SDK to expand full transforms (e.g. more complicated ones like
>>>>>> BigQueryIO).
>>>>>>
>>>>>> >>> * Cross-language coders: In this example the Kafka source only
>>>>>> considers the message value and uses the byte coder that both sides
>>>>>> understand. If I wanted to pass on the key and possibly other
>>>>>> metadata to
>>>>>> the Python transform (similar to KafkaRecord from Java KafkaIO), then
>>>>>> a
>>>>>> specific coder is needed. Such coder could be written using protobuf,
>>>>>> Avro
>>>>>> etc, but it would also need to be registered.
>>>>>>
>>>>>>
>>>>>> >> I think this requirement goes away if we implement Kafka in Python
>>>>>> SDK.
>>>>>>
>>>>>> > Wouldn't this be needed for any cross language pipeline? Or rather
>>>>>> any
>>>>>> that isn't only using PCollection<byte[]>? Is there a language
>>>>>> agnostic
>>>>>> encoding for KV<?,?>, for example?
>>>>>>
>>>>>> Yes, Coders are also specified by URN (+components and/or payload),
>>>>>> and
>>>>>> there are a couple of standard ones, including KV. See
>>>>>>
>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>> This is not a closed set.
>>>>>>
>>>>>> - Robert
>>>>>>
>>>>>

Re: Custom URNs and runner translation

Posted by Reuven Lax <re...@google.com>.
On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <kl...@google.com> wrote:

> The premise of URN + payload is that you can establish a spec. A native
> override still needs to meet the spec - it may still require some
> compensating code. Worrying about weird differences between runners seems
> more about worrying that an adequate spec cannot be determined.
>

My point exactly. An SDF-based KafkaIO can run in the middle of a pipeline.
E.g. we could have TextIO producing a list of topics, and the SDF KafkaIO
running after that on this dynamic (not known until runtime) list of topics. If
the native Flink source doesn't work this way, then it doesn't share the
same spec and should have a different URN.
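
To make the "different spec, different URN" point concrete, here is a
minimal sketch in plain Python with no Beam dependency. The URN strings,
the TRANSLATORS table and translate() are hypothetical names chosen for
illustration; they are not Beam or Flink runner APIs. The idea is only that
the runner picks a translation by URN while the job is being submitted, and
that a native source which cannot take its topics from an input PCollection
gets its own URN and rejects such use up front.

```python
# Hypothetical URNs: one per spec, not one per external system.
SDF_KAFKA_READ_URN = "example:transform:kafka_read_sdf:v1"
FLINK_NATIVE_KAFKA_URN = "example:transform:flink_kafka_source:v1"


def translate_sdf_read(payload, has_input):
    # Spec: may sit mid-pipeline and read topics discovered at runtime.
    return "default SDF expansion"


def translate_flink_native(payload, has_input):
    # Spec: root source only; topics must be known at job submission.
    if has_input:
        raise ValueError(
            "native Flink Kafka source cannot consume an input PCollection")
    if not payload.get("topics"):
        raise ValueError("native Flink Kafka source needs topics in its payload")
    # A string stands in for building the real Flink operator.
    return "FlinkKafkaConsumer010(%s)" % ",".join(payload["topics"])


# Hypothetical registry a runner could fill via ServiceLoader-style discovery.
TRANSLATORS = {
    SDF_KAFKA_READ_URN: translate_sdf_read,
    FLINK_NATIVE_KAFKA_URN: translate_flink_native,
}


def translate(urn, payload, has_input):
    """Pick a translation by URN while the job is being submitted."""
    try:
        translator = TRANSLATORS[urn]
    except KeyError:
        raise ValueError("no translation registered for URN %r" % urn)
    return translator(payload, has_input)
```

With this shape, an unknown URN or a native source placed after TextIO
fails in translate(), i.e. at submission rather than at runtime.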


> Runners will already invoke the SDF differently, so users treating every
> detail of some implementation as the spec are doomed.
>
> Kenn
>
> On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:
>
>>
>>
>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <he...@google.com>
>>> wrote:
>>>
>>>> > Note that a KafkaDoFn still needs to be provided, but could be a
>>>> DoFn that
>>>> > fails loudly if it's actually called in the short term rather than a
>>>> full
>>>> > Python implementation.
>>>>
>>>> For configurable runner-native IO, for now, I think it is reasonable to
>>>> use a URN + special data payload directly without a KafkaDoFn -- assuming
>>>> it's a portable pipeline. That's what we do in Go for PubSub-on-Dataflow
>>>> and something similar would work for Kafka-on-Flink as well. I agree that
>>>> non-native alternative implementation is desirable, but if one is not
>>>> present we should IMO rather fail at job submission instead of at runtime.
>>>> I could imagine connectors intrinsic to an execution engine where
>>>> non-native implementations are not possible.
>>>>
>>>
>>> I think, in this case, KafkaDoFn can be an SDF that would expand similarly
>>> to any other SDF by default (initial splitting, GBK, and a map-task
>>> equivalent, for example) but a runner (Flink in this case) will be free to
>>> override it with a runner-native implementation if desired. I assume the
>>> runner will have a chance to perform this override before the SDF expansion
>>> (which is not fully designed yet). Providing separate source/sink
>>> transforms for Flink native Kafka will be an option as well, but that will
>>> be less desirable from a Python user API perspective.
>>>
>>
>> Are we sure that the internal SDF will provide the same functionality as
>> the native one? What if the Kafka SDF is in the middle of a pipeline - can
>> Flink support that? Having a separate transform for the Flink native source
>> might be a better user experience than having one that changes its behavior
>> in strange ways depending on the runner.
>>
>>
>>
>>>
>>>
>>>>
>>>>
>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <th...@apache.org> wrote:
>>>>>
>>>>> > Hi Cham,
>>>>>
>>>>> > Thanks for the feedback!
>>>>>
>>>>> > I should have probably clarified that my POC and questions aren't
>>>>> specific to Kafka as source, but pretty much any other source/sink
>>>>> that we
>>>>> internally use as well. We have existing Flink pipelines that are
>>>>> written
>>>>> in Java and we want to use the same connectors with the Python SDK on
>>>>> top
>>>>> of the already operationalized Flink stack. Therefore, portability
>>>>> isn't a
>>>>> concern as much as the ability to integrate is.
>>>>>
>>>>
>>> Thanks for the clarification. Agree that providing runner-native
>>> implementations of established source/sinks can be desirable in some
>>> cases.
>>>
>>>
>>>>> > -->
>>>>>
>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>>> > <ch...@google.com>
>>>>> wrote:
>>>>>
>>>>> >> Hi Thomas,
>>>>>
>>>>> >> Seems like we are working on similar (partially) things :).
>>>>>
>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <th...@apache.org>
>>>>> wrote:
>>>>>
>>>>> >>> I'm working on a mini POC to enable Kafka as custom streaming
>>>>> source
>>>>> for a Python pipeline executing on the (in-progress) portable Flink
>>>>> runner.
>>>>>
>>>>> >>> We eventually want to use the same native Flink connectors for
>>>>> sources
>>>>> and sinks that we also use in other Flink jobs.
>>>>>
>>>>>
>>>>> >> Could you clarify what you mean by same Flink connector ? Do you
>>>>> mean
>>>>> that Beam-based and non-Beam-based versions of Flink will use the same
>>>>> Kafka connector implementation ?
>>>>>
>>>>>
>>>>> > The native Flink sources as shown in the example below, not the Beam
>>>>> KafkaIO or other Beam sources.
>>>>>
>>>>>
>>>>>
>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010
>>>>> reading
>>>>> from Kafka and a Python lambda logging the value. The code is here:
>>>>>
>>>>>
>>>>>
>>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>
>>>>>
>>>>>
>>>>> >>> I'm looking for feedback/opinions on the following items in
>>>>> particular:
>>>>>
>>>>> >>> * Enabling custom translation on the Flink portable runner (custom
>>>>> translator could be loaded with ServiceLoader, additional translations
>>>>> could also be specified as job server configuration, pipeline option,
>>>>> ...)
>>>>>
>>>>> >>> * For the Python side, is what's shown in the commit the
>>>>> recommended
>>>>> way to define a custom transform (it would eventually live in a
>>>>> reusable
>>>>> custom module that pipeline authors can import)? Also, the example
>>>>> does not
>>>>> have the configuration part covered yet..
>>>>>
>>>>>
>>>>> >> The only standard unbounded source API offered by Python SDK is the
>>>>> Splittable DoFn API. This is the part I'm working on. I'm trying to
>>>>> add a
>>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>>> comparing
>>>>> different Kafka Python client libraries. Will share more information on
>>>>> this soon.
>>>>>
>>>>> >> I understand this might not be possible in all cases and we might
>>>>> want
>>>>> to consider adding native source/sink implementations. But this will
>>>>> result in the implementation being runner-specific (each runner will
>>>>> have
>>>>> to have its own source/sink implementation). So I think we should try
>>>>> to
>>>>> add connector implementations to Beam using the standard API whenever
>>>>> possible. We also have plans to implement support for cross SDK
>>>>> transforms
>>>>> in the future (so that we can utilize Java implementation from Python
>>>>> for
>>>>> example) but we are not there yet and we might still want to implement
>>>>> a
>>>>> connector for a given SDK if there's good client library support.
>>>>>
>>>>>
>>>>> > It is great that the Python SDK will have connectors that are
>>>>> written in
>>>>> Python in the future, but I think it is equally if not more important
>>>>> to be
>>>>> able to use at least the Java Beam connectors with Python SDK (and any
>>>>> other non-Java SDK). Especially in a fully managed environment it
>>>>> should be
>>>>> possible to offer this to users in a way that is largely transparent.
>>>>> It
>>>>> takes significant time and effort to mature connectors and I'm not
>>>>> sure it
>>>>> is realistic to repeat that for all external systems in multiple
>>>>> languages.
>>>>> Or, to put it in another way, it is likely that instead of one over
>>>>> time
>>>>> rock solid connector per external system there will be multiple less
>>>>> mature
>>>>> implementations. That's also the reason we internally want to use the
>>>>> Flink
>>>>> native connectors - we know what they can and cannot do and want to
>>>>> leverage the existing investment.
>>>>>
>>>>> There are two related issues here: how to specify transforms (such as
>>>>> sources) in a language-independent manner, and how specific runners can
>>>>> recognize and run them. For this we use URNs: the
>>>>> composite ReadFromKafka PTransform (that consists of an Impulse +
>>>>> SDF(KafkaDoFn)) encodes to a URN with an attached payload that fully
>>>>> specifies this read. (The KafkaDoFn could similarly have a URN and
>>>>> payload.) A runner that understands these URNs is free to make any
>>>>> (semantically-equivalent) substitutions it wants for this transform.
>>>>>
>>>>> Note that a KafkaDoFn still needs to be provided, but could be a DoFn
>>>>> that
>>>>> fails loudly if it's actually called in the short term rather than a
>>>>> full
>>>>> Python implementation. Eventually, we would like to be able to call
>>>>> out to
>>>>> another SDK to expand full transforms (e.g. more complicated ones like
>>>>> BigQueryIO).
>>>>>
>>>>> >>> * Cross-language coders: In this example the Kafka source only
>>>>> considers the message value and uses the byte coder that both sides
>>>>> understand. If I wanted to pass on the key and possibly other metadata
>>>>> to
>>>>> the Python transform (similar to KafkaRecord from Java KafkaIO), then a
>>>>> specific coder is needed. Such coder could be written using protobuf,
>>>>> Avro
>>>>> etc, but it would also need to be registered.
>>>>>
>>>>>
>>>>> >> I think this requirement goes away if we implement Kafka in Python
>>>>> SDK.
>>>>>
>>>>> > Wouldn't this be needed for any cross language pipeline? Or rather
>>>>> any
>>>>> that isn't only using PCollection<byte[]>? Is there a language agnostic
>>>>> encoding for KV<?,?>, for example?
>>>>>
>>>>> Yes, Coders are also specified by URN (+components and/or payload), and
>>>>> there are a couple of standard ones, including KV. See
>>>>>
>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>> This is not a closed set.
>>>>>
>>>>> - Robert
>>>>>
>>>>

Re: Custom URNs and runner translation

Posted by Kenneth Knowles <kl...@google.com>.
The premise of URN + payload is that you can establish a spec. A native
override still needs to meet the spec - it may still require some
compensating code. Worrying about weird differences between runners seems
more about worrying that an adequate spec cannot be determined.

Runners will already invoke the SDF differently, so users treating every
detail of some implementation as the spec are doomed.
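
One way to read "establish a spec" is as a shared conformance check that
every implementation behind a URN has to pass. Below is a minimal sketch in
plain Python against a fake in-memory broker. All names (FAKE_BROKER,
encode_payload, read_default, read_native, conforms) are hypothetical and
not part of Beam, and the spec chosen here (same multiset of values, order
unspecified) is an assumption made for the example.

```python
import json

# Hypothetical in-memory stand-in for Kafka: topic -> list of message values.
FAKE_BROKER = {"clicks": [b"a", b"b"], "views": [b"c"]}


def encode_payload(topics):
    """The payload is the part of the spec that fully describes the read."""
    return json.dumps({"topics": sorted(topics)}).encode("utf-8")


def read_default(payload):
    # Stand-in for the SDK-provided expansion: topics in payload order.
    topics = json.loads(payload.decode("utf-8"))["topics"]
    for topic in topics:
        for value in FAKE_BROKER[topic]:
            yield value


def read_native(payload):
    # Stand-in for a runner-native override: same payload, different details
    # (here, topics are visited in reverse order).
    topics = json.loads(payload.decode("utf-8"))["topics"]
    for topic in reversed(topics):
        for value in FAKE_BROKER[topic]:
            yield value


def conforms(implementation, payload, expected):
    """Spec used here: the same multiset of values, in any order."""
    return sorted(implementation(payload)) == sorted(expected)
```

Both stand-ins pass conforms() for the same payload even though they emit
elements in a different order, which is fine as long as order is not part
of the spec; a pipeline that relied on one implementation's order would be
depending on a detail rather than on the spec.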

Kenn

On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:

>
>
> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>>
>>
>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <he...@google.com> wrote:
>>
>>> > Note that a KafkaDoFn still needs to be provided, but could be a DoFn
>>> that
>>> > fails loudly if it's actually called in the short term rather than a
>>> full
>>> > Python implementation.
>>>
>>> For configurable runner-native IO, for now, I think it is reasonable to
>>> use a URN + special data payload directly without a KafkaDoFn -- assuming
>>> it's a portable pipeline. That's what we do in Go for PubSub-on-Dataflow
>>> and something similar would work for Kafka-on-Flink as well. I agree that
>>> non-native alternative implementation is desirable, but if one is not
>>> present we should IMO rather fail at job submission instead of at runtime.
>>> I could imagine connectors intrinsic to an execution engine where
>>> non-native implementations are not possible.
>>>
>>
>> I think, in this case, KafkaDoFn can be an SDF that would expand similarly
>> to any other SDF by default (initial splitting, GBK, and a map-task
>> equivalent, for example) but a runner (Flink in this case) will be free to
>> override it with a runner-native implementation if desired. I assume the
>> runner will have a chance to perform this override before the SDF expansion
>> (which is not fully designed yet). Providing separate source/sink
>> transforms for Flink native Kafka will be an option as well, but that will
>> be less desirable from a Python user API perspective.
>>
>
> Are we sure that the internal SDF will provide the same functionality as
> the native one? What if the Kafka SDF is in the middle of a pipeline - can
> Flink support that? Having a separate transform for the Flink native source
> might be a better user experience than having one that changes its behavior
> in strange ways depending on the runner.
>
>
>
>>
>>
>>>
>>>
>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <th...@apache.org> wrote:
>>>>
>>>> > Hi Cham,
>>>>
>>>> > Thanks for the feedback!
>>>>
>>>> > I should have probably clarified that my POC and questions aren't
>>>> specific to Kafka as source, but pretty much any other source/sink that
>>>> we
>>>> internally use as well. We have existing Flink pipelines that are
>>>> written
>>>> in Java and we want to use the same connectors with the Python SDK on
>>>> top
>>>> of the already operationalized Flink stack. Therefore, portability
>>>> isn't a
>>>> concern as much as the ability to integrate is.
>>>>
>>>
>> Thanks for the clarification. Agree that providing runner-native
>> implementations of established source/sinks can be desirable in some
>> cases.
>>
>>
>>>> > -->
>>>>
>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>> > <ch...@google.com>
>>>> wrote:
>>>>
>>>> >> Hi Thomas,
>>>>
>>>> >> Seems like we are working on similar (partially) things :).
>>>>
>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <th...@apache.org> wrote:
>>>>
>>>> >>> I'm working on a mini POC to enable Kafka as custom streaming source
>>>> for a Python pipeline executing on the (in-progress) portable Flink
>>>> runner.
>>>>
>>>> >>> We eventually want to use the same native Flink connectors for
>>>> sources
>>>> and sinks that we also use in other Flink jobs.
>>>>
>>>>
>>>> >> Could you clarify what you mean by same Flink connector ? Do you mean
>>>> that Beam-based and non-Beam-based versions of Flink will use the same
>>>> Kafka connector implementation ?
>>>>
>>>>
>>>> > The native Flink sources as shown in the example below, not the Beam
>>>> KafkaIO or other Beam sources.
>>>>
>>>>
>>>>
>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010
>>>> reading
>>>> from Kafka and a Python lambda logging the value. The code is here:
>>>>
>>>>
>>>>
>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>
>>>>
>>>>
>>>> >>> I'm looking for feedback/opinions on the following items in
>>>> particular:
>>>>
>>>> >>> * Enabling custom translation on the Flink portable runner (custom
>>>> translator could be loaded with ServiceLoader, additional translations
>>>> could also be specified as job server configuration, pipeline option,
>>>> ...)
>>>>
>>>> >>> * For the Python side, is what's shown in the commit the recommended
>>>> way to define a custom transform (it would eventually live in a reusable
>>>> custom module that pipeline authors can import)? Also, the example does
>>>> not
>>>> have the configuration part covered yet..
>>>>
>>>>
>>>> >> The only standard unbounded source API offered by Python SDK is the
>>>> Splittable DoFn API. This is the part I'm working on. I'm trying to add
>>>> a
>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>> comparing
>>>> different Kafka Python client libraries. Will share more information on
>>>> this soon.
>>>>
>>>> >> I understand this might not be possible in all cases and we might
>>>> want
>>>> to consider adding native source/sink implementations. But this will
>>>> result in the implementation being runner-specific (each runner will
>>>> have
>>>> to have its own source/sink implementation). So I think we should try
>>>> to
>>>> add connector implementations to Beam using the standard API whenever
>>>> possible. We also have plans to implement support for cross SDK
>>>> transforms
>>>> in the future (so that we can utilize Java implementation from Python
>>>> for
>>>> example) but we are not there yet and we might still want to implement a
>>>> connector for a given SDK if there's good client library support.
>>>>
>>>>
>>>> > It is great that the Python SDK will have connectors that are written
>>>> in
>>>> Python in the future, but I think it is equally if not more important
>>>> to be
>>>> able to use at least the Java Beam connectors with Python SDK (and any
>>>> other non-Java SDK). Especially in a fully managed environment it
>>>> should be
>>>> possible to offer this to users in a way that is largely transparent. It
>>>> takes significant time and effort to mature connectors and I'm not sure
>>>> it
>>>> is realistic to repeat that for all external systems in multiple
>>>> languages.
>>>> Or, to put it in another way, it is likely that instead of one over time
>>>> rock solid connector per external system there will be multiple less
>>>> mature
>>>> implementations. That's also the reason we internally want to use the
>>>> Flink
>>>> native connectors - we know what they can and cannot do and want to
>>>> leverage the existing investment.
>>>>
>>>> There are two related issues here: how to specify transforms (such as
>>>> sources) in a language-independent manner, and how specific runners can
>>>> recognize and run them. For this we use URNs: the
>>>> composite ReadFromKafka PTransform (that consists of an Impulse +
>>>> SDF(KafkaDoFn)) encodes to a URN with an attached payload that fully
>>>> specifies this read. (The KafkaDoFn could similarly have a URN and
>>>> payload.) A runner that understands these URNs is free to make any
>>>> (semantically-equivalent) substitutions it wants for this transform.
>>>>
>>>> Note that a KafkaDoFn still needs to be provided, but could be a DoFn
>>>> that
>>>> fails loudly if it's actually called in the short term rather than a
>>>> full
>>>> Python implementation. Eventually, we would like to be able to call out
>>>> to
>>>> another SDK to expand full transforms (e.g. more complicated ones like
>>>> BigQueryIO).
>>>>
>>>> >>> * Cross-language coders: In this example the Kafka source only
>>>> considers the message value and uses the byte coder that both sides
>>>> understand. If I wanted to pass on the key and possibly other metadata
>>>> to
>>>> the Python transform (similar to KafkaRecord from Java KafkaIO), then a
>>>> specific coder is needed. Such coder could be written using protobuf,
>>>> Avro
>>>> etc, but it would also need to be registered.
>>>>
>>>>
>>>> >> I think this requirement goes away if we implement Kafka in Python
>>>> SDK.
>>>>
>>>> > Wouldn't this be needed for any cross language pipeline? Or rather any
>>>> that isn't only using PCollection<byte[]>? Is there a language agnostic
>>>> encoding for KV<?,?>, for example?
>>>>
>>>> Yes, Coders are also specified by URN (+components and/or payload), and
>>>> there are a couple of standard ones, including KV. See
>>>>
>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>> This is not a closed set.
>>>>
>>>> - Robert
>>>>
>>>

Re: Custom URNs and runner translation

Posted by Reuven Lax <re...@google.com>.
On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <ch...@google.com>
wrote:

>
>
> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <he...@google.com> wrote:
>
>> > Note that a KafkaDoFn still needs to be provided, but could be a DoFn
>> that
>> > fails loudly if it's actually called in the short term rather than a
>> full
>> > Python implementation.
>>
>> For configurable runner-native IO, for now, I think it is reasonable to
>> use a URN + special data payload directly without a KafkaDoFn -- assuming
>> it's a portable pipeline. That's what we do in Go for PubSub-on-Dataflow
>> and something similar would work for Kafka-on-Flink as well. I agree that
>> non-native alternative implementation is desirable, but if one is not
>> present we should IMO rather fail at job submission instead of at runtime.
>> I could imagine connectors intrinsic to an execution engine where
>> non-native implementations are not possible.
>>
>
> I think, in this case, KafkaDoFn can be an SDF that would expand similarly to
> any other SDF by default (initial splitting, GBK, and a map-task
> equivalent, for example) but a runner (Flink in this case) will be free to
> override it with a runner-native implementation if desired. I assume the
> runner will have a chance to perform this override before the SDF expansion
> (which is not fully designed yet). Providing separate source/sink
> transforms for Flink native Kafka will be an option as well, but that will
> be less desirable from a Python user API perspective.
>

Are we sure that the internal SDF will provide the same functionality as
the native one? What if the Kafka SDF is in the middle of a pipeline - can
Flink support that? Having a separate transform for the Flink native source
might be a better user experience than having one that changes its behavior
in strange ways depending on the runner.



>
>
>>
>>
>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <th...@apache.org> wrote:
>>>
>>> > Hi Cham,
>>>
>>> > Thanks for the feedback!
>>>
>>> > I should have probably clarified that my POC and questions aren't
>>> specific to Kafka as source, but pretty much any other source/sink that
>>> we
>>> internally use as well. We have existing Flink pipelines that are written
>>> in Java and we want to use the same connectors with the Python SDK on top
>>> of the already operationalized Flink stack. Therefore, portability isn't
>>> a
>>> concern as much as the ability to integrate is.
>>>
>>
> Thanks for the clarification. Agree that providing runner-native
> implementations of established source/sinks can be desirable in some
> cases.
>
>
>>> > -->
>>>
>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>> > <ch...@google.com>
>>> wrote:
>>>
>>> >> Hi Thomas,
>>>
>>> >> Seems like we are working on similar (partially) things :).
>>>
>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <th...@apache.org> wrote:
>>>
>>> >>> I'm working on a mini POC to enable Kafka as custom streaming source
>>> for a Python pipeline executing on the (in-progress) portable Flink
>>> runner.
>>>
>>> >>> We eventually want to use the same native Flink connectors for
>>> sources
>>> and sinks that we also use in other Flink jobs.
>>>
>>>
>>> >> Could you clarify what you mean by same Flink connector ? Do you mean
>>> that Beam-based and non-Beam-based versions of Flink will use the same
>>> Kafka connector implementation ?
>>>
>>>
>>> > The native Flink sources as shown in the example below, not the Beam
>>> KafkaIO or other Beam sources.
>>>
>>>
>>>
>>> >>> I got a simple example to work with the FlinkKafkaConsumer010 reading
>>> from Kafka and a Python lambda logging the value. The code is here:
>>>
>>>
>>>
>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>
>>>
>>>
>>> >>> I'm looking for feedback/opinions on the following items in
>>> particular:
>>>
>>> >>> * Enabling custom translation on the Flink portable runner (custom
>>> translator could be loaded with ServiceLoader, additional translations
>>> could also be specified as job server configuration, pipeline option,
>>> ...)
>>>
>>> >>> * For the Python side, is what's shown in the commit the recommended
>>> way to define a custom transform (it would eventually live in a reusable
>>> custom module that pipeline authors can import)? Also, the example does
>>> not
>>> have the configuration part covered yet..
>>>
>>>
>>> >> The only standard unbounded source API offered by Python SDK is the
>>> Splittable DoFn API. This is the part I'm working on. I'm trying to add a
>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing
>>> different Kafka Python client libraries. Will share more information on
>>> this soon.
>>>
>>> >> I understand this might not be possible in all cases and we might want
>>> to consider adding native source/sink implementations. But this will
>>> result in the implementation being runner-specific (each runner will have
>>> to have its own source/sink implementation). So I think we should try to
>>> add connector implementations to Beam using the standard API whenever
>>> possible. We also have plans to implement support for cross SDK
>>> transforms
>>> in the future (so that we can utilize Java implementation from Python for
>>> example) but we are not there yet and we might still want to implement a
>>> connector for a given SDK if there's good client library support.
>>>
>>>
>>> > It is great that the Python SDK will have connectors that are written
>>> in
>>> Python in the future, but I think it is equally if not more important to
>>> be
>>> able to use at least the Java Beam connectors with Python SDK (and any
>>> other non-Java SDK). Especially in a fully managed environment it should
>>> be
>>> possible to offer this to users in a way that is largely transparent. It
>>> takes significant time and effort to mature connectors and I'm not sure
>>> it
>>> is realistic to repeat that for all external systems in multiple
>>> languages.
>>> Or, to put it in another way, it is likely that instead of one over time
>>> rock solid connector per external system there will be multiple less
>>> mature
>>> implementations. That's also the reason we internally want to use the
>>> Flink
>>> native connectors - we know what they can and cannot do and want to
>>> leverage the existing investment.
>>>
>>> There are two related issues here: how to specify transforms (such as
>>> sources) in a language-independent manner, and how specific runners can
>>> recognize and run them, but URNs solve both. For  this we use URNs: the
>>> composite ReadFromKafka PTransform (that consists of a Impulse +
>>> SDF(KafkaDoFn)) to encodes to a URN with an attached payload that fully
>>> specifies this read. (The KafkaDoFn could similarly have a URN and
>>> payload.) A runner that understands these URNs is free to make any
>>> (semantically-equivalent) substitutions it wants for this transform.
>>>
>>> Note that a KafkaDoFn still needs to be provided, but could be a DoFn
>>> that
>>> fails loudly if it's actually called in the short term rather than a full
>>> Python implementation. Eventually, we would like to be able to call out
>>> to
>>> another SDK to expand full transforms (e.g. more complicated ones like
>>> BigQueryIO).
>>>
>>> >>> * Cross-language coders: In this example the Kafka source only
>>> considers the message value and uses the byte coder that both sides
>>> understand. If I wanted to pass on the key and possibly other metadata to
>>> the Python transform (similar to KafkaRecord from Java KafkaIO), then a
>>> specific coder is needed. Such coder could be written using protobuf,
>>> Avro
>>> etc, but it would also need to be registered.
>>>
>>>
>>> >> I think this requirement goes away if we implement Kafka in Python
>>> SDK.
>>>
>>> > Wouldn't this be needed for any cross language pipeline? Or rather any
>>> that isn't only using PCollection<byte[]>? Is there a language agnostic
>>> encoding for KV<?,?>, for example?
>>>
>>> Yes, Coders are also specified by URN (+components and/or payload), and
>>> there are a couple of standard ones, including KV. See
>>>
>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>> This is not a closed set.
>>>
>>> - Robert
>>>
>>

Re: Custom URNs and runner translation

Posted by Chamikara Jayalath <ch...@google.com>.
On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <he...@google.com> wrote:

> > Note that a KafkaDoFn still needs to be provided, but could be a DoFn
> that
> > fails loudly if it's actually called in the short term rather than a full
> > Python implementation.
>
> For configurable runner-native IO, for now, I think it is reasonable to
> use a URN + special data payload directly without a KafkaDoFn -- assuming
> it's a portable pipeline. That's what we do in Go for PubSub-on-Dataflow
> and something similar would work for Kafka-on-Flink as well. I agree that
> non-native alternative implementation is desirable, but if one is not
> present we should IMO rather fail at job submission instead of at runtime.
> I could imagine connectors intrinsic to an execution engine where
> non-native implementations are not possible.
>

I think, in this case, KafkaDoFn can be an SDF that would expand like any
other SDF by default (initial splitting, GBK, and a map-task equivalent,
for example), but a runner (Flink in this case) would be free to override
it with a runner-native implementation if desired. I assume the runner
will have a chance to perform this override before the SDF expansion
(which is not fully designed yet). Providing separate source/sink
transforms for Flink-native Kafka would be an option as well, but that
would be less desirable from a Python user API perspective.
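
A minimal sketch of the override order described above, assuming the
runner sees each transform's URN before the default SDF expansion runs.
Everything here is hypothetical and for illustration only: the URN
string, the stage names, and the plan() helper are not Beam or Flink
APIs.

```python
# Hypothetical URN for illustration; not a registered Beam URN.
KAFKA_READ_URN = "example:transform:kafka_read:v1"


def default_sdf_expansion(name):
    """Stand-in for the generic SDF expansion (stage names are made up)."""
    return [name + "/InitialSplit", name + "/GroupByKey", name + "/Process"]


def plan(transforms, native_overrides):
    """Builds an execution plan from (name, urn) pairs.

    A transform whose URN has a runner-native override is replaced as a
    whole; every other transform falls back to the default expansion.
    """
    steps = []
    for name, urn in transforms:
        if urn in native_overrides:
            # Override happens first, so the SDF is never expanded.
            steps.append(native_overrides[urn])
        else:
            steps.extend(default_sdf_expansion(name))
    return steps
```

With an override registered for the Kafka URN, the plan contains a
single native step; without it, the same user-facing transform expands
into the generic stages, so the Python user API stays the same either
way.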


>
>
> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <th...@apache.org> wrote:
>>
>> > Hi Cham,
>>
>> > Thanks for the feedback!
>>
>> > I should have probably clarified that my POC and questions aren't
>> specific to Kafka as source, but pretty much any other source/sink that we
>> internally use as well. We have existing Flink pipelines that are written
>> in Java and we want to use the same connectors with the Python SDK on top
>> of the already operationalized Flink stack. Therefore, portability isn't a
>> concern as much as the ability to integrate is.
>>
>
Thanks for the clarification. I agree that providing runner-native
implementations of established sources/sinks can be desirable in some
cases.


>> > -->
>>
>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>> > <ch...@google.com>
>> wrote:
>>
>> >> Hi Thomas,
>>
>> >> Seems like we are working on similar (partially) things :).
>>
>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <th...@apache.org> wrote:
>>
>> >>> I'm working on a mini POC to enable Kafka as custom streaming source
>> for a Python pipeline executing on the (in-progress) portable Flink
>> runner.
>>
>> >>> We eventually want to use the same native Flink connectors for sources
>> and sinks that we also use in other Flink jobs.
>>
>>
>> >> Could you clarify what you mean by same Flink connector ? Do you mean
>> that Beam-based and non-Beam-based versions of Flink will use the same
>> Kafka connector implementation ?
>>
>>
>> > The native Flink sources as shown in the example below, not the Beam
>> KafkaIO or other Beam sources.
>>
>>
>>
>> >>> I got a simple example to work with the FlinkKafkaConsumer010 reading
>> from Kafka and a Python lambda logging the value. The code is here:
>>
>>
>>
>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>
>>
>>
>> >>> I'm looking for feedback/opinions on the following items in
>> particular:
>>
>> >>> * Enabling custom translation on the Flink portable runner (custom
>> translator could be loaded with ServiceLoader, additional translations
>> could also be specified as job server configuration, pipeline option, ...)
>>
>> >>> * For the Python side, is what's shown in the commit the recommended
>> way to define a custom transform (it would eventually live in a reusable
>> custom module that pipeline authors can import)? Also, the example does
>> not
>> have the configuration part covered yet..
>>
>>
>> >> The only standard unbounded source API offered by Python SDK is the
>> Splittable DoFn API. This is the part I'm working on. I'm trying to add a
>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing
>> different Kafka Python client libraries. Will share more information on
>> this soon.
>>
>> >> I understand this might not be possible in all cases and we might want
>> to consider adding a native source/sink implementations. But this will
>> result in the implementation being runner-specific (each runner will have
>> to have it's own source/sink implementation). So I think we should try to
>> add connector implementations to Beam using the standard API whenever
>> possible. We also have plans to implement support for cross SDK transforms
>> in the future (so that we can utilize Java implementation from Python for
>> example) but we are not there yet and we might still want to implement a
>> connector for a given SDK if there's good client library support.
>>
>>
>> > It is great that the Python SDK will have connectors that are written in
>> Python in the future, but I think it is equally if not more important to
>> be
>> able to use at least the Java Beam connectors with Python SDK (and any
>> other non-Java SDK). Especially in a fully managed environment it should
>> be
>> possible to offer this to users in a way that is largely transparent. It
>> takes significant time and effort to mature connectors and I'm not sure it
>> is realistic to repeat that for all external systems in multiple
>> languages.
>> Or, to put it in another way, it is likely that instead of one over time
>> rock solid connector per external system there will be multiple less
>> mature
>> implementations. That's also the reason we internally want to use the
>> Flink
>> native connectors - we know what they can and cannot do and want to
>> leverage the existing investment.
>>
>> There are two related issues here: how to specify transforms (such as
>> sources) in a language-independent manner, and how specific runners can
>> recognize and run them, but URNs solve both. For  this we use URNs: the
>> composite ReadFromKafka PTransform (that consists of a Impulse +
>> SDF(KafkaDoFn)) to encodes to a URN with an attached payload that fully
>> specifies this read. (The KafkaDoFn could similarly have a URN and
>> payload.) A runner that understands these URNs is free to make any
>> (semantically-equivalent) substitutions it wants for this transform.
>>
>> Note that a KafkaDoFn still needs to be provided, but could be a DoFn that
>> fails loudly if it's actually called in the short term rather than a full
>> Python implementation. Eventually, we would like to be able to call out to
>> another SDK to expand full transforms (e.g. more complicated ones like
>> BigQueryIO).
>>
>> >>> * Cross-language coders: In this example the Kafka source only
>> considers the message value and uses the byte coder that both sides
>> understand. If I wanted to pass on the key and possibly other metadata to
>> the Python transform (similar to KafkaRecord from Java KafkaIO), then a
>> specific coder is needed. Such coder could be written using protobuf, Avro
>> etc, but it would also need to be registered.
>>
>>
>> >> I think this requirement goes away if we implement Kafka in Python SDK.
>>
>> > Wouldn't this be needed for any cross language pipeline? Or rather any
>> that isn't only using PCollection<byte[]>? Is there a language agnostic
>> encoding for KV<?,?>, for example?
>>
>> Yes, Coders are also specified by URN (+components and/or payload), and
>> there are a couple of standard ones, including KV. See
>>
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>> This is not a closed set.
>>
>> - Robert
>>
>

Re: Custom URNs and runner translation

Posted by Henning Rohde <he...@google.com>.
> Note that a KafkaDoFn still needs to be provided, but could be a DoFn that
> fails loudly if it's actually called in the short term rather than a full
> Python implementation.

For configurable runner-native IO, for now, I think it is reasonable to use
a URN + special data payload directly without a KafkaDoFn -- assuming it's
a portable pipeline. That's what we do in Go for PubSub-on-Dataflow and
something similar would work for Kafka-on-Flink as well. I agree that a
non-native alternative implementation is desirable, but if one is not
present we should IMO fail at job submission rather than at runtime.
I could imagine connectors intrinsic to an execution engine where
non-native implementations are not possible.
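
A minimal sketch of the "fail at job submission" check, assuming the
runner can list the URNs it translates natively and the SDK can list the
URNs it has real implementations for. The function, the exception, and
the URN strings are all hypothetical, not part of any Beam API.

```python
class UnsupportedTransformError(Exception):
    """Raised at submission time, before any work is scheduled."""


def validate_at_submission(pipeline_urns, runner_native_urns, sdk_urns):
    """Rejects a pipeline that uses a URN nobody can execute.

    pipeline_urns: URNs of the transforms in the submitted pipeline.
    runner_native_urns: URNs the runner translates natively.
    sdk_urns: URNs that have a non-native SDK implementation.
    """
    executable = set(runner_native_urns) | set(sdk_urns)
    unsupported = sorted(set(pipeline_urns) - executable)
    if unsupported:
        raise UnsupportedTransformError(
            "no implementation available for: " + ", ".join(unsupported))
    return True
```

The point of the design is that a missing implementation surfaces as a
rejected job rather than as a DoFn that throws on the first element.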


On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <ro...@google.com> wrote:

> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <th...@apache.org> wrote:
>
> > Hi Cham,
>
> > Thanks for the feedback!
>
> > I should have probably clarified that my POC and questions aren't
> specific to Kafka as source, but pretty much any other source/sink that we
> internally use as well. We have existing Flink pipelines that are written
> in Java and we want to use the same connectors with the Python SDK on top
> of the already operationalized Flink stack. Therefore, portability isn't a
> concern as much as the ability to integrate is.
>
> > -->
>
> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
> > <ch...@google.com>
> wrote:
>
> >> Hi Thomas,
>
> >> Seems like we are working on similar (partially) things :).
>
> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <th...@apache.org> wrote:
>
> >>> I'm working on a mini POC to enable Kafka as custom streaming source
> for a Python pipeline executing on the (in-progress) portable Flink runner.
>
> >>> We eventually want to use the same native Flink connectors for sources
> and sinks that we also use in other Flink jobs.
>
>
> >> Could you clarify what you mean by same Flink connector ? Do you mean
> that Beam-based and non-Beam-based versions of Flink will use the same
> Kafka connector implementation ?
>
>
> > The native Flink sources as shown in the example below, not the Beam
> KafkaIO or other Beam sources.
>
>
>
> >>> I got a simple example to work with the FlinkKafkaConsumer010 reading
> from Kafka and a Python lambda logging the value. The code is here:
>
>
>
> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>
>
>
> >>> I'm looking for feedback/opinions on the following items in particular:
>
> >>> * Enabling custom translation on the Flink portable runner (custom
> translator could be loaded with ServiceLoader, additional translations
> could also be specified as job server configuration, pipeline option, ...)
>
> >>> * For the Python side, is what's shown in the commit the recommended
> way to define a custom transform (it would eventually live in a reusable
> custom module that pipeline authors can import)? Also, the example does not
> have the configuration part covered yet..
>
>
> >> The only standard unbounded source API offered by Python SDK is the
> Splittable DoFn API. This is the part I'm working on. I'm trying to add a
> Kafka connector for Beam Python SDK using SDF API. JIRA is
> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing
> different Kafka Python client libraries. Will share more information on
> this soon.
>
> >> I understand this might not be possible in all cases and we might want
> to consider adding a native source/sink implementations. But this will
> result in the implementation being runner-specific (each runner will have
> to have it's own source/sink implementation). So I think we should try to
> add connector implementations to Beam using the standard API whenever
> possible. We also have plans to implement support for cross SDK transforms
> in the future (so that we can utilize Java implementation from Python for
> example) but we are not there yet and we might still want to implement a
> connector for a given SDK if there's good client library support.
>
>
> > It is great that the Python SDK will have connectors that are written in
> Python in the future, but I think it is equally if not more important to be
> able to use at least the Java Beam connectors with Python SDK (and any
> other non-Java SDK). Especially in a fully managed environment it should be
> possible to offer this to users in a way that is largely transparent. It
> takes significant time and effort to mature connectors and I'm not sure it
> is realistic to repeat that for all external systems in multiple languages.
> Or, to put it in another way, it is likely that instead of one over time
> rock solid connector per external system there will be multiple less mature
> implementations. That's also the reason we internally want to use the Flink
> native connectors - we know what they can and cannot do and want to
> leverage the existing investment.
>
> There are two related issues here: how to specify transforms (such as
> sources) in a language-independent manner, and how specific runners can
> recognize and run them, but URNs solve both. For  this we use URNs: the
> composite ReadFromKafka PTransform (that consists of a Impulse +
> SDF(KafkaDoFn)) to encodes to a URN with an attached payload that fully
> specifies this read. (The KafkaDoFn could similarly have a URN and
> payload.) A runner that understands these URNs is free to make any
> (semantically-equivalent) substitutions it wants for this transform.
>
> Note that a KafkaDoFn still needs to be provided, but could be a DoFn that
> fails loudly if it's actually called in the short term rather than a full
> Python implementation. Eventually, we would like to be able to call out to
> another SDK to expand full transforms (e.g. more complicated ones like
> BigQueryIO).
>
> >>> * Cross-language coders: In this example the Kafka source only
> considers the message value and uses the byte coder that both sides
> understand. If I wanted to pass on the key and possibly other metadata to
> the Python transform (similar to KafkaRecord from Java KafkaIO), then a
> specific coder is needed. Such coder could be written using protobuf, Avro
> etc, but it would also need to be registered.
>
>
> >> I think this requirement goes away if we implement Kafka in Python SDK.
>
> > Wouldn't this be needed for any cross language pipeline? Or rather any
> that isn't only using PCollection<byte[]>? Is there a language agnostic
> encoding for KV<?,?>, for example?
>
> Yes, Coders are also specified by URN (+components and/or payload), and
> there are a couple of standard ones, including KV. See
>
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
> This is not a closed set.
>
> - Robert
>

Re: Custom URNs and runner translation

Posted by Robert Bradshaw <ro...@google.com>.
On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <th...@apache.org> wrote:

> Hi Cham,

> Thanks for the feedback!

> I should have probably clarified that my POC and questions aren't
specific to Kafka as source, but pretty much any other source/sink that we
internally use as well. We have existing Flink pipelines that are written
in Java and we want to use the same connectors with the Python SDK on top
of the already operationalized Flink stack. Therefore, portability isn't a
concern as much as the ability to integrate is.

> -->

> On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
> <ch...@google.com>
wrote:

>> Hi Thomas,

>> Seems like we are working on similar (partially) things :).

>> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <th...@apache.org> wrote:

>>> I'm working on a mini POC to enable Kafka as custom streaming source
for a Python pipeline executing on the (in-progress) portable Flink runner.

>>> We eventually want to use the same native Flink connectors for sources
and sinks that we also use in other Flink jobs.


>> Could you clarify what you mean by same Flink connector ? Do you mean
that Beam-based and non-Beam-based versions of Flink will use the same
Kafka connector implementation ?


> The native Flink sources as shown in the example below, not the Beam
KafkaIO or other Beam sources.



>>> I got a simple example to work with the FlinkKafkaConsumer010 reading
from Kafka and a Python lambda logging the value. The code is here:


https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9



>>> I'm looking for feedback/opinions on the following items in particular:

>>> * Enabling custom translation on the Flink portable runner (custom
translator could be loaded with ServiceLoader, additional translations
could also be specified as job server configuration, pipeline option, ...)

>>> * For the Python side, is what's shown in the commit the recommended
way to define a custom transform (it would eventually live in a reusable
custom module that pipeline authors can import)? Also, the example does not
have the configuration part covered yet..


>> The only standard unbounded source API offered by Python SDK is the
Splittable DoFn API. This is the part I'm working on. I'm trying to add a
Kafka connector for Beam Python SDK using SDF API. JIRA is
https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing
different Kafka Python client libraries. Will share more information on
this soon.

>> I understand this might not be possible in all cases and we might want
to consider adding a native source/sink implementations. But this will
result in the implementation being runner-specific (each runner will have
to have it's own source/sink implementation). So I think we should try to
add connector implementations to Beam using the standard API whenever
possible. We also have plans to implement support for cross SDK transforms
in the future (so that we can utilize Java implementation from Python for
example) but we are not there yet and we might still want to implement a
connector for a given SDK if there's good client library support.


> It is great that the Python SDK will have connectors that are written in
Python in the future, but I think it is equally if not more important to be
able to use at least the Java Beam connectors with Python SDK (and any
other non-Java SDK). Especially in a fully managed environment it should be
possible to offer this to users in a way that is largely transparent. It
takes significant time and effort to mature connectors and I'm not sure it
is realistic to repeat that for all external systems in multiple languages.
Or, to put it in another way, it is likely that instead of one over time
rock solid connector per external system there will be multiple less mature
implementations. That's also the reason we internally want to use the Flink
native connectors - we know what they can and cannot do and want to
leverage the existing investment.

There are two related issues here: how to specify transforms (such as
sources) in a language-independent manner, and how specific runners can
recognize and run them. For this we use URNs: the composite ReadFromKafka
PTransform (which consists of an Impulse + SDF(KafkaDoFn)) encodes to a
URN with an attached payload that fully specifies this read. (The
KafkaDoFn could similarly have a URN and payload.) A runner that
understands these URNs is free to make any (semantically-equivalent)
substitutions it wants for this transform.

Note that a KafkaDoFn still needs to be provided, but could be a DoFn that
fails loudly if it's actually called in the short term rather than a full
Python implementation. Eventually, we would like to be able to call out to
another SDK to expand full transforms (e.g. more complicated ones like
BigQueryIO).
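
A minimal sketch of the URN + payload idea, with a placeholder DoFn body
that fails loudly. This models the concept in plain Python and is not
the Beam API: the URN string, the JSON payload format, and all names
below are assumptions made for illustration.

```python
import json
from dataclasses import dataclass

# Hypothetical URN for illustration; not a registered Beam URN.
KAFKA_READ_URN = "example:transform:kafka_read:v1"


@dataclass(frozen=True)
class TransformSpec:
    """Language-independent description of a transform."""
    urn: str
    payload: bytes


def kafka_read_spec(topic, bootstrap_servers):
    """Encodes everything needed to perform the read into the payload."""
    config = {"topic": topic, "bootstrap_servers": bootstrap_servers}
    payload = json.dumps(config, sort_keys=True).encode("utf-8")
    return TransformSpec(KAFKA_READ_URN, payload)


def placeholder_kafka_dofn(element):
    """Short-term stand-in: fails loudly if the runner did not substitute."""
    raise NotImplementedError(
        "Kafka read has no Python implementation; a runner must replace "
        + KAFKA_READ_URN)


def translate(spec, native_translators):
    """Runner side: substitute when the URN is understood, else fall back."""
    translator = native_translators.get(spec.urn)
    if translator is not None:
        return translator(json.loads(spec.payload))
    return placeholder_kafka_dofn
```

A runner that registers a translator for the URN receives the decoded
payload and never touches the placeholder; any other runner ends up with
the placeholder, which raises as soon as it is called.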

>>> * Cross-language coders: In this example the Kafka source only
considers the message value and uses the byte coder that both sides
understand. If I wanted to pass on the key and possibly other metadata to
the Python transform (similar to KafkaRecord from Java KafkaIO), then a
specific coder is needed. Such coder could be written using protobuf, Avro
etc, but it would also need to be registered.


>> I think this requirement goes away if we implement Kafka in Python SDK.

> Wouldn't this be needed for any cross language pipeline? Or rather any
that isn't only using PCollection<byte[]>? Is there a language agnostic
encoding for KV<?,?>, for example?

Yes, Coders are also specified by URN (+components and/or payload), and
there are a couple of standard ones, including KV. See
https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
This is not a closed set.
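
A minimal sketch of how a coder described as URN + components can be
checked on both sides of a cross-language boundary. The CoderSpec class
and the unknown_urns() helper are hypothetical. The two "beam:coder:..."
strings are my recollection of the standard bytes and KV coder URNs and
should be verified against the list linked above; the "example:..." URN
is made up.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class CoderSpec:
    """A coder identified by URN, with nested component coders."""
    urn: str
    components: Tuple["CoderSpec", ...] = ()
    payload: bytes = b""


# Assumed standard URNs; check them against common_urns.md.
BYTES_URN = "beam:coder:bytes:v1"
KV_URN = "beam:coder:kv:v1"

BYTES = CoderSpec(BYTES_URN)


def kv(key_coder, value_coder):
    """KV is a composite coder: its components are the key and value coders."""
    return CoderSpec(KV_URN, (key_coder, value_coder))


def unknown_urns(spec, known):
    """Returns every URN in the coder tree that `known` does not contain."""
    missing = set() if spec.urn in known else {spec.urn}
    for component in spec.components:
        missing |= unknown_urns(component, known)
    return missing
```

A KV of bytes and bytes uses only standard URNs, so both SDKs can decode
it. A coder with a custom URN (for example a KafkaRecord-like coder)
shows up in unknown_urns() until the receiving side registers it, which
is the registration step raised in the original question.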

- Robert

Re: Custom URNs and runner translation

Posted by Thomas Weise <th...@apache.org>.
Hi Cham,

Thanks for the feedback!

I probably should have clarified that my POC and questions aren't
specific to Kafka as a source, but apply to pretty much any other
source/sink that we use internally as well. We have existing Flink
pipelines that are written in Java and we want to use the same connectors
with the Python SDK on top of the already operationalized Flink stack.
Therefore, portability isn't a concern as much as the ability to
integrate is.

-->

On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath <ch...@google.com>
wrote:

> Hi Thomas,
>
> Seems like we are working on similar (partially) things :).
>
> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <th...@apache.org> wrote:
>
>> I'm working on a mini POC to enable Kafka as custom streaming source for
>> a Python pipeline executing on the (in-progress) portable Flink runner.
>>
>> We eventually want to use the same native Flink connectors for sources
>> and sinks that we also use in other Flink jobs.
>>
>
> Could you clarify what you mean by same Flink connector ? Do you mean that
> Beam-based and non-Beam-based versions of Flink will use the same Kafka
> connector implementation ?
>

The native Flink sources as shown in the example below, not the Beam
KafkaIO or other Beam sources.

>
>
>> I got a simple example to work with the FlinkKafkaConsumer010 reading
>> from Kafka and a Python lambda logging the value. The code is here:
>>
>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3
>> 499edb1df9
>>
>>
>
>> I'm looking for feedback/opinions on the following items in particular:
>>
>> * Enabling custom translation on the Flink portable runner (custom
>> translator could be loaded with ServiceLoader, additional translations
>> could also be specified as job server configuration, pipeline option, ...)
>>
>> * For the Python side, is what's shown in the commit the recommended way
>> to define a custom transform (it would eventually live in a reusable custom
>> module that pipeline authors can import)? Also, the example does not have
>> the configuration part covered yet..
>>
>
> The only standard unbounded source API offered by Python SDK is the
> Splittable DoFn API. This is the part I'm working on. I'm trying to add a
> Kafka connector for Beam Python SDK using SDF API. JIRA is
> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing
> different Kafka Python client libraries. Will share more information on
> this soon.
>
> I understand this might not be possible in all cases and we might want to
> consider adding a native source/sink implementations. But this will result
> in the implementation being runner-specific (each runner will have to have
> it's own source/sink implementation). So I think we should try to add
> connector implementations to Beam using the standard API whenever possible.
> We also have plans to implement support for cross SDK transforms in the
> future (so that we can utilize Java implementation from Python for example)
> but we are not there yet and we might still want to implement a connector
> for a given SDK if there's good client library support.
>

It is great that the Python SDK will have connectors that are written in
Python in the future, but I think it is equally if not more important to be
able to use at least the Java Beam connectors with Python SDK (and any
other non-Java SDK). Especially in a fully managed environment it should be
possible to offer this to users in a way that is largely transparent. It
takes significant time and effort to mature connectors and I'm not sure it
is realistic to repeat that for all external systems in multiple languages.
Or, to put it another way, it is likely that instead of one connector per
external system that becomes rock solid over time, there will be multiple
less mature implementations. That's also the reason we internally want to
use the Flink
native connectors - we know what they can and cannot do and want to
leverage the existing investment.


>
>
>>
>> * Cross-language coders: In this example the Kafka source only considers
>> the message value and uses the byte coder that both sides understand. If I
>> wanted to pass on the key and possibly other metadata to the Python
>> transform (similar to KafkaRecord from Java KafkaIO), then a specific coder
>> is needed. Such coder could be written using protobuf, Avro etc, but it
>> would also need to be registered.
>>
>
> I think this requirement goes away if we implement Kafka in Python SDK.
>

Wouldn't this be needed for any cross language pipeline? Or rather any that
isn't only using PCollection<byte[]>? Is there a language agnostic encoding
for KV<?,?>, for example?

Thanks,
Thomas

Re: Custom URNs and runner translation

Posted by Chamikara Jayalath <ch...@google.com>.
Hi Thomas,

Seems like we are working on (partially) similar things :).

On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <th...@apache.org> wrote:

> I'm working on a mini POC to enable Kafka as custom streaming source for a
> Python pipeline executing on the (in-progress) portable Flink runner.
>
> We eventually want to use the same native Flink connectors for sources and
> sinks that we also use in other Flink jobs.
>

Could you clarify what you mean by the same Flink connector? Do you mean
that Beam-based and non-Beam-based versions of Flink will use the same
Kafka connector implementation?


> I got a simple example to work with the FlinkKafkaConsumer010 reading from
> Kafka and a Python lambda logging the value. The code is here:
>
>
> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>
>

> I'm looking for feedback/opinions on the following items in particular:
>
> * Enabling custom translation on the Flink portable runner (custom
> translator could be loaded with ServiceLoader, additional translations
> could also be specified as job server configuration, pipeline option, ...)
>
> * For the Python side, is what's shown in the commit the recommended way
> to define a custom transform (it would eventually live in a reusable custom
> module that pipeline authors can import)? Also, the example does not have
> the configuration part covered yet..
>

The only standard unbounded source API offered by the Python SDK is the
Splittable DoFn API. This is the part I'm working on. I'm trying to add a
Kafka connector for the Beam Python SDK using the SDF API. The JIRA is
https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing
different Kafka Python client libraries. Will share more information on
this soon.

I understand this might not be possible in all cases and we might want to
consider adding native source/sink implementations. But this will result
in the implementation being runner-specific (each runner will have to have
its own source/sink implementation). So I think we should try to add
connector implementations to Beam using the standard API whenever possible.
We also have plans to implement support for cross-SDK transforms in the
future (so that we can use a Java implementation from Python, for example),
but we are not there yet and we might still want to implement a connector
for a given SDK if there's good client library support.


>
> * Cross-language coders: In this example the Kafka source only considers
> the message value and uses the byte coder that both sides understand. If I
> wanted to pass on the key and possibly other metadata to the Python
> transform (similar to KafkaRecord from Java KafkaIO), then a specific coder
> is needed. Such coder could be written using protobuf, Avro etc, but it
> would also need to be registered.
>

I think this requirement goes away if we implement a Kafka connector in
the Python SDK.


> Thanks,
> Thomas
>
>
Thanks,
Cham