Posted to user@beam.apache.org by Jeff Rhyason <jr...@gmail.com> on 2021/11/05 06:40:08 UTC

Apache Beam Go SDK Quickstart bugs

I'm interested to see the Go SDK work with the Spark runner. Based on the
instructions at https://beam.apache.org/get-started/quickstart-go/, I run
these commands and get the following failure:

$ ./gradlew :runners:spark:2:job-server:runShadow
in another window:
$ cd sdks
$ go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo --runner spark --endpoint localhost:8099
2021/11/04 22:06:46 No environment config specified. Using default config:
'apache/beam_go_sdk:2.35.0.dev'
2021/11/04 22:06:46 Failed to execute job:      generating model pipeline
failed to add scope tree: &{{CountWords root/CountWords} [{main.extractFn
5: ParDo [In(Main): string <- {4: string/string GLO}] -> [Out: string ->
{5: string/string GLO}]}] [0xc000096cd0]}
        caused by:
failed to add input kind: {main.extractFn 5: ParDo [In(Main): string <- {4:
string/string GLO}] -> [Out: string -> {5: string/string GLO}]}
        caused by:
failed to serialize 5: ParDo [In(Main): string <- {4: string/string GLO}]
-> [Out: string -> {5: string/string GLO}]
        caused by:
        encoding userfn 5: ParDo [In(Main): string <- {4: string/string
GLO}] -> [Out: string -> {5: string/string GLO}]
bad userfn
        caused by:
        encoding structural DoFn &{<nil> 0xc000460ae8 <nil>
map[ProcessElement:0xc0004fcac0] map[]}
receiver type *main.extractFn must be registered
exit status 1

I was able to register that type, like this:

diff --git a/sdks/go/examples/wordcount/wordcount.go b/sdks/go/examples/wordcount/wordcount.go
index 4d54db9a2d..6db99d6220 100644
--- a/sdks/go/examples/wordcount/wordcount.go
+++ b/sdks/go/examples/wordcount/wordcount.go
@@ -60,6 +60,7 @@ import (
        "flag"
        "fmt"
        "log"
+       "reflect"
        "regexp"
        "strings"

@@ -107,6 +108,7 @@ var (
 // by calling beam.RegisterFunction in an init() call.
 func init() {
        beam.RegisterFunction(formatFn)
+       beam.RegisterDoFn(reflect.TypeOf((*extractFn)(nil)).Elem())
 }

 var (


Then I encountered:

$ go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo --runner spark --endpoint localhost:8099
...
2021/11/04 23:07:26  (): java.lang.IllegalArgumentException: Unsupported
class file major version 55
2021/11/04 23:07:26 Job state: FAILED
2021/11/04 23:07:26 Failed to execute job: job
go0job0101636092444228385176-jar-1105060726-caffd2f4_ef37c3a9-b2a8-47bd-b1c7-a6e2771263f2
failed
exit status 1


Switching to the Spark 3.0 job server changed things:
$ cd .. ;  ./gradlew :runners:spark:3:job-server:runShadow
...
$ cd sdks ; go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo --runner spark --endpoint localhost:8099
...
2021/11/04 23:12:04 Staged binary artifact with token:
2021/11/04 23:12:04 Submitted job:
go0job0101636092722590274154-jar-1105061204-7f1d879e_28e28e06-1331-41c6-8288-4dcfa87afd13
2021/11/04 23:12:04 Job state: STOPPED
2021/11/04 23:12:04 Job state: STARTING
2021/11/04 23:12:04 Job state: RUNNING
2021/11/04 23:12:17 Job state: DONE
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
 payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n1"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
 payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n3"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
 payload:"\x81\xdc\x01"  labels:{key:"PCOLLECTION"  value:"n5"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
 payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n2"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
 payload:"\x95+"  labels:{key:"PCOLLECTION"  value:"n4"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
 payload:"\x81\xdc\x01"  labels:{key:"PCOLLECTION"  value:"n6"}
2021/11/04 23:12:17 unknown metric type
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:sampled_byte_size:v1"
 type:"beam:metrics:distribution_int64:v1"  payload:"\x01\x01\x01\x01"
 labels:{key:"PCOLLECTION"  value:"n1"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:sampled_byte_size:v1"
 type:"beam:metrics:distribution_int64:v1"  payload:"\x01222"
 labels:{key:"PCOLLECTION"  value:"n3"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:sampled_byte_size:v1"
 type:"beam:metrics:distribution_int64:v1"  payload:"\x01222"
 labels:{key:"PCOLLECTION"  value:"n2"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:sampled_byte_size:v1"
 type:"beam:metrics:distribution_int64:v1"
 payload:"\x88\x01\xd9\x05\x02\x0b"  labels:{key:"PCOLLECTION"  value:"n5"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:sampled_byte_size:v1"
 type:"beam:metrics:distribution_int64:v1"  payload:"y\xfb\x16\x01="
 labels:{key:"PCOLLECTION"  value:"n4"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:sampled_byte_size:v1"
 type:"beam:metrics:distribution_int64:v1"
 payload:"\x81\xdc\x01\xff\xd5\"\x11\x1f"  labels:{key:"PCOLLECTION"
 value:"n6"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
 payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n9"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
 payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n10"}
2021/11/04 23:12:17 unknown metric type
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
 payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n8"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
 payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n7"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:sampled_byte_size:v1"
 type:"beam:metrics:distribution_int64:v1"
 payload:"\x8d%\xfa\xbf\x05\x04\xa8\x0c"  labels:{key:"PCOLLECTION"
 value:"n7"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:sampled_byte_size:v1"
 type:"beam:metrics:distribution_int64:v1"
 payload:"\xbc\x02\x9b\x19\x06\x13"  labels:{key:"PCOLLECTION"  value:"n9"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:sampled_byte_size:v1"
 type:"beam:metrics:distribution_int64:v1"
 payload:"\xb5\x02\xf1\x15\x05\x10"  labels:{key:"PCOLLECTION"  value:"n8"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:sampled_byte_size:v1"
 type:"beam:metrics:distribution_int64:v1"
 payload:"\x8d%\xb3\xa7\x07\x14\""  labels:{key:"PCOLLECTION"  value:"n10"}
2021/11/04 23:12:17 unknown metric type
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
 payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n11"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
urn:"beam:metric:sampled_byte_size:v1"
 type:"beam:metrics:distribution_int64:v1"
 payload:"\x01\xf2\xfa\x02\xf2\xfa\x02\xf2\xfa\x02"
 labels:{key:"PCOLLECTION"  value:"n11"}

However misleading those failures are, the process exits successfully. I
have more to learn about where the output went.

It's really neat to see this working.

Would you be interested in PRs for these?
* Go examples to register all the types needed for other runners
* updating the Go Quick Start to use the Spark 3 runner so it plays better
with the embedded Spark cluster

Jeff

Re: Apache Beam Go SDK Quickstart bugs

Posted by Jeff Rhyason <jr...@gmail.com>.
Hi Robert! Your fix was much more thorough than what I was cooking up.
Thanks for such a comprehensive fix.

Also, thanks to you and Kyle for explaining the details of the other things
I observed. This is working very well now!

Jeff



Re: Apache Beam Go SDK Quickstart bugs

Posted by Robert Burke <ro...@frantil.com>.
With the 2.35.0 cut coming on Wednesday (Nov 17th) I took the liberty to
fix all the Go SDK examples under Spark 3. I don't like "stealing" work,
but we had not heard from you since this was brought to our attention. So,
for that, I'm sorry.

https://github.com/apache/beam/pull/15970

Found a bug with the schema row decoder along the way too.

Since the website tracks the live repository, getting the quick start to use
Spark 3 doesn't have to happen before the cut, so that's still available to do.

I really appreciate the clear errors and repros you provided!

Thanks again
Robert Burke



Re: Apache Beam Go SDK Quickstart bugs

Posted by Robert Burke <re...@google.com>.
+1 to Kyle's LOOPBACK suggestion. It gives you your local file system, and you
can println-debug to the console. However, it would only be a single worker.
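
To make the println-debug point concrete, here is a hypothetical DoFn sketch (not taken from the example; like extractFn it would also need a beam.RegisterDoFn call in init()):

// debugFn is a hypothetical structural DoFn used only to illustrate
// println-style debugging; it needs "fmt" imported.
type debugFn struct{}

// ProcessElement prints each element before passing it through. With
// --environment_type=LOOPBACK the worker runs in the submitting process,
// so this output appears on the console that launched the pipeline.
func (fn *debugFn) ProcessElement(word string, emit func(string)) {
	fmt.Println("saw word:", word)
	emit(word)
}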


Re: Apache Beam Go SDK Quickstart bugs

Posted by Robert Burke <re...@google.com>.
Oh, that's definitely something that needs updating. Yes please to those PRs.

Please add a mention to @lostluck for me to review them.

The "Unsupported class file major version" is a mismatch between Java8 and
Java 11, unrelated to the Go SDK, so I agree that the example should spin
up a spark3 instead of the older version.

The "Failed to deduce Step from MonitoringInfo" messages are an
unfortunately noisy error message post successful job, because the code
doesn't know how to map PCollections to their Parent DoFn yet. Ritesh is
working on that.  They probably need to be consolidated or ignored for now.
Right now, they come from here:
https://github.com/apache/beam/blob/e668460f61540638fb29e05997087b56ebcee4f3/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go#L51



On Mon, Nov 8, 2021 at 1:12 PM Kenneth Knowles <ke...@apache.org> wrote:

> Awesome! Just going to add a few colleagues (who are subscribed anyhow) to
> make sure this hits the top of their inbox.
>
> +Robert Burke <re...@google.com> +Chamikara Jayalath <ch...@google.com>
>  +Kyle Weaver <kc...@google.com>
>
> Kenn

Re: Apache Beam Go SDK Quickstart bugs

Posted by Kyle Weaver <kc...@google.com>.
Hi Jeff. Glad you were able to get it working. We are always interested in
contributions.

If you're looking for the output of the pipeline, it probably got eaten by
Beam workers, which are Docker containers with transient filesystems. There
is a long-standing feature request to mount a volume on those containers so
results persist in the host file system.
https://issues.apache.org/jira/browse/BEAM-5440

The usual alternatives are to write to a distributed filesystem or storage
service (Google BigQuery, HDFS, etc., though I'm not sure which ones the Go
SDK supports). Or you can try adding --environment_type=LOOPBACK to run your
pipeline without Dockerized workers, but this will only work locally.
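
For example, a sketch based on the wordcount invocation from earlier in this thread (same endpoint assumed):

$ go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo --runner spark --endpoint localhost:8099 --environment_type=LOOPBACK

With the Spark 3 job server still running, the --output file should then be written on the local filesystem rather than disappearing with a worker container.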

Best,
Kyle

>>
>> It's really neat to see this working.
>>
>> Would you be interested in PRs for these?
>> * Go examples to register all the types needed for other runners
>> * updating the Go Quick Start to use the Spark 3 runner so it plays
>> better with the embedded Spark cluster
>>
>> Jeff

Re: Apache Beam Go SDK Quickstart bugs

Posted by Kenneth Knowles <ke...@apache.org>.
Awesome! Just going to add a few colleagues (who are subscribed anyhow) to
make sure this hits the top of their inbox.

+Robert Burke <re...@google.com> +Chamikara Jayalath <ch...@google.com>
+Kyle Weaver <kc...@google.com>

Kenn
