Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/01 21:03:20 UTC

[GitHub] [beam] ronoaldo commented on issue #22931: [Bug]: Calling Python transforms from Go pipeline results in an error

ronoaldo commented on issue #22931:
URL: https://github.com/apache/beam/issues/22931#issuecomment-1234779435

   I'm curious about what I am missing from the docs on calling cross-language (xlang) transforms.
   
   Following chapter 13 of the Beam Programming Guide, I wrote three pipelines, one in each language. The Python and Java ones each expose their SplitWords implementation through an expansion service.
   
   I can run each pipeline, both locally and on a remote Google Cloud Dataflow worker, so I know they all work properly on their own. Each main accepts some arguments to configure a cross-language call.
   
   I have then tested several combinations of xlang calls and only one worked:
   
   * Calling the Python implementation from Go fails.
   * Calling the Java implementation from Go also fails, but with a different error.
   * Calling the Java implementation from Python works! I have not yet tested calling the Python implementation from Java.
   
   So, to summarize, my experiments yielded these results:
   
   | Caller ↓ / Implementation → | Java  | Python   | Go          |
   | --------------------------- | ----- | -------- | ----------- |
   | Java                        | -     | Untested | Unsupported |
   | Python                      | Works | -        | Unsupported |
   | Go                          | Fail  | Fail     | -           |
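For anyone who wants to track these combinations while reproducing the issue, the table can be written down as a small lookup. This is only a sketch: the names `result`, `xlangResults` and `lookup` are made up for illustration, and reading rows as the calling SDK and columns as the SDK hosting the implementation is inferred from the bullet list above.

```go
package main

import "fmt"

// result is the outcome recorded for one caller/implementation pair.
type result string

const (
	works       result = "Works"
	fails       result = "Fail"
	untested    result = "Untested"
	unsupported result = "Unsupported"
)

// xlangResults maps calling SDK -> SDK hosting the implementation -> outcome.
// Same-language pairs ("-" in the table) are intentionally left out.
var xlangResults = map[string]map[string]result{
	"Java":   {"Python": untested, "Go": unsupported},
	"Python": {"Java": works, "Go": unsupported},
	"Go":     {"Java": fails, "Python": fails},
}

// lookup returns the recorded outcome, or false when the pair is not in the table.
func lookup(caller, impl string) (result, bool) {
	r, ok := xlangResults[caller][impl]
	return r, ok
}

func main() {
	sdks := []string{"Java", "Python", "Go"}
	for _, caller := range sdks {
		for _, impl := range sdks {
			if r, ok := lookup(caller, impl); ok {
				fmt.Printf("%s calling %s: %s\n", caller, impl, r)
			}
		}
	}
}
```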
   
   This is my Go code: https://github.com/ronoaldo/micro-beam/blob/main/05_xlang/go/pipeline.go
   This is my Python code: https://github.com/ronoaldo/micro-beam/blob/main/05_xlang/pipeline.py
   This is my Java pipeline: https://github.com/ronoaldo/micro-beam/blob/main/05_xlang/java/src/main/java/com/ronoaldo/WordCountPipeline.java and my Java exported PTransform: https://github.com/ronoaldo/micro-beam/blob/main/05_xlang/java/src/main/java/com/ronoaldo/SplitWordsFromJava.java
   
   Am I missing something?
   
   This is the Go output after I launch the Java expansion server:
   
   ```
   2022/09/01 18:02:49 Using external transform SplitWordsFromJava at localhost:12345
   2022/09/01 18:02:50 Executing pipeline with the direct runner.
   2022/09/01 18:02:50 Pipeline:
   2022/09/01 18:02:50 Nodes: {1: []uint8/bytes GLO}
   {2: string/string GLO}
   {3: string/string GLO}
   {4: KV<string,int64>/KV<string,varint> GLO}
   {5: string/string GLO}
   {6: string/string GLO}
   {7: KV<string,int>/KV<string,int[varintz]> GLO}
   {8: CoGBK<string,int>/CoGBK<string,int[varintz]> GLO}
   {9: KV<string,int>/KV<string,int[varintz]> GLO}
   {10: main.CountedWord/R[main.CountedWord] GLO}
   {11: KV<int,main.CountedWord>/KV<int[varintz],R[main.CountedWord]> GLO}
   {12: CoGBK<int,main.CountedWord>/CoGBK<int[varintz],R[main.CountedWord]> GLO}
   {13: KV<int,[]main.CountedWord>/KV<int[varintz],[]main.CountedWord[json]> GLO}
   {14: []main.CountedWord/[]main.CountedWord[json] GLO}
   {15: string/string GLO}
   {16: KV<int,string>/KV<int[varintz],string> GLO}
   {17: CoGBK<int,string>/CoGBK<int[varintz],string> GLO}
   Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/bytes GLO}]
   2: ParDo [In(Main): []uint8 <- {1: []uint8/bytes GLO}] -> [Out: T -> {2: string/string GLO}]
   3: ParDo [In(Main): string <- {2: string/string GLO}] -> [Out: string -> {3: string/string GLO}]
   4: ParDo [In(Main): string <- {3: string/string GLO}] -> [Out: KV<string,int64> -> {4: KV<string,int64>/KV<string,varint> GLO}]
   5: ParDo [In(Main): KV<string,int64> <- {4: KV<string,int64>/KV<string,varint> GLO}] -> [Out: string -> {5: string/string GLO}]
   6: External [In(Main): string <- {5: string/string GLO}] -> [Out: string -> {6: string/string GLO}]
   7: ParDo [In(Main): T <- {6: string/string GLO}] -> [Out: KV<T,int> -> {7: KV<string,int>/KV<string,int[varintz]> GLO}]
   8: CoGBK [In(Main): KV<string,int> <- {7: KV<string,int>/KV<string,int[varintz]> GLO}] -> [Out: CoGBK<string,int> -> {8: CoGBK<string,int>/CoGBK<string,int[varintz]> GLO}]
   9: Combine [In(Main): int <- {8: CoGBK<string,int>/CoGBK<string,int[varintz]> GLO}] -> [Out: KV<string,int> -> {9: KV<string,int>/KV<string,int[varintz]> GLO}]
   10: ParDo [In(Main): KV<string,int> <- {9: KV<string,int>/KV<string,int[varintz]> GLO}] -> [Out: main.CountedWord -> {10: main.CountedWord/R[main.CountedWord] GLO}]
   11: ParDo [In(Main): T <- {10: main.CountedWord/R[main.CountedWord] GLO}] -> [Out: KV<int,T> -> {11: KV<int,main.CountedWord>/KV<int[varintz],R[main.CountedWord]> GLO}]
   12: CoGBK [In(Main): KV<int,main.CountedWord> <- {11: KV<int,main.CountedWord>/KV<int[varintz],R[main.CountedWord]> GLO}] -> [Out: CoGBK<int,main.CountedWord> -> {12: CoGBK<int,main.CountedWord>/CoGBK<int[varintz],R[main.CountedWord]> GLO}]
   13: Combine [In(Main): T <- {12: CoGBK<int,main.CountedWord>/CoGBK<int[varintz],R[main.CountedWord]> GLO}] -> [Out: KV<int,[]T> -> {13: KV<int,[]main.CountedWord>/KV<int[varintz],[]main.CountedWord[json]> GLO}]
   14: ParDo [In(Main): KV<X,Y> <- {13: KV<int,[]main.CountedWord>/KV<int[varintz],[]main.CountedWord[json]> GLO}] -> [Out: Y -> {14: []main.CountedWord/[]main.CountedWord[json] GLO}]
   15: ParDo [In(Main): []main.CountedWord <- {14: []main.CountedWord/[]main.CountedWord[json] GLO}] -> [Out: string -> {15: string/string GLO}]
   16: ParDo [In(Main): T <- {15: string/string GLO}] -> [Out: KV<int,T> -> {16: KV<int,string>/KV<int[varintz],string> GLO}]
   17: CoGBK [In(Main): KV<int,string> <- {16: KV<int,string>/KV<int[varintz],string> GLO}] -> [Out: CoGBK<int,string> -> {17: CoGBK<int,string>/CoGBK<int[varintz],string> GLO}]
   18: ParDo [In(Main): CoGBK<int,string> <- {17: CoGBK<int,string>/CoGBK<int[varintz],string> GLO}] -> []
   2022/09/01 18:02:50 Failed to execute job: translation failed
   	caused by:
   unexpected edge: 6: External [In(Main): string <- {5: string/string GLO}] -> [Out: string -> {6: string/string GLO}]
   panic: Failed to execute job: translation failed
   	caused by:
   unexpected edge: 6: External [In(Main): string <- {5: string/string GLO}] -> [Out: string -> {6: string/string GLO}]
   
   goroutine 1 [running]:
   github.com/apache/beam/sdks/v2/go/pkg/beam/log.Fatalf({0x114d0e0, 0xc0000420e8}, {0xfeb8b2?, 0x18?}, {0xc0006bff60?, 0x0?, 0x464f1b?})
   	/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.41.0/go/pkg/beam/log/log.go:153 +0xa5
   main.main()
   	/home/ronoaldo/workspace/micro-beam/05_xlang/go/pipeline.go:144 +0x2a8
   exit status 2
   
   ```
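The key line in the output above is `unexpected edge: 6: External ...`: translation stops at edge 6, which is the cross-language step. A minimal sketch for pulling the edge number and kind out of such a line, for example when comparing failures across runners, could look like this. The names `edgeRef` and `parseUnexpectedEdge` are hypothetical helpers and not part of the Beam SDK, and the pattern is based only on the message format seen in this log.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// edgeRef identifies the edge named in an "unexpected edge" error line.
type edgeRef struct {
	ID   int    // edge number as printed in the pipeline dump
	Kind string // edge kind, e.g. "External" or "ParDo"
}

// unexpectedEdgeRE matches text such as
// "unexpected edge: 6: External [In(Main): ...".
// The format is assumed from the log above, not from a documented contract.
var unexpectedEdgeRE = regexp.MustCompile(`unexpected edge: (\d+): (\w+)`)

// parseUnexpectedEdge extracts the edge ID and kind from a log line.
// The boolean is false when the line does not contain the pattern.
func parseUnexpectedEdge(line string) (edgeRef, bool) {
	m := unexpectedEdgeRE.FindStringSubmatch(line)
	if m == nil {
		return edgeRef{}, false
	}
	id, err := strconv.Atoi(m[1])
	if err != nil {
		return edgeRef{}, false
	}
	return edgeRef{ID: id, Kind: m[2]}, true
}

func main() {
	line := `unexpected edge: 6: External [In(Main): string <- {5: string/string GLO}] -> [Out: string -> {6: string/string GLO}]`
	if e, ok := parseUnexpectedEdge(line); ok {
		fmt.Printf("runner rejected edge %d of kind %s\n", e.ID, e.Kind)
	}
}
```

If the rejected edge kind is `External`, one thing worth checking (an assumption, not a confirmed diagnosis) is whether the runner selected for the Go pipeline handles external transforms at all, since the log shows the failure happening during translation in the direct runner.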

