Posted to dev@beam.apache.org by Boyuan Zhang <bo...@google.com> on 2020/08/26 00:02:02 UTC

Create External Transform with WindowFn

Hi team,

I'm trying to create an external transform in the Java SDK, which expands into
several ParDos and a Window.into(FixedWindows). When I use this transform from
the Python SDK, I get a pipeline construction error:

apache_beam/utils/urns.py", line 186, in from_runner_api
    parameter_type, constructor = cls._known_urns[fn_proto.urn]
KeyError: 'beam:window_fn:serialized_java:v1'

Is it expected that I cannot use Window.into when building an external
PTransform? Or am I missing something here?


Thanks for your help!
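
[For concreteness, a minimal Java sketch of the kind of expansion described
above. Names here are hypothetical and the output is simplified to a single
PCollection; this is not the actual transform in question:]

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class MyExternalTransform
    extends PTransform<PCollection<String>, PCollection<String>> {

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    return input
        .apply("Prepare", ParDo.of(new PassThroughFn()))
        // The fixed-window WindowInto inside the expanded Java sub-graph;
        // the Python SDK later has to reconstruct this windowing strategy
        // from the pipeline proto.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply("Emit", ParDo.of(new PassThroughFn()));
  }

  private static class PassThroughFn extends DoFn<String, String> {
    @ProcessElement
    public void process(@Element String element, OutputReceiver<String> out) {
      out.output(element);
    }
  }
}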

Re: Create External Transform with WindowFn

Posted by Chamikara Jayalath <ch...@google.com>.
Actually, Reshuffle uses a custom non-merging WindowFn (IdentityWindowFn) [1].
Dataflow Runner v2 (which is required for multi-language pipelines on
Dataflow) currently does not support custom windowing functions, I believe.
So getting Reshuffle (and, by extension, connectors such as Snowflake) working
for Dataflow Python as a cross-language transform will unfortunately require
support for custom WindowFns on Dataflow Runner v2 in addition to
https://issues.apache.org/jira/browse/BEAM-11360. I believe we are working on
the former, but I'm not sure about the exact ETA. The latter should be done by
the end of the quarter.

+Harsh Vardhan <an...@google.com> +Robert Bradshaw <ro...@google.com>

Thanks,
Cham

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L79
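
[For context, a simplified, paraphrased sketch of what Reshuffle's expansion
does; see [1] for the actual implementation. IdentityWindowFn and
setWindowingStrategyInternal are internal SDK APIs, and the re-expansion DoFn
is abbreviated here. The custom identity WindowFn installed around the
GroupByKey is what serializes as 'beam:window_fn:serialized_java:v1', and the
intermediate PCollections keep it even though the output strategy is
restored:]

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.IdentityWindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;

public class ReshuffleSketch {
  public static <K, V> PCollection<KV<K, V>> expandSketch(
      PCollection<KV<K, V>> input) {
    // Remember the caller's windowing strategy so it can be restored below.
    WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
    return input
        // Install a custom, non-merging identity WindowFn so the GroupByKey
        // below can fire per element. This custom WindowFn is what the Java
        // SDK serializes as 'beam:window_fn:serialized_java:v1'.
        .apply(Window.<KV<K, V>>into(
                new IdentityWindowFn<>(
                    originalStrategy.getWindowFn().windowCoder()))
            .triggering(new ReshuffleTrigger<>())
            .discardingFiredPanes()
            .withAllowedLateness(
                Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())))
        .apply(GroupByKey.create())
        // Re-expand the grouped values back into individual elements.
        .apply(ParDo.of(new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
          @ProcessElement
          public void process(
              @Element KV<K, Iterable<V>> kv, OutputReceiver<KV<K, V>> out) {
            for (V value : kv.getValue()) {
              out.output(KV.of(kv.getKey(), value));
            }
          }
        }))
        // Restore the original strategy on the output PCollection; the
        // intermediate PCollections above still carry the custom WindowFn.
        .setWindowingStrategyInternal(originalStrategy);
  }
}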

On Mon, Nov 30, 2020 at 10:35 AM Chamikara Jayalath <ch...@google.com>
wrote:

> Please follow https://issues.apache.org/jira/browse/BEAM-11360 instead.
>
> Thanks,
> Cham
>
> On Mon, Nov 30, 2020 at 10:26 AM Steve Niemitz <sn...@apache.org>
> wrote:
>
>> Alright, thank you. Is BEAM-10507 the JIRA to watch for any progress on
>> that?
>>
>> On Mon, Nov 30, 2020 at 12:55 PM Boyuan Zhang <bo...@google.com> wrote:
>>
>>> Hi Steve,
>>>
>>> Unfortunately, I don't think there is a workaround until we have the
>>> change that Cham mentioned.
>>>
>>> On Mon, Nov 30, 2020 at 8:16 AM Steve Niemitz <sn...@apache.org>
>>> wrote:
>>>
>>>> I'm trying to write an xlang transform that uses Reshuffle internally,
>>>> and ran into this as well.  Is there any workaround to this for now (other
>>>> than removing the reshuffle), or do I just need to wait for what Chamikara
>>>> mentioned?  I noticed the same issue was mentioned in the SnowflakeIO.Read
>>>> PR as well [1].
>>>>
>>>> https://github.com/apache/beam/pull/12149#discussion_r463710165
>>>>
>>>> On Wed, Aug 26, 2020 at 10:55 PM Boyuan Zhang <bo...@google.com>
>>>> wrote:
>>>>
>>>>> That explains a lot. Thanks, Cham!
>>>>>
>>>>> On Wed, Aug 26, 2020 at 7:44 PM Chamikara Jayalath <
>>>>> chamikara@google.com> wrote:
>>>>>
>>>>>> Due to the proto -> object -> proto conversion we do today, Python
>>>>>> needs to parse the full sub-graph from Java. We have hooks for PTransforms
>>>>>> and Coders but not for windowing operations. This limitation will go away
>>>>>> after we have direct Beam proto to Dataflow proto conversion in place.
>>>>>>
>>>>>> On Wed, Aug 26, 2020 at 7:03 PM Robert Burke <ro...@frantil.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Coders should only be checked across language boundaries.
>>>>>>>
>>>>>>> On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <bo...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Cham!
>>>>>>>>
>>>>>>>> I just realized that the beam:window_fn:serialized_java:v1 URN is
>>>>>>>> introduced by Java's Reshuffle.viaRandomKey(). But
>>>>>>>> Reshuffle.viaRandomKey() does re-window into the original window
>>>>>>>> strategy (which is GlobalWindows in my case). Is it expected that
>>>>>>>> we also check intermediate PCollections rather than only the
>>>>>>>> PCollection that crosses the language boundary?
>>>>>>>>
>>>>>>>> More about my PTransform:
>>>>>>>>
>>>>>>>> MyExternalPTransform -- expands to --
>>>>>>>>
>>>>>>>>   ParDo() -> Reshuffle.viaRandomKey() -> ParDo()
>>>>>>>>     |-> WindowInto(FixedWindows) -> ParDo() -> output void
>>>>>>>>     |-> ParDo() -> output PCollection to Python SDK
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <
>>>>>>>> chamikara@google.com> wrote:
>>>>>>>>
>>>>>>>>> Also, it's strange that Java used beam:window_fn:serialized_java:v1
>>>>>>>>> for the URN here instead of "beam:window_fn:fixed_windows:v1" [1],
>>>>>>>>> which is what is registered by Python [2]. This seems to be the
>>>>>>>>> immediate issue. The tracking bug for supporting custom windows is
>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-10507.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>>>>>>>>> [2]
>>>>>>>>> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>>>>>>>>
>>>>>>>>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <
>>>>>>>>> chamikara@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Pipelines that use external WindowingStrategies might be failing
>>>>>>>>>> during the proto -> object -> proto conversion we do today. This
>>>>>>>>>> limitation will go away once Dataflow starts reading Beam protos
>>>>>>>>>> directly. We are working on this now.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Cham
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <bo...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks, Robert! I want to add more details on my External
>>>>>>>>>>> PTransform:
>>>>>>>>>>>
>>>>>>>>>>> MyExternalPTransform -- expands to --
>>>>>>>>>>>
>>>>>>>>>>>   ParDo()
>>>>>>>>>>>     |-> WindowInto(FixedWindows) -> ParDo() -> output void
>>>>>>>>>>>     |-> ParDo() -> output PCollection to Python SDK
>>>>>>>>>>> The full stacktrace:
>>>>>>>>>>>
>>>>>>>>>>> INFO:root:Using Java SDK harness container image dataflow-dev.gcr.io/boyuanz/java:latest
>>>>>>>>>>> Starting expansion service at localhost:53569
>>>>>>>>>>> Aug 13, 2020 7:42:11 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
>>>>>>>>>>> INFO: Registering external transforms: [beam:external:java:kafka:read:v1, beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
>>>>>>>>>>> 	beam:external:java:kafka:read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
>>>>>>>>>>> 	beam:external:java:kafka:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
>>>>>>>>>>> 	beam:external:java:jdbc:read_rows:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
>>>>>>>>>>> 	beam:external:java:jdbc:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
>>>>>>>>>>> 	beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
>>>>>>>>>>> WARNING:apache_beam.options.pipeline_options_validator:Option --zone is deprecated. Please use --worker_zone instead.
>>>>>>>>>>> Aug 13, 2020 7:42:12 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>>>>> INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
>>>>>>>>>>> Aug 13, 2020 7:42:14 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>>>>> INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'
>>>>>>>>>>>
>>>>>>>>>>> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
>>>>>>>>>>> INFO:root:Using Python SDK docker image: apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at local, we will try to pull from hub.docker.com
>>>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>>>   File "<embedded module '_launcher'>", line 165, in run_filename_as_main
>>>>>>>>>>>   File "<embedded module '_launcher'>", line 39, in _run_code_in_main
>>>>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, in <module>
>>>>>>>>>>>     run()
>>>>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, in run
>>>>>>>>>>>     test_method(beam.Pipeline(options=pipeline_options))
>>>>>>>>>>>   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in run_xlang_kafkaio
>>>>>>>>>>>     pipeline.run(False)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 534, in run
>>>>>>>>>>>     return self.runner.run_pipeline(self, self._options)
>>>>>>>>>>>   File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in run_pipeline
>>>>>>>>>>>     allow_proto_holders=True)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 879, in from_runner_api
>>>>>>>>>>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in from_runner_api
>>>>>>>>>>>     id in proto.outputs.items()
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in <dictcomp>
>>>>>>>>>>>     id in proto.outputs.items()
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pvalue.py", line 217, in from_runner_api
>>>>>>>>>>>     proto.windowing_strategy_id),
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/transforms/core.py", line 2597, in from_runner_api
>>>>>>>>>>>     windowfn=WindowFn.from_runner_api(proto.window_fn, context),
>>>>>>>>>>>   File "apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>>>>     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>>>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <
>>>>>>>>>>> robertwb@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> You should be able to use a WindowInto with any of the common
>>>>>>>>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>>>>>>>>>>> external transform. You should also be able to window into an
>>>>>>>>>>>> arbitrary WindowFn as long as it produces standard window types,
>>>>>>>>>>>> but if there's a bug here you could possibly work around it by
>>>>>>>>>>>> windowing into a more standard windowing fn before returning.
>>>>>>>>>>>>
>>>>>>>>>>>> What is the full traceback?
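
[A minimal sketch of that suggested workaround, assuming the output is meant
to be globally windowed (hypothetical transform name). As Boyuan's follow-up
above notes, this may not be enough on its own, since intermediate
PCollections can still carry the serialized Java WindowFn:]

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;

public class MyReshuffleWorkaround
    extends PTransform<PCollection<String>, PCollection<String>> {

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    return input
        // Reshuffle introduces the custom (serialized Java) WindowFn on its
        // intermediate PCollections.
        .apply(Reshuffle.viaRandomKey())
        // Workaround attempt: explicitly re-window into a standard WindowFn
        // so the PCollection returned across the language boundary carries
        // the standard URN beam:window_fn:global_windows:v1.
        .apply(Window.into(new GlobalWindows()));
  }
}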
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <
>>>>>>>>>>>> boyuanz@google.com> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> > Hi team,
>>>>>>>>>>>> >
>>>>>>>>>>>> > I'm trying to create an external transform in the Java SDK, which
>>>>>>>>>>>> expands into several ParDos and a Window.into(FixedWindows). When I use
>>>>>>>>>>>> this transform from the Python SDK, I get a pipeline construction error:
>>>>>>>>>>>> >
>>>>>>>>>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>>>>> >     parameter_type, constructor =
>>>>>>>>>>>> cls._known_urns[fn_proto.urn]
>>>>>>>>>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>>>>> >
>>>>>>>>>>>> > Is it expected that I cannot use Window.into when building an
>>>>>>>>>>>> external PTransform? Or am I missing something here?
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thanks for your help!
>>>>>>>>>>>>
>>>>>>>>>>>

Re: Create External Transform with WindowFn

Posted by Chamikara Jayalath <ch...@google.com>.
Please follow https://issues.apache.org/jira/browse/BEAM-11360 instead.

Thanks,
Cham

On Mon, Nov 30, 2020 at 10:26 AM Steve Niemitz <sn...@apache.org> wrote:

> alright, thank you.  Is BEAM-10507 the jira to watch for any progress on
> that?
>
> On Mon, Nov 30, 2020 at 12:55 PM Boyuan Zhang <bo...@google.com> wrote:
>
>> Hi Steve,
>>
>> Unfortunately I don't think there is a workaround before we have the
>> change that Cham mentions.
>>
>> On Mon, Nov 30, 2020 at 8:16 AM Steve Niemitz <sn...@apache.org>
>> wrote:
>>
>>> I'm trying to write an xlang transform that uses Reshuffle internally,
>>> and ran into this as well.  Is there any workaround to this for now (other
>>> than removing the reshuffle), or do I just need to wait for what Chamikara
>>> mentioned?  I noticed the same issue was mentioned in the SnowflakeIO.Read
>>> PR as well [1].
>>>
>>> https://github.com/apache/beam/pull/12149#discussion_r463710165
>>>
>>> On Wed, Aug 26, 2020 at 10:55 PM Boyuan Zhang <bo...@google.com>
>>> wrote:
>>>
>>>> That explains a lot. Thanks, Cham!
>>>>
>>>> On Wed, Aug 26, 2020 at 7:44 PM Chamikara Jayalath <
>>>> chamikara@google.com> wrote:
>>>>
>>>>> Due to the proto -> object -> proto conversion we do today, Python
>>>>> needs to parse the full sub-graph from Java. We have hooks for PTransforms
>>>>> and Coders but not for windowing operations. This limitation will go away
>>>>> after we have direct Beam proto to Dataflow proto conversion in place.
>>>>>
>>>>> On Wed, Aug 26, 2020 at 7:03 PM Robert Burke <ro...@frantil.com>
>>>>> wrote:
>>>>>
>>>>>> Coders should only be checked over the language boundaries.
>>>>>>
>>>>>> On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <bo...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Cham!
>>>>>>>
>>>>>>>  I just realized that the *beam:window_fn:serialized_**java:v1 *is
>>>>>>> introduced by Java *Reshuffle.viaRandomKey()*. But
>>>>>>> *Reshuffle.viaRandomKey()* does rewindowed into original window
>>>>>>> strategy(which is *GlobalWindows *in my case). Is it expected that
>>>>>>> we also check intermediate PCollection rather than only the PCollection
>>>>>>> that across the language boundary?
>>>>>>>
>>>>>>> More about my Ptransform:
>>>>>>> MyExternalPTransform  -- expand to --  ParDo() ->
>>>>>>> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixWindow) -> ParDo() ->
>>>>>>> output void
>>>>>>>
>>>>>>>                                                                  |
>>>>>>>
>>>>>>>                                                                   ->
>>>>>>> ParDo() -> output PCollection to Python SDK
>>>>>>>
>>>>>>> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <
>>>>>>> chamikara@google.com> wrote:
>>>>>>>
>>>>>>>> Also it's strange that Java used (beam:window_fn:serialized_java:v1)
>>>>>>>> for the URN here instead of "beam:window_fn:fixed_windows:v1" [1]
>>>>>>>> which is what is being registered by Python [2]. This seems to be the
>>>>>>>> immediate issue. Tracking bug for supporting custom windows is
>>>>>>>> https://issues.apache.org/jira/browse/BEAM-10507.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>>>>>>>> [2]
>>>>>>>> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <
>>>>>>>> chamikara@google.com> wrote:
>>>>>>>>
>>>>>>>>> Pipelines that use external WindowingStrategies might be failing
>>>>>>>>> during proto -> object -> proto conversion we do today. This limitation
>>>>>>>>> will go away once Dataflow directly starts reading Beam protos. We are
>>>>>>>>> working on this now.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Cham
>>>>>>>>>
>>>>>>>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <bo...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks, Robert! I want to add more details on my External
>>>>>>>>>> PTransform:
>>>>>>>>>>
>>>>>>>>>> MyExternalPTransform  -- expand to --  ParDo() ->
>>>>>>>>>> WindowInto(FixWindow) -> ParDo() -> output void
>>>>>>>>>>
>>>>>>>>>>   |
>>>>>>>>>>
>>>>>>>>>>   -> ParDo() -> output PCollection to Python SDK
>>>>>>>>>> The full stacktrace:
>>>>>>>>>>
>>>>>>>>>> INFO:root:Using Java SDK harness container image dataflow-dev.gcr.io/boyuanz/java:latest
>>>>>>>>>> Starting expansion service at localhost:53569
>>>>>>>>>> Aug 13, 2020 7:42:11 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
>>>>>>>>>> INFO: Registering external transforms: [beam:external:java:kafka:read:v1, beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
>>>>>>>>>> 	beam:external:java:kafka:read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
>>>>>>>>>> 	beam:external:java:kafka:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
>>>>>>>>>> 	beam:external:java:jdbc:read_rows:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
>>>>>>>>>> 	beam:external:java:jdbc:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
>>>>>>>>>> 	beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
>>>>>>>>>> WARNING:apache_beam.options.pipeline_options_validator:Option --zone is deprecated. Please use --worker_zone instead.
>>>>>>>>>> Aug 13, 2020 7:42:12 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>>>> INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
>>>>>>>>>> Aug 13, 2020 7:42:14 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>>>> INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'
>>>>>>>>>>
>>>>>>>>>> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
>>>>>>>>>> INFO:root:Using Python SDK docker image: apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at local, we will try to pull from hub.docker.com
>>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>>   File "<embedded module '_launcher'>", line 165, in run_filename_as_main
>>>>>>>>>>   File "<embedded module '_launcher'>", line 39, in _run_code_in_main
>>>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, in <module>
>>>>>>>>>>     run()
>>>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, in run
>>>>>>>>>>     test_method(beam.Pipeline(options=pipeline_options))
>>>>>>>>>>   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in run_xlang_kafkaio
>>>>>>>>>>     pipeline.run(False)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 534, in run
>>>>>>>>>>     return self.runner.run_pipeline(self, self._options)
>>>>>>>>>>   File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in run_pipeline
>>>>>>>>>>     allow_proto_holders=True)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 879, in from_runner_api
>>>>>>>>>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in from_runner_api
>>>>>>>>>>     id in proto.outputs.items()
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in <dictcomp>
>>>>>>>>>>     id in proto.outputs.items()
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pvalue.py", line 217, in from_runner_api
>>>>>>>>>>     proto.windowing_strategy_id),
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/transforms/core.py", line 2597, in from_runner_api
>>>>>>>>>>     windowfn=WindowFn.from_runner_api(proto.window_fn, context),
>>>>>>>>>>   File "apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>>>     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <
>>>>>>>>>> robertwb@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> You should be able to use a WindowInto with any of the common
>>>>>>>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in
>>>>>>>>>>> an
>>>>>>>>>>> external transform. You should also be able to window into an
>>>>>>>>>>> arbitrary WindowFn as long as it produces standards window
>>>>>>>>>>> types, but
>>>>>>>>>>> if there's a bug here you could possibly work around it by
>>>>>>>>>>> windowing
>>>>>>>>>>> into a more standard windowing fn before returning.
>>>>>>>>>>>
>>>>>>>>>>> What is the full traceback?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <bo...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Hi team,
>>>>>>>>>>> >
>>>>>>>>>>> > I'm trying to create an External transform in Java SDK, which
>>>>>>>>>>> expands into several ParDo and a Window.into(FixWindow). When I use this
>>>>>>>>>>> transform in Python SDK, I get an pipeline construction error:
>>>>>>>>>>> >
>>>>>>>>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>>>> >
>>>>>>>>>>> > Is it expected that I cannot use a Window.into when building
>>>>>>>>>>> External Ptransform? Or do I miss anything here?
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > Thanks for your help!
>>>>>>>>>>>
>>>>>>>>>>

Re: Create External Transform with WindowFn

Posted by Steve Niemitz <sn...@apache.org>.
alright, thank you.  Is BEAM-10507 the jira to watch for any progress on
that?

On Mon, Nov 30, 2020 at 12:55 PM Boyuan Zhang <bo...@google.com> wrote:

> Hi Steve,
>
> Unfortunately I don't think there is a workaround before we have the
> change that Cham mentions.
>
> On Mon, Nov 30, 2020 at 8:16 AM Steve Niemitz <sn...@apache.org> wrote:
>
>> I'm trying to write an xlang transform that uses Reshuffle internally,
>> and ran into this as well.  Is there any workaround to this for now (other
>> than removing the reshuffle), or do I just need to wait for what Chamikara
>> mentioned?  I noticed the same issue was mentioned in the SnowflakeIO.Read
>> PR as well [1].
>>
>> https://github.com/apache/beam/pull/12149#discussion_r463710165
>>
>> On Wed, Aug 26, 2020 at 10:55 PM Boyuan Zhang <bo...@google.com> wrote:
>>
>>> That explains a lot. Thanks, Cham!
>>>
>>> On Wed, Aug 26, 2020 at 7:44 PM Chamikara Jayalath <ch...@google.com>
>>> wrote:
>>>
>>>> Due to the proto -> object -> proto conversion we do today, Python
>>>> needs to parse the full sub-graph from Java. We have hooks for PTransforms
>>>> and Coders but not for windowing operations. This limitation will go away
>>>> after we have direct Beam proto to Dataflow proto conversion in place.
>>>>
>>>> On Wed, Aug 26, 2020 at 7:03 PM Robert Burke <ro...@frantil.com>
>>>> wrote:
>>>>
>>>>> Coders should only be checked over the language boundaries.
>>>>>
>>>>> On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <bo...@google.com> wrote:
>>>>>
>>>>>> Thanks Cham!
>>>>>>
>>>>>>  I just realized that the *beam:window_fn:serialized_**java:v1 *is
>>>>>> introduced by Java *Reshuffle.viaRandomKey()*. But
>>>>>> *Reshuffle.viaRandomKey()* does rewindowed into original window
>>>>>> strategy(which is *GlobalWindows *in my case). Is it expected that
>>>>>> we also check intermediate PCollection rather than only the PCollection
>>>>>> that across the language boundary?
>>>>>>
>>>>>> More about my Ptransform:
>>>>>> MyExternalPTransform  -- expand to --  ParDo() ->
>>>>>> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixWindow) -> ParDo() ->
>>>>>> output void
>>>>>>
>>>>>>                                                                |
>>>>>>
>>>>>>                                                                 -> ParDo()
>>>>>> -> output PCollection to Python SDK
>>>>>>
>>>>>> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <
>>>>>> chamikara@google.com> wrote:
>>>>>>
>>>>>>> Also it's strange that Java used (beam:window_fn:serialized_java:v1)
>>>>>>> for the URN here instead of "beam:window_fn:fixed_windows:v1" [1]
>>>>>>> which is what is being registered by Python [2]. This seems to be the
>>>>>>> immediate issue. Tracking bug for supporting custom windows is
>>>>>>> https://issues.apache.org/jira/browse/BEAM-10507.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>>>>>>> [2]
>>>>>>> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>>>>>>
>>>>>>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <
>>>>>>> chamikara@google.com> wrote:
>>>>>>>
>>>>>>>> Pipelines that use external WindowingStrategies might be failing
>>>>>>>> during proto -> object -> proto conversion we do today. This limitation
>>>>>>>> will go away once Dataflow directly starts reading Beam protos. We are
>>>>>>>> working on this now.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cham
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <bo...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks, Robert! I want to add more details on my External
>>>>>>>>> PTransform:
>>>>>>>>>
>>>>>>>>> MyExternalPTransform  -- expand to --  ParDo() ->
>>>>>>>>> WindowInto(FixWindow) -> ParDo() -> output void
>>>>>>>>>
>>>>>>>>>   |
>>>>>>>>>
>>>>>>>>>   -> ParDo() -> output PCollection to Python SDK
>>>>>>>>> The full stacktrace:
>>>>>>>>>
>>>>>>>>> INFO:root:Using Java SDK harness container image dataflow-dev.gcr.io/boyuanz/java:latest
>>>>>>>>> Starting expansion service at localhost:53569
>>>>>>>>> Aug 13, 2020 7:42:11 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
>>>>>>>>> INFO: Registering external transforms: [beam:external:java:kafka:read:v1, beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
>>>>>>>>> 	beam:external:java:kafka:read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
>>>>>>>>> 	beam:external:java:kafka:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
>>>>>>>>> 	beam:external:java:jdbc:read_rows:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
>>>>>>>>> 	beam:external:java:jdbc:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
>>>>>>>>> 	beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
>>>>>>>>> WARNING:apache_beam.options.pipeline_options_validator:Option --zone is deprecated. Please use --worker_zone instead.
>>>>>>>>> Aug 13, 2020 7:42:12 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>>> INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
>>>>>>>>> Aug 13, 2020 7:42:14 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>>> INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'
>>>>>>>>>
>>>>>>>>> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
>>>>>>>>> INFO:root:Using Python SDK docker image: apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at local, we will try to pull from hub.docker.com
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File "<embedded module '_launcher'>", line 165, in run_filename_as_main
>>>>>>>>>   File "<embedded module '_launcher'>", line 39, in _run_code_in_main
>>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, in <module>
>>>>>>>>>     run()
>>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, in run
>>>>>>>>>     test_method(beam.Pipeline(options=pipeline_options))
>>>>>>>>>   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in run_xlang_kafkaio
>>>>>>>>>     pipeline.run(False)
>>>>>>>>>   File "apache_beam/pipeline.py", line 534, in run
>>>>>>>>>     return self.runner.run_pipeline(self, self._options)
>>>>>>>>>   File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in run_pipeline
>>>>>>>>>     allow_proto_holders=True)
>>>>>>>>>   File "apache_beam/pipeline.py", line 879, in from_runner_api
>>>>>>>>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in from_runner_api
>>>>>>>>>     id in proto.outputs.items()
>>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in <dictcomp>
>>>>>>>>>     id in proto.outputs.items()
>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>   File "apache_beam/pvalue.py", line 217, in from_runner_api
>>>>>>>>>     proto.windowing_strategy_id),
>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>   File "apache_beam/transforms/core.py", line 2597, in from_runner_api
>>>>>>>>>     windowfn=WindowFn.from_runner_api(proto.window_fn, context),
>>>>>>>>>   File "apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>>     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <
>>>>>>>>> robertwb@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> You should be able to use a WindowInto with any of the common
>>>>>>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>>>>>>>>> external transform. You should also be able to window into an
>>>>>>>>>> arbitrary WindowFn as long as it produces standards window types,
>>>>>>>>>> but
>>>>>>>>>> if there's a bug here you could possibly work around it by
>>>>>>>>>> windowing
>>>>>>>>>> into a more standard windowing fn before returning.
>>>>>>>>>>
>>>>>>>>>> What is the full traceback?
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <bo...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>> >
>>>>>>>>>> > Hi team,
>>>>>>>>>> >
>>>>>>>>>> > I'm trying to create an External transform in Java SDK, which
>>>>>>>>>> expands into several ParDo and a Window.into(FixWindow). When I use this
>>>>>>>>>> transform in Python SDK, I get an pipeline construction error:
>>>>>>>>>> >
>>>>>>>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>>> >
>>>>>>>>>> > Is it expected that I cannot use a Window.into when building
>>>>>>>>>> External Ptransform? Or do I miss anything here?
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > Thanks for your help!
>>>>>>>>>>
>>>>>>>>>

Re: Create External Transform with WindowFn

Posted by Boyuan Zhang <bo...@google.com>.
Hi Steve,

Unfortunately I don't think there is a workaround before we have the change
that Cham mentions.

On Mon, Nov 30, 2020 at 8:16 AM Steve Niemitz <sn...@apache.org> wrote:

> I'm trying to write an xlang transform that uses Reshuffle internally, and
> ran into this as well.  Is there any workaround to this for now (other than
> removing the reshuffle), or do I just need to wait for what Chamikara
> mentioned?  I noticed the same issue was mentioned in the SnowflakeIO.Read
> PR as well [1].
>
> https://github.com/apache/beam/pull/12149#discussion_r463710165
>
> On Wed, Aug 26, 2020 at 10:55 PM Boyuan Zhang <bo...@google.com> wrote:
>
>> That explains a lot. Thanks, Cham!
>>
>> On Wed, Aug 26, 2020 at 7:44 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> Due to the proto -> object -> proto conversion we do today, Python needs
>>> to parse the full sub-graph from Java. We have hooks for PTransforms and
>>> Coders but not for windowing operations. This limitation will go away after
>>> we have direct Beam proto to Dataflow proto conversion in place.
>>>
>>> On Wed, Aug 26, 2020 at 7:03 PM Robert Burke <ro...@frantil.com> wrote:
>>>
>>>> Coders should only be checked over the language boundaries.
>>>>
>>>> On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <bo...@google.com> wrote:
>>>>
>>>>> Thanks Cham!
>>>>>
>>>>>  I just realized that the *beam:window_fn:serialized_**java:v1 *is
>>>>> introduced by Java *Reshuffle.viaRandomKey()*. But
>>>>> *Reshuffle.viaRandomKey()* does rewindowed into original window
>>>>> strategy(which is *GlobalWindows *in my case). Is it expected that we
>>>>> also check intermediate PCollection rather than only the PCollection that
>>>>> across the language boundary?
>>>>>
>>>>> More about my Ptransform:
>>>>> MyExternalPTransform  -- expand to --  ParDo() ->
>>>>> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixWindow) -> ParDo() ->
>>>>> output void
>>>>>
>>>>>                                                                |
>>>>>
>>>>>                                                                 -> ParDo()
>>>>> -> output PCollection to Python SDK
>>>>>
>>>>> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <
>>>>> chamikara@google.com> wrote:
>>>>>
>>>>>> Also it's strange that Java used (beam:window_fn:serialized_java:v1)
>>>>>> for the URN here instead of "beam:window_fn:fixed_windows:v1" [1]
>>>>>> which is what is being registered by Python [2]. This seems to be the
>>>>>> immediate issue. Tracking bug for supporting custom windows is
>>>>>> https://issues.apache.org/jira/browse/BEAM-10507.
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>>>>>> [2]
>>>>>> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>>>>>
>>>>>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <
>>>>>> chamikara@google.com> wrote:
>>>>>>
>>>>>>> Pipelines that use external WindowingStrategies might be failing
>>>>>>> during proto -> object -> proto conversion we do today. This limitation
>>>>>>> will go away once Dataflow directly starts reading Beam protos. We are
>>>>>>> working on this now.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <bo...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks, Robert! I want to add more details on my External
>>>>>>>> PTransform:
>>>>>>>>
>>>>>>>> MyExternalPTransform  -- expand to --  ParDo() ->
>>>>>>>> WindowInto(FixWindow) -> ParDo() -> output void
>>>>>>>>
>>>>>>>> |
>>>>>>>>
>>>>>>>> -> ParDo() -> output PCollection to Python SDK
>>>>>>>> The full stacktrace:
>>>>>>>>
>>>>>>>> INFO:root:Using Java SDK harness container image dataflow-dev.gcr.io/boyuanz/java:latest
>>>>>>>> Starting expansion service at localhost:53569
>>>>>>>> Aug 13, 2020 7:42:11 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
>>>>>>>> INFO: Registering external transforms: [beam:external:java:kafka:read:v1, beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
>>>>>>>> 	beam:external:java:kafka:read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
>>>>>>>> 	beam:external:java:kafka:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
>>>>>>>> 	beam:external:java:jdbc:read_rows:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
>>>>>>>> 	beam:external:java:jdbc:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
>>>>>>>> 	beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
>>>>>>>> WARNING:apache_beam.options.pipeline_options_validator:Option --zone is deprecated. Please use --worker_zone instead.
>>>>>>>> Aug 13, 2020 7:42:12 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>> INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
>>>>>>>> Aug 13, 2020 7:42:14 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>> INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'
>>>>>>>>
>>>>>>>> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
>>>>>>>> INFO:root:Using Python SDK docker image: apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at local, we will try to pull from hub.docker.com
>>>>>>>> Traceback (most recent call last):
>>>>>>>>   File "<embedded module '_launcher'>", line 165, in run_filename_as_main
>>>>>>>>   File "<embedded module '_launcher'>", line 39, in _run_code_in_main
>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, in <module>
>>>>>>>>     run()
>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, in run
>>>>>>>>     test_method(beam.Pipeline(options=pipeline_options))
>>>>>>>>   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in run_xlang_kafkaio
>>>>>>>>     pipeline.run(False)
>>>>>>>>   File "apache_beam/pipeline.py", line 534, in run
>>>>>>>>     return self.runner.run_pipeline(self, self._options)
>>>>>>>>   File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in run_pipeline
>>>>>>>>     allow_proto_holders=True)
>>>>>>>>   File "apache_beam/pipeline.py", line 879, in from_runner_api
>>>>>>>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in from_runner_api
>>>>>>>>     id in proto.outputs.items()
>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in <dictcomp>
>>>>>>>>     id in proto.outputs.items()
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pvalue.py", line 217, in from_runner_api
>>>>>>>>     proto.windowing_strategy_id),
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/transforms/core.py", line 2597, in from_runner_api
>>>>>>>>     windowfn=WindowFn.from_runner_api(proto.window_fn, context),
>>>>>>>>   File "apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <
>>>>>>>> robertwb@google.com> wrote:
>>>>>>>>
>>>>>>>>> You should be able to use a WindowInto with any of the common
>>>>>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>>>>>>>> external transform. You should also be able to window into an
>>>>>>>>> arbitrary WindowFn as long as it produces standards window types,
>>>>>>>>> but
>>>>>>>>> if there's a bug here you could possibly work around it by
>>>>>>>>> windowing
>>>>>>>>> into a more standard windowing fn before returning.
>>>>>>>>>
>>>>>>>>> What is the full traceback?
>>>>>>>>>
>>>>>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <bo...@google.com>
>>>>>>>>> wrote:
>>>>>>>>> >
>>>>>>>>> > Hi team,
>>>>>>>>> >
>>>>>>>>> > I'm trying to create an External transform in Java SDK, which
>>>>>>>>> expands into several ParDo and a Window.into(FixWindow). When I use this
>>>>>>>>> transform in Python SDK, I get an pipeline construction error:
>>>>>>>>> >
>>>>>>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>> >
>>>>>>>>> > Is it expected that I cannot use a Window.into when building
>>>>>>>>> External Ptransform? Or do I miss anything here?
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > Thanks for your help!
>>>>>>>>>
>>>>>>>>

Re: Create External Transform with WindowFn

Posted by Steve Niemitz <sn...@apache.org>.
I'm trying to write an xlang transform that uses Reshuffle internally, and
ran into this as well.  Is there any workaround to this for now (other than
removing the reshuffle), or do I just need to wait for what Chamikara
mentioned?  I noticed the same issue was mentioned in the SnowflakeIO.Read
PR as well [1].

https://github.com/apache/beam/pull/12149#discussion_r463710165

On Wed, Aug 26, 2020 at 10:55 PM Boyuan Zhang <bo...@google.com> wrote:

> That explains a lot. Thanks, Cham!
>
> On Wed, Aug 26, 2020 at 7:44 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>> Due to the proto -> object -> proto conversion we do today, Python needs
>> to parse the full sub-graph from Java. We have hooks for PTransforms and
>> Coders but not for windowing operations. This limitation will go away after
>> we have direct Beam proto to Dataflow proto conversion in place.
>>
>> On Wed, Aug 26, 2020 at 7:03 PM Robert Burke <ro...@frantil.com> wrote:
>>
>>> Coders should only be checked over the language boundaries.
>>>
>>> On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <bo...@google.com> wrote:
>>>
>>>> Thanks Cham!
>>>>
>>>>  I just realized that the *beam:window_fn:serialized_**java:v1 *is
>>>> introduced by Java *Reshuffle.viaRandomKey()*. But
>>>> *Reshuffle.viaRandomKey()* does rewindowed into original window
>>>> strategy(which is *GlobalWindows *in my case). Is it expected that we
>>>> also check intermediate PCollection rather than only the PCollection that
>>>> across the language boundary?
>>>>
>>>> More about my Ptransform:
>>>> MyExternalPTransform  -- expand to --  ParDo() ->
>>>> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixWindow) -> ParDo() ->
>>>> output void
>>>>
>>>>                                                              |
>>>>
>>>>                                                               -> ParDo() ->
>>>> output PCollection to Python SDK
>>>>
>>>> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <
>>>> chamikara@google.com> wrote:
>>>>
>>>>> Also it's strange that Java used (beam:window_fn:serialized_java:v1)
>>>>> for the URN here instead of "beam:window_fn:fixed_windows:v1" [1]
>>>>> which is what is being registered by Python [2]. This seems to be the
>>>>> immediate issue. Tracking bug for supporting custom windows is
>>>>> https://issues.apache.org/jira/browse/BEAM-10507.
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>>>>> [2]
>>>>> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>>>>
>>>>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <
>>>>> chamikara@google.com> wrote:
>>>>>
>>>>>> Pipelines that use external WindowingStrategies might be failing
>>>>>> during proto -> object -> proto conversion we do today. This limitation
>>>>>> will go away once Dataflow directly starts reading Beam protos. We are
>>>>>> working on this now.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <bo...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks, Robert! I want to add more details on my External PTransform:
>>>>>>>
>>>>>>> MyExternalPTransform  -- expand to --  ParDo() ->
>>>>>>> WindowInto(FixWindow) -> ParDo() -> output void
>>>>>>>                                                                     |
>>>>>>>
>>>>>>> -> ParDo() -> output PCollection to Python SDK
>>>>>>> The full stacktrace:
>>>>>>>
>>>>>>> INFO:root:Using Java SDK harness container image dataflow-dev.gcr.io/boyuanz/java:latest
>>>>>>> Starting expansion service at localhost:53569
>>>>>>> Aug 13, 2020 7:42:11 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
>>>>>>> INFO: Registering external transforms: [beam:external:java:kafka:read:v1, beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
>>>>>>> 	beam:external:java:kafka:read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
>>>>>>> 	beam:external:java:kafka:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
>>>>>>> 	beam:external:java:jdbc:read_rows:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
>>>>>>> 	beam:external:java:jdbc:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
>>>>>>> 	beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
>>>>>>> WARNING:apache_beam.options.pipeline_options_validator:Option --zone is deprecated. Please use --worker_zone instead.
>>>>>>> Aug 13, 2020 7:42:12 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>> INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
>>>>>>> Aug 13, 2020 7:42:14 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>> INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'
>>>>>>>
>>>>>>> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
>>>>>>> INFO:root:Using Python SDK docker image: apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at local, we will try to pull from hub.docker.com
>>>>>>> Traceback (most recent call last):
>>>>>>>   File "<embedded module '_launcher'>", line 165, in run_filename_as_main
>>>>>>>   File "<embedded module '_launcher'>", line 39, in _run_code_in_main
>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, in <module>
>>>>>>>     run()
>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, in run
>>>>>>>     test_method(beam.Pipeline(options=pipeline_options))
>>>>>>>   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in run_xlang_kafkaio
>>>>>>>     pipeline.run(False)
>>>>>>>   File "apache_beam/pipeline.py", line 534, in run
>>>>>>>     return self.runner.run_pipeline(self, self._options)
>>>>>>>   File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in run_pipeline
>>>>>>>     allow_proto_holders=True)
>>>>>>>   File "apache_beam/pipeline.py", line 879, in from_runner_api
>>>>>>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1272, in from_runner_api
>>>>>>>     id in proto.outputs.items()
>>>>>>>   File "apache_beam/pipeline.py", line 1272, in <dictcomp>
>>>>>>>     id in proto.outputs.items()
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pvalue.py", line 217, in from_runner_api
>>>>>>>     proto.windowing_strategy_id),
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/transforms/core.py", line 2597, in from_runner_api
>>>>>>>     windowfn=WindowFn.from_runner_api(proto.window_fn, context),
>>>>>>>   File "apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <ro...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> You should be able to use a WindowInto with any of the common
>>>>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>>>>>>> external transform. You should also be able to window into an
>>>>>>>> arbitrary WindowFn as long as it produces standard window types, but
>>>>>>>> if there's a bug here you could possibly work around it by windowing
>>>>>>>> into a more standard windowing fn before returning.
>>>>>>>>
>>>>>>>> What is the full traceback?
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <bo...@google.com>
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> > Hi team,
>>>>>>>> >
>>>>>>>> > I'm trying to create an External transform in Java SDK, which
>>>>>>>> expands into several ParDos and a Window.into(FixWindow). When I use this
>>>>>>>> transform in Python SDK, I get a pipeline construction error:
>>>>>>>> >
>>>>>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>> >
>>>>>>>> > Is it expected that I cannot use a Window.into when building an
>>>>>>>> External PTransform? Or am I missing anything here?
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > Thanks for your help!
>>>>>>>>
>>>>>>>

Re: Create External Transform with WindowFn

Posted by Boyuan Zhang <bo...@google.com>.
That explains a lot. Thanks, Cham!

On Wed, Aug 26, 2020 at 7:44 PM Chamikara Jayalath <ch...@google.com>
wrote:

> Due to the proto -> object -> proto conversion we do today, Python needs
> to parse the full sub-graph from Java. We have hooks for PTransforms and
> Coders but not for windowing operations. This limitation will go away after
> we have direct Beam proto to Dataflow proto conversion in place.
>
> On Wed, Aug 26, 2020 at 7:03 PM Robert Burke <ro...@frantil.com> wrote:
>
>> Coders should only be checked over the language boundaries.
>>
>> On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <bo...@google.com> wrote:
>>
>>> Thanks Cham!
>>>
>>>  I just realized that the *beam:window_fn:serialized_java:v1* is
>>> introduced by Java *Reshuffle.viaRandomKey()*. But
>>> *Reshuffle.viaRandomKey()* does rewindow into the original window
>>> strategy (which is *GlobalWindows* in my case). Is it expected that we
>>> also check intermediate PCollections rather than only the PCollections
>>> that cross the language boundary?
>>>
>>> More about my PTransform:
>>> MyExternalPTransform  -- expand to --
>>>   ParDo() -> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>>                                             |
>>>                                             -> ParDo() -> output PCollection to Python SDK
>>>
>>> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <ch...@google.com>
>>> wrote:
>>>
>>>> Also it's strange that Java used (beam:window_fn:serialized_java:v1)
>>>> for the URN here instead of "beam:window_fn:fixed_windows:v1" [1]
>>>> which is what is being registered by Python [2]. This seems to be the
>>>> immediate issue. Tracking bug for supporting custom windows is
>>>> https://issues.apache.org/jira/browse/BEAM-10507.
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>>>> [2]
>>>> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>>>
>>>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <
>>>> chamikara@google.com> wrote:
>>>>
>>>>> Pipelines that use external WindowingStrategies might be failing
>>>>> during proto -> object -> proto conversion we do today. This limitation
>>>>> will go away once Dataflow directly starts reading Beam protos. We are
>>>>> working on this now.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <bo...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks, Robert! I want to add more details on my External PTransform:
>>>>>>
>>>>>> MyExternalPTransform  -- expand to --
>>>>>>   ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>>>>>      |
>>>>>>      -> ParDo() -> output PCollection to Python SDK
>>>>>> The full stacktrace:
>>>>>>
>>>>>> [expansion-service log and stack trace snipped -- identical to the copy quoted earlier in the thread]
>>>>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <ro...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> You should be able to use a WindowInto with any of the common
>>>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>>>>>> external transform. You should also be able to window into an
>>>>>>> arbitrary WindowFn as long as it produces standard window types, but
>>>>>>> if there's a bug here you could possibly work around it by windowing
>>>>>>> into a more standard windowing fn before returning.
>>>>>>>
>>>>>>> What is the full traceback?
>>>>>>>
>>>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <bo...@google.com>
>>>>>>> wrote:
>>>>>>> >
>>>>>>> > Hi team,
>>>>>>> >
>>>>>>> > I'm trying to create an External transform in Java SDK, which
>>>>>>> expands into several ParDos and a Window.into(FixWindow). When I use this
>>>>>>> transform in Python SDK, I get a pipeline construction error:
>>>>>>> >
>>>>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>> >
>>>>>>> > Is it expected that I cannot use a Window.into when building an
>>>>>>> External PTransform? Or am I missing anything here?
>>>>>>> >
>>>>>>> >
>>>>>>> > Thanks for your help!
>>>>>>>
>>>>>>

Re: Create External Transform with WindowFn

Posted by Chamikara Jayalath <ch...@google.com>.
Due to the proto -> object -> proto conversion we do today, Python needs to
parse the full sub-graph from Java. We have hooks for PTransforms and
Coders but not for windowing operations. This limitation will go away after
we have direct Beam proto to Dataflow proto conversion in place.
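To make that concrete, here is a minimal Python sketch of the URN lookup
that fails; the registry contents below are simplified placeholders for
illustration, not Beam's actual internals:

    # Hypothetical, simplified registry: rehydrating a pipeline proto means
    # looking up a constructor for every WindowFn URN in the graph.
    known_window_fn_urns = {
        'beam:window_fn:global_windows:v1': 'GlobalWindows',
        'beam:window_fn:fixed_windows:v1': 'FixedWindows',
    }

    def window_fn_from_urn(urn):
        # An opaque Java payload ('beam:window_fn:serialized_java:v1') has
        # no Python-side entry, so the lookup raises KeyError -- the same
        # failure as apache_beam/utils/urns.py line 186 in the trace above.
        return known_window_fn_urns[urn]

    window_fn_from_urn('beam:window_fn:serialized_java:v1')  # raises KeyError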

On Wed, Aug 26, 2020 at 7:03 PM Robert Burke <ro...@frantil.com> wrote:

> Coders should only be checked over the language boundaries.
>
> On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <bo...@google.com> wrote:
>
>> Thanks Cham!
>>
>>  I just realized that the *beam:window_fn:serialized_java:v1* is
>> introduced by Java *Reshuffle.viaRandomKey()*. But
>> *Reshuffle.viaRandomKey()* does rewindow into the original window
>> strategy (which is *GlobalWindows* in my case). Is it expected that we
>> also check intermediate PCollections rather than only the PCollections
>> that cross the language boundary?
>>
>> More about my PTransform:
>> MyExternalPTransform  -- expand to --
>>   ParDo() -> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>                                             |
>>                                             -> ParDo() -> output PCollection to Python SDK
>>
>> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> Also it's strange that Java used (beam:window_fn:serialized_java:v1)
>>> for the URN here instead of "beam:window_fn:fixed_windows:v1" [1] which
>>> is what is being registered by Python [2]. This seems to be the immediate
>>> issue. Tracking bug for supporting custom windows is
>>> https://issues.apache.org/jira/browse/BEAM-10507.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>>> [2]
>>> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>>
>>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <ch...@google.com>
>>> wrote:
>>>
>>>> Pipelines that use external WindowingStrategies might be failing during
>>>> proto -> object -> proto conversion we do today. This limitation will go
>>>> away once Dataflow directly starts reading Beam protos. We are working on
>>>> this now.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <bo...@google.com>
>>>> wrote:
>>>>
>>>>> Thanks, Robert! I want to add more details on my External PTransform:
>>>>>
>>>>> MyExternalPTransform  -- expand to --
>>>>>   ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>>>>      |
>>>>>      -> ParDo() -> output PCollection to Python SDK
>>>>> The full stacktrace:
>>>>>
>>>>> [expansion-service log and stack trace snipped -- identical to the copy quoted earlier in the thread]
>>>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>
>>>>>
>>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>>
>>>>>> You should be able to use a WindowInto with any of the common
>>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>>>>> external transform. You should also be able to window into an
>>>>>> arbitrary WindowFn as long as it produces standard window types, but
>>>>>> if there's a bug here you could possibly work around it by windowing
>>>>>> into a more standard windowing fn before returning.
>>>>>>
>>>>>> What is the full traceback?
>>>>>>
>>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <bo...@google.com>
>>>>>> wrote:
>>>>>> >
>>>>>> > Hi team,
>>>>>> >
>>>>>> > I'm trying to create an External transform in Java SDK, which
>>>>>> expands into several ParDos and a Window.into(FixWindow). When I use this
>>>>>> transform in Python SDK, I get a pipeline construction error:
>>>>>> >
>>>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>> >
>>>>>> > Is it expected that I cannot use a Window.into when building an
>>>>>> External PTransform? Or am I missing anything here?
>>>>>> >
>>>>>> >
>>>>>> > Thanks for your help!
>>>>>>
>>>>>

Re: Create External Transform with WindowFn

Posted by Robert Burke <ro...@frantil.com>.
Coders should only be checked over the language boundaries.

On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <bo...@google.com> wrote:

> Thanks Cham!
>
>  I just realized that the *beam:window_fn:serialized_java:v1* is
> introduced by Java *Reshuffle.viaRandomKey()*. But
> *Reshuffle.viaRandomKey()* does rewindow into the original window
> strategy (which is *GlobalWindows* in my case). Is it expected that we
> also check intermediate PCollections rather than only the PCollections
> that cross the language boundary?
>
> More about my PTransform:
> MyExternalPTransform  -- expand to --
>   ParDo() -> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>                                             |
>                                             -> ParDo() -> output PCollection to Python SDK
>
> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>> Also it's strange that Java used (beam:window_fn:serialized_java:v1) for
>> the URN here instead of "beam:window_fn:fixed_windows:v1" [1] which is
>> what is being registered by Python [2]. This seems to be the immediate
>> issue. Tracking bug for supporting custom windows is
>> https://issues.apache.org/jira/browse/BEAM-10507.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>> [2]
>> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>
>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> Pipelines that use external WindowingStrategies might be failing during
>>> proto -> object -> proto conversion we do today. This limitation will go
>>> away once Dataflow directly starts reading Beam protos. We are working on
>>> this now.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <bo...@google.com> wrote:
>>>
>>>> Thanks, Robert! I want to add more details on my External PTransform:
>>>>
>>>> MyExternalPTransform  -- expand to --
>>>>   ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>>>      |
>>>>      -> ParDo() -> output PCollection to Python SDK
>>>> The full stacktrace:
>>>>
>>>> [expansion-service log and stack trace snipped -- identical to the copy quoted earlier in the thread]
>>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>>
>>>>
>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> You should be able to use a WindowInto with any of the common
>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>>>> external transform. You should also be able to window into an
>>>>> arbitrary WindowFn as long as it produces standard window types, but
>>>>> if there's a bug here you could possibly work around it by windowing
>>>>> into a more standard windowing fn before returning.
>>>>>
>>>>> What is the full traceback?
>>>>>
>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <bo...@google.com>
>>>>> wrote:
>>>>> >
>>>>> > Hi team,
>>>>> >
>>>>> > I'm trying to create an External transform in Java SDK, which
>>>>> expands into several ParDo and a Window.into(FixWindow). When I use this
>>>>> transform in Python SDK, I get an pipeline construction error:
>>>>> >
>>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>>> >
>>>> > Is it expected that I cannot use a Window.into when building an External
>>>> PTransform? Or am I missing anything here?
>>>>> >
>>>>> >
>>>>> > Thanks for your help!
>>>>>
>>>>

Re: Create External Transform with WindowFn

Posted by Boyuan Zhang <bo...@google.com>.
Thanks Cham!

 I just realized that the *beam:window_fn:serialized_java:v1* is
introduced by Java *Reshuffle.viaRandomKey()*. But
*Reshuffle.viaRandomKey()* does rewindow into the original window
strategy (which is *GlobalWindows* in my case). Is it expected that we also
check intermediate PCollections rather than only the PCollections that
cross the language boundary?

More about my PTransform:
MyExternalPTransform  -- expand to --
  ParDo() -> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
                                            |
                                            -> ParDo() -> output PCollection to Python SDK
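
To make the shape concrete, here is a minimal Java sketch of an expansion
like this one, including the workaround Robert suggested above (re-windowing
into a standard WindowFn before handing the graph back); the class and DoFn
names are hypothetical and the DoFn bodies are placeholders:

    import org.apache.beam.sdk.transforms.*;
    import org.apache.beam.sdk.transforms.windowing.*;
    import org.apache.beam.sdk.values.*;
    import org.joda.time.Duration;

    public class MyExternalPTransform
        extends PTransform<PBegin, PCollection<String>> {

      // Placeholder DoFn that just forwards its input.
      static class PassThroughFn extends DoFn<String, String> {
        @ProcessElement
        public void process(@Element String e, OutputReceiver<String> out) {
          out.output(e);
        }
      }

      // Placeholder terminal DoFn: consumes elements, emits nothing.
      static class SinkFn extends DoFn<String, Void> {
        @ProcessElement
        public void process(@Element String e) { /* side effects only */ }
      }

      @Override
      public PCollection<String> expand(PBegin input) {
        PCollection<String> branchPoint =
            input
                .apply(Create.of("seed"))
                .apply("ParDo1", ParDo.of(new PassThroughFn()))
                .apply(Reshuffle.viaRandomKey())
                // Workaround per the advice above: Reshuffle leaves a
                // non-standard (serialized Java) WindowFn behind, so
                // re-window into a standard WindowFn before returning.
                .apply(Window.into(new GlobalWindows()))
                .apply("ParDo2", ParDo.of(new PassThroughFn()));

        // Branch 1: fixed windows, ending in a void sink.
        branchPoint
            .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply("ParDo3", ParDo.of(new SinkFn()));

        // Branch 2: the PCollection handed across the language boundary.
        return branchPoint.apply("ParDo4", ParDo.of(new PassThroughFn()));
      }
    }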

On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <ch...@google.com>
wrote:

> Also it's strange that Java used (beam:window_fn:serialized_java:v1) for
> the URN here instead of "beam:window_fn:fixed_windows:v1" [1] which is
> what is being registered by Python [2]. This seems to be the immediate
> issue. Tracking bug for supporting custom windows is
> https://issues.apache.org/jira/browse/BEAM-10507.
>
> [1]
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
> [2]
> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>
> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>> Pipelines that use external WindowingStrategies might be failing during
>> proto -> object -> proto conversion we do today. This limitation will go
>> away once Dataflow directly starts reading Beam protos. We are working on
>> this now.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <bo...@google.com> wrote:
>>
>>> Thanks, Robert! I want to add more details on my External PTransform:
>>>
>>> MyExternalPTransform  -- expand to --
>>>   ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>>      |
>>>      -> ParDo() -> output PCollection to Python SDK
>>> The full stacktrace:
>>>
>>> [expansion-service log and stack trace snipped -- identical to the copy quoted earlier in the thread]
>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>
>>>
>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> You should be able to use a WindowInto with any of the common
>>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>>> external transform. You should also be able to window into an
>>>> arbitrary WindowFn as long as it produces standard window types, but
>>>> if there's a bug here you could possibly work around it by windowing
>>>> into a more standard windowing fn before returning.
>>>>
>>>> What is the full traceback?
>>>>
>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <bo...@google.com>
>>>> wrote:
>>>> >
>>>> > Hi team,
>>>> >
>>>> > I'm trying to create an External transform in Java SDK, which expands
>>>> into several ParDos and a Window.into(FixWindow). When I use this transform
>>>> in Python SDK, I get a pipeline construction error:
>>>> >
>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>> >
>>>> > Is it expected that I cannot use a Window.into when building an External
>>>> PTransform? Or am I missing anything here?
>>>> >
>>>> >
>>>> > Thanks for your help!
>>>>
>>>

Re: Create External Transform with WindowFn

Posted by Chamikara Jayalath <ch...@google.com>.
Also it's strange that Java used (beam:window_fn:serialized_java:v1) for
the URN here instead of "beam:window_fn:fixed_windows:v1" [1] which is what
is being registered by Python [2]. This seems to be the immediate issue.
Tracking bug for supporting custom windows is
https://issues.apache.org/jira/browse/BEAM-10507.

[1]
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
[2]
https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
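
As a quick illustration of the two URNs being compared (a sketch only: the
URN mapping in the comments is inferred from this thread, `words` is assumed
to be a PCollection<String>, and MyCustomWindowFn is a hypothetical
placeholder for any WindowFn without a registered translation):

    // Standard WindowFn: expected to translate to the well-known URN
    // "beam:window_fn:fixed_windows:v1", which Python registers too.
    PCollection<String> fixed =
        words.apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))));

    // WindowFn with no registered translation: expected to fall back to
    // "beam:window_fn:serialized_java:v1", an opaque Java payload that the
    // Python side cannot rehydrate -- hence the KeyError above.
    PCollection<String> custom =
        words.apply(Window.into(new MyCustomWindowFn()));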

On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <ch...@google.com>
wrote:

> Pipelines that use external WindowingStrategies might be failing during
> proto -> object -> proto conversion we do today. This limitation will go
> away once Dataflow directly starts reading Beam protos. We are working on
> this now.
>
> Thanks,
> Cham
>
> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <bo...@google.com> wrote:
>
>> Thanks, Robert! I want to add more details on my External PTransform:
>>
>> MyExternalPTransform  -- expand to --
>>   ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>      |
>>      -> ParDo() -> output PCollection to Python SDK
>> The full stacktrace:
>>
>> [expansion-service log and stack trace snipped -- identical to the copy quoted earlier in the thread]
>> KeyError: 'beam:window_fn:serialized_java:v1'
>>
>>
>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> You should be able to use a WindowInto with any of the common
>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>> external transform. You should also be able to window into an
>>> arbitrary WindowFn as long as it produces standard window types, but
>>> if there's a bug here you could possibly work around it by windowing
>>> into a more standard windowing fn before returning.
>>>
>>> What is the full traceback?
>>>
>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <bo...@google.com> wrote:
>>> >
>>> > Hi team,
>>> >
>>> > I'm trying to create an External transform in Java SDK, which expands
>>> into several ParDos and a Window.into(FixWindow). When I use this transform
>>> in Python SDK, I get a pipeline construction error:
>>> >
>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>> >
>>> > Is it expected that I cannot use a Window.into when building an External
>>> PTransform? Or am I missing anything here?
>>> >
>>> >
>>> > Thanks for your help!
>>>
>>

Re: Create External Transform with WindowFn

Posted by Brian Hulette <bh...@google.com>.
That is consistent with the stacktrace. The error is occurring in the
from_runner_api call in dataflow_runner.py [1]

[1]
https://github.com/apache/beam/blob/e1ec7f63e40f50e4101e2f5339738545bfea7f9b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L492
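
In other words, the runner round-trips the pipeline through protos before
submission, roughly as in this simplified Python sketch (mirroring the calls
visible in the stack trace rather than a documented public API; `pipeline`
and `options` stand for the user's Pipeline and PipelineOptions):

    from apache_beam import Pipeline

    # Serialize the constructed pipeline to its portable proto form...
    proto = pipeline.to_runner_api()

    # ...then rebuild Python objects from the proto. Rebuilding constructs a
    # Python WindowFn for every windowing strategy in the graph -- including
    # ones created inside the Java expansion -- so the unknown URN
    # 'beam:window_fn:serialized_java:v1' surfaces as a KeyError here.
    rehydrated = Pipeline.from_runner_api(
        proto, pipeline.runner, options, allow_proto_holders=True)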

On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <ch...@google.com>
wrote:

> Pipelines that use external WindowingStrategies might be failing during
> proto -> object -> proto conversion we do today. This limitation will go
> away once Dataflow directly starts reading Beam protos. We are working on
> this now.
>
> Thanks,
> Cham
>
> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <bo...@google.com> wrote:
>
>> Thanks, Robert! I want to add more details on my External PTransform:
>>
>> MyExternalPTransform  -- expand to --
>>   ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>      |
>>      -> ParDo() -> output PCollection to Python SDK
>> The full stacktrace:
>>
>> [expansion-service log and stack trace snipped -- identical to the copy quoted earlier in the thread]
>> KeyError: 'beam:window_fn:serialized_java:v1'
>>
>>
>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> You should be able to use a WindowInto with any of the common
>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>> external transform. You should also be able to window into an
>>> arbitrary WindowFn as long as it produces standards window types, but
>>> if there's a bug here you could possibly work around it by windowing
>>> into a more standard windowing fn before returning.
>>>
>>> What is the full traceback?
>>>
>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <bo...@google.com> wrote:
>>> >
>>> > Hi team,
>>> >
>>> > I'm trying to create an External transform in Java SDK, which expands
>>> into several ParDo and a Window.into(FixWindow). When I use this transform
>>> in Python SDK, I get an pipeline construction error:
>>> >
>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>> >
>>> > Is it expected that I cannot use a Window.into when building External
>>> Ptransform? Or do I miss anything here?
>>> >
>>> >
>>> > Thanks for your help!
>>>
>>

Re: Create External Transform with WindowFn

Posted by Chamikara Jayalath <ch...@google.com>.
Pipelines that use external WindowingStrategies might be failing during
the proto -> object -> proto conversion we do today. This limitation will
go away once Dataflow starts reading Beam protos directly. We are working
on this now.

Thanks,
Cham

On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <bo...@google.com> wrote:

> [...]

Re: Create External Transform with WindowFn

Posted by Boyuan Zhang <bo...@google.com>.
Thanks, Robert! I want to add more details on my External PTransform:

MyExternalPTransform -- expands to --

  ParDo() -> WindowInto(FixedWindows) -+-> ParDo() -> output void
                                       |
                                       +-> ParDo() -> output PCollection to Python SDK
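
In Java, the expansion looks roughly like this (a simplified sketch, not
the real implementation; the class and DoFn names are placeholders):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class MyExternalPTransform
    extends PTransform<PCollection<byte[]>, PCollection<byte[]>> {

  @Override
  public PCollection<byte[]> expand(PCollection<byte[]> input) {
    PCollection<byte[]> windowed =
        input
            .apply("PreProcess", ParDo.of(new PassThroughFn()))
            .apply("Window",
                Window.<byte[]>into(FixedWindows.of(Duration.standardMinutes(1))));
    // One branch ends in a side-effecting ParDo whose output is dropped.
    windowed.apply("SideEffect", ParDo.of(new PassThroughFn()));
    // The other branch is the PCollection returned to the Python pipeline.
    return windowed.apply("PostProcess", ParDo.of(new PassThroughFn()));
  }

  // Placeholder DoFn standing in for the real processing logic.
  private static class PassThroughFn extends DoFn<byte[], byte[]> {
    @ProcessElement
    public void process(@Element byte[] element, OutputReceiver<byte[]> out) {
      out.output(element);
    }
  }
}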
The full stack trace:

INFO:root:Using Java SDK harness container image dataflow-dev.gcr.io/boyuanz/java:latest
Starting expansion service at localhost:53569
Aug 13, 2020 7:42:11 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO: Registering external transforms: [beam:external:java:kafka:read:v1, beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
	beam:external:java:kafka:read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
	beam:external:java:kafka:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
	beam:external:java:jdbc:read_rows:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
	beam:external:java:jdbc:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
	beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
WARNING:apache_beam.options.pipeline_options_validator:Option --zone is deprecated. Please use --worker_zone instead.
Aug 13, 2020 7:42:12 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
Aug 13, 2020 7:42:14 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'

WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
INFO:root:Using Python SDK docker image: apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at local, we will try to pull from hub.docker.com
Traceback (most recent call last):
  File "<embedded module '_launcher'>", line 165, in run_filename_as_main
  File "<embedded module '_launcher'>", line 39, in _run_code_in_main
  File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, in <module>
    run()
  File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, in run
    test_method(beam.Pipeline(options=pipeline_options))
  File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in run_xlang_kafkaio
    pipeline.run(False)
  File "apache_beam/pipeline.py", line 534, in run
    return self.runner.run_pipeline(self, self._options)
  File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in run_pipeline
    allow_proto_holders=True)
  File "apache_beam/pipeline.py", line 879, in from_runner_api
    p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1272, in from_runner_api
    id in proto.outputs.items()
  File "apache_beam/pipeline.py", line 1272, in <dictcomp>
    id in proto.outputs.items()
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pvalue.py", line 217, in from_runner_api
    proto.windowing_strategy_id),
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/transforms/core.py", line 2597, in from_runner_api
    windowfn=WindowFn.from_runner_api(proto.window_fn, context),
  File "apache_beam/utils/urns.py", line 186, in from_runner_api
    parameter_type, constructor = cls._known_urns[fn_proto.urn]
KeyError: 'beam:window_fn:serialized_java:v1'


On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <ro...@google.com> wrote:

> [...]

Re: Create External Transform with WindowFn

Posted by Robert Bradshaw <ro...@google.com>.
You should be able to use WindowInto with any of the common
windowing operations (e.g. global, fixed, sliding, sessions) in an
external transform. You should also be able to window into an
arbitrary WindowFn as long as it produces standard window types, but
if there's a bug here you could possibly work around it by windowing
into a more standard windowing fn before returning.
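
Something along these lines (a sketch only; MyCustomWindowFn and
ProcessFn are placeholders, and imports are elided):

// As this thread shows, a WindowFn without a well-known URN gets
// encoded as beam:window_fn:serialized_java:v1, which the Python side
// can't decode. Re-windowing into a standard WindowFn (URN
// beam:window_fn:fixed_windows:v1) before returning keeps the
// windowing strategy at the language boundary decodable.
@Override
public PCollection<byte[]> expand(PCollection<byte[]> input) {
  return input
      .apply("CustomWindow", Window.<byte[]>into(new MyCustomWindowFn()))
      .apply("Process", ParDo.of(new ProcessFn()))
      .apply("Rewindow",
          Window.<byte[]>into(FixedWindows.of(Duration.standardMinutes(1))));
}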

What is the full traceback?

On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <bo...@google.com> wrote:
> [...]