Posted to user@beam.apache.org by Sergiusz Rokosz <se...@reflek.io> on 2022/08/01 15:03:19 UTC

Java prefix external pipeline with go sdk

Hello,
I was trying to recreate the Java prefix pipeline from https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/ but with the Go SDK. My pipeline code is:

package main

import (
	"context"
	"flag"
	"fmt"

	"github.com/apache/beam/sdks/v2/go/examples/xlang"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

const readURN = "beam:transform:org.apache.beam:javaprefix:v1"

var (
	input         = flag.String("input", "lorem", "Input file to read.")
	output        = flag.String("output", "output-go", "Output file to write.")
	expansionAddr = flag.String("expansion_addr", "localhost:3333", "Address of the Java expansion service.")
)

func main() {
	flag.Parse()
	beam.Init()
	ctx := context.Background()

	p := beam.NewPipeline()
	s := p.Root()

	lines := textio.Read(s, *input)
	withJavaPrefix := xlang.Prefix(s, "java:", *expansionAddr, lines)
	withGoPrefix := beam.ParDo(s, func(s string) string { return fmt.Sprintf("go: %s", s) }, withJavaPrefix)
	textio.Write(s, *output, withGoPrefix)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}

func Prefix(s beam.Scope, prefix string, addr string, col beam.PCollection) beam.PCollection {
	s = s.Scope("XLangTest.Prefix")

	pl := beam.CrossLanguagePayload(prefixPayload{Data: prefix})
	outT := beam.UnnamedOutput(typex.New(reflectx.String))
	outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:prefix", pl, addr, beam.UnnamedInput(col), outT)
	return outs[beam.UnnamedOutputTag()]
}

type prefixPayload struct {
	Data string
}


I downloaded version 2.40.0 of the expansion service and started it with:

java -jar beam-examples-multi-language-2.40.0.jar 3333 --javaClassLookupAllowlistFile='*'

When I run the pipeline, I get this error: unexpected edge: 6: External [In(Main): string <- {5: string/string GLO}] -> [Out: string -> {6: string/string GLO}]
What could be the problem? I have no idea what might be wrong.
Cheers,
Sergiusz

Re: Java prefix external pipeline with go sdk

Posted by Jack McCluskey via user <us...@beam.apache.org>.
Okay, I've gone back and confirmed that the "translation failed" error
also originates from the Go direct runner (
https://github.com/apache/beam/blob/8da6363b6fef8da5e73976be2b1277a776c05239/sdks/go/pkg/beam/runners/direct/direct.go#L337),
so you should execute the pipeline on a runner that supports cross-language
transforms. I'm going to improve the error message so this issue is clearer
moving forward, and there's a tracking issue filed at
https://github.com/apache/beam/issues/22560. Let me know if you have any
more problems. I'm happy to help!

Thanks,

Jack McCluskey

On Tue, Aug 2, 2022 at 6:57 AM Sergiusz Rokosz <se...@reflek.io>
wrote:


Re: Java prefix external pipeline with go sdk

Posted by Sergiusz Rokosz <se...@reflek.io>.
Hello,
Thank you very much for the reply. I will try running the second example with the Dataflow runner. Here is the full output from the first example:

2022/08/02 12:50:19 Executing pipeline with the direct runner.
2022/08/02 12:50:19 Pipeline:
2022/08/02 12:50:19 Nodes: {1: []uint8/bytes GLO}
{2: string/string GLO}
{3: string/string GLO}
{4: KV<string,int64>/KV<string,varint> GLO}
{5: string/string GLO}
{6: string/string GLO}
{7: string/string GLO}
{8: KV<int,string>/KV<int[varintz],string> GLO}
{9: CoGBK<int,string>/CoGBK<int[varintz],string> GLO}
Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/bytes GLO}]
2: ParDo [In(Main): []uint8 <- {1: []uint8/bytes GLO}] -> [Out: T -> {2: string/string GLO}]
3: ParDo [In(Main): string <- {2: string/string GLO}] -> [Out: string -> {3: string/string GLO}]
4: ParDo [In(Main): string <- {3: string/string GLO}] -> [Out: KV<string,int64> -> {4: KV<string,int64>/KV<string,varint> GLO}]
5: ParDo [In(Main): KV<string,int64> <- {4: KV<string,int64>/KV<string,varint> GLO}] -> [Out: string -> {5: string/string GLO}]
6: External [In(Main): string <- {5: string/string GLO}] -> [Out: string -> {6: string/string GLO}]
7: ParDo [In(Main): string <- {6: string/string GLO}] -> [Out: string -> {7: string/string GLO}]
8: ParDo [In(Main): T <- {7: string/string GLO}] -> [Out: KV<int,T> -> {8: KV<int,string>/KV<int[varintz],string> GLO}]
9: CoGBK [In(Main): KV<int,string> <- {8: KV<int,string>/KV<int[varintz],string> GLO}] -> [Out: CoGBK<int,string> -> {9: CoGBK<int,string>/CoGBK<int[varintz],string> GLO}]
10: ParDo [In(Main): CoGBK<int,string> <- {9: CoGBK<int,string>/CoGBK<int[varintz],string> GLO}] -> []
2022/08/02 12:50:19 Failed to execute job: translation failed
        caused by:
unexpected edge: 6: External [In(Main): string <- {5: string/string GLO}] -> [Out: string -> {6: string/string GLO}]
exit status 1


Also, sorry: I made a mistake when pasting the first example. I actually used a custom Prefix transform, which uses a different URN. Here it is again so there is no confusion.

package main

import (
	"context"
	"flag"
	"fmt"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

const readURN = "beam:transform:org.apache.beam:javaprefix:v1"

var (
	input         = flag.String("input", "lorem", "Input file to read.")
	output        = flag.String("output", "output-go", "Output file to write.")
	expansionAddr = flag.String("expansion_addr", "localhost:3333", "Address of the Java expansion service.")
)

func main() {
	flag.Parse()
	beam.Init()
	ctx := context.Background()

	p := beam.NewPipeline()
	s := p.Root()

	lines := textio.Read(s, *input)
	withJavaPrefix := Prefix(s, "java:", *expansionAddr, lines)
	withGoPrefix := beam.ParDo(s, func(s string) string { return fmt.Sprintf("go: %s", s) }, withJavaPrefix)
	textio.Write(s, *output, withGoPrefix)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}

func Prefix(s beam.Scope, prefix string, addr string, col beam.PCollection) beam.PCollection {
	s = s.Scope("XLangTest.Prefix")

	pl := beam.CrossLanguagePayload(prefixPayload{Prefix: prefix})
	outT := beam.UnnamedOutput(typex.New(reflectx.String))
	outs := beam.CrossLanguage(s, readURN, pl, addr, beam.UnnamedInput(col), outT)
	return outs[beam.UnnamedOutputTag()]
}

type prefixPayload struct {
	Prefix string
}

Cheers,
Sergiusz

Re: Java prefix external pipeline with go sdk

Posted by Jack McCluskey via user <us...@beam.apache.org>.
Hey Sergiusz,

I'm looking into the cross-language problems you're running into. I can't
say with 100% certainty for the first issue, but the problem with the
second is that you're running on the Go direct runner, which does not
support cross-language transforms. You'll need to start up a more fully
featured runner and submit your job to it instead. The first pipeline looks
like a graph construction error, which would be independent of the runner.
Can I get a full error message output from that pipeline?

Thanks,

Jack McCluskey

On Mon, Aug 1, 2022 at 11:18 AM Sergiusz Rokosz <se...@reflek.io>
wrote:


Re: Java prefix external pipeline with go sdk

Posted by Sergiusz Rokosz <se...@reflek.io>.
I don't know if this is related, but I'm also getting an error when executing a pipeline with a cross-language source. For example:

package main

import (
	"context"
	"flag"
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/bigqueryio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

var (
	output = flag.String("output", "output-go", "Output file to write.")
)

func main() {
	flag.Parse()
	beam.Init()
	ctx := context.Background()

	p := beam.NewPipeline()
	s := p.Root()
	table := "bigquery-public-data.usa_names.usa_1910_2013"
	outType := reflect.TypeOf((*string)(nil)).Elem()
	data := bigqueryio.Read(s, outType, bigqueryio.FromTable(table))
	textio.Write(s, *output, data)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}

and the error is:

2022/08/01 17:15:50 Failed to execute job: translation failed
        caused by:
no root units
exit status 1