Posted to dev@beam.apache.org by Piotr Szuberski <pi...@polidea.com> on 2020/08/03 16:22:32 UTC

Unknown accumulator coder error when running cross-language SpannerIO Write

I'm writing a SpannerIO.Write cross-language transform, and when I try to run it from Python I receive errors:

On Flink:
apache_beam.utils.subprocess_server: INFO: b'Caused by: java.lang.IllegalArgumentException: Transform external_1HolderCoder uses unknown accumulator coder id %s'
apache_beam.utils.subprocess_server: INFO: b'\tat org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:216)'
apache_beam.utils.subprocess_server: INFO: b'\tat org.apache.beam.runners.core.construction.graph.PipelineValidator.validateCombine(PipelineValidator.java:273)'

On DirectRunner:
  File "/Users/piotr/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 181, in run_via_runner_api
    self._validate_requirements(pipeline_proto)
  File "/Users/piotr/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 264, in _validate_requirements
    raise ValueError(
ValueError: Missing requirement declaration: {'beam:requirement:pardo:splittable_dofn:v1'}

I suppose SpannerIO.Write uses a transform that cannot be translated for cross-language usage? I'm not sure whether there is something I can do about it.
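
For context, the invocation looks roughly like this (a minimal sketch; the URN, parameter names, and expansion service address below are placeholders for illustration, not the actual SpannerIO ones):

import apache_beam as beam
from apache_beam.transforms.external import ExternalTransform
from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([{'id': 1, 'name': 'example'}])  # rows to write
        | ExternalTransform(
            'beam:external:java:spanner:write:v1',  # placeholder URN
            ImplicitSchemaPayloadBuilder({
                'instance_id': 'my-instance',  # placeholder
                'database_id': 'my-database',  # placeholder
            }),
            expansion_service='localhost:8097'))  # placeholder address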


Re: Unknown accumulator coder error when running cross-language SpannerIO Write

Posted by Brian Hulette <bh...@google.com>.
I thought there might be some additional logging with the coder id, since
the message just said "accumulator coder id %s". It turns out this is just
a bad set of arguments passed to checkArgument; I put up a fix in [1].
So "external_1HolderCoder" is in fact the coder id. This looks like it
could be an issue with component id assignment for cross-language
transforms. It could be instructive to look at the final Pipeline proto
and/or the protos in the expansion request and response. Do they contain
another HolderCoder component with a different id?
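
For example, something along these lines can dump the coder ids and flag
combines whose accumulator coder id is missing from the components map (a
sketch; the combine-URN prefix match is a heuristic, and p is the Python
Pipeline object before submission):

from apache_beam.portability.api import beam_runner_api_pb2

pipeline_proto = p.to_runner_api()
coders = pipeline_proto.components.coders
for coder_id, coder in coders.items():
    print(coder_id, coder.spec.urn)

# Check each combine's accumulator coder id against the components map.
for tid, transform in pipeline_proto.components.transforms.items():
    if transform.spec.urn.startswith('beam:transform:combine'):
        payload = beam_runner_api_pb2.CombinePayload.FromString(
            transform.spec.payload)
        missing = (payload.accumulator_coder_id
                   and payload.accumulator_coder_id not in coders)
        if missing:
            print('%s -> unknown accumulator coder %s'
                  % (tid, payload.accumulator_coder_id))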

Regarding the DirectRunner error, it looks like SDF is in fact supported in
the FnApiRunner, but there's some kind of issue with the requirements
declarations. I bet that if you get past that, though, you'll run into the
same issue as on Flink; it looks to me like something is wrong with the
Pipeline proto.
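
For what it's worth, the declaration the FnApiRunner checks lives on the
pipeline proto itself, so you can see what actually got declared (a sketch;
appending the requirement manually is a diagnostic step, not a fix):

pipeline_proto = p.to_runner_api()
print(list(pipeline_proto.requirements))

# To test whether the missing declaration is the only problem, append it
# and re-run; SDF-using pipelines are expected to declare this requirement.
pipeline_proto.requirements.append(
    'beam:requirement:pardo:splittable_dofn:v1')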

[1] https://github.com/apache/beam/pull/12522


Re: Unknown accumulator coder error when running cross-language SpannerIO Write

Posted by Boyuan Zhang <bo...@google.com>.
Hi Piotr,

Are you developing against the Beam master head? Can you share your code?
The x-lang transform can be tested with the Flink runner, where SDF is also
supported; see, for example:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/flink_runner_test.py#L205-L261
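
A minimal options setup in the spirit of those tests (a sketch; the Flink
master address and the LOOPBACK environment are assumptions for local
testing):

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=localhost:8081',  # assumes a locally running Flink cluster
    '--environment_type=LOOPBACK',    # run the Python SDK harness in-process
])

Then construct the pipeline with beam.Pipeline(options=options) and apply
the external transform as usual.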


Re: Unknown accumulator coder error when running cross-language SpannerIO Write

Posted by Piotr Szuberski <pi...@polidea.com>.
Is there a simple way to register the splittable DoFn for cross-language usage? It's a bit of a black box to me right now.

The most meaningful logs for Flink are the ones I pasted and the following:

apache_beam.utils.subprocess_server: INFO: b'[grpc-default-executor-0] WARN org.apache.beam.runners.jobsubmission.InMemoryJobService - Encountered Unexpected Exception during validation'
apache_beam.utils.subprocess_server: INFO: b'java.lang.RuntimeException: Failed to validate transform ref_AppliedPTransform_Write to Spanner/Write mutations to Cloud Spanner/Schema View/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)_31'

and a shortened one-line message:
[...] DEBUG: Stages: ['ref_AppliedPTransform_Generate input/Impulse_3\n  Generate input/Impulse:beam:transform:impulse:v1\n  must follow: \n  downstream_side_inputs: <unknown>', 'ref_AppliedPTransform_Generate input/FlatMap(<lambda at core.py:2826>)_4\n  Generate input/FlatMap(<lambda at core.py:2826>):beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: <unknown>', 'ref_AppliedPTransform_Generate input/Map(decode)_6\n [...]


Re: Unknown accumulator coder error when running cross-language SpannerIO Write

Posted by Brian Hulette <bh...@google.com>.
The DirectRunner error looks like it's because the FnApiRunner doesn't
support SDF.

What is the coder id for the Flink error? It looks like the full stack
trace should contain it.
