You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2021/10/01 00:08:39 UTC
[beam] branch master updated: [BEAM-9918] Make TryCrossLanguage
match non Try API (#15633)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new dc4a6a9 [BEAM-9918] Make TryCrossLanguage match non Try API (#15633)
dc4a6a9 is described below
commit dc4a6a92c0ca31268352fbe12d8399c69dc1caf9
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Thu Sep 30 17:07:33 2021 -0700
[BEAM-9918] Make TryCrossLanguage match non Try API (#15633)
---
CHANGES.md | 1 +
sdks/go/pkg/beam/xlang.go | 42 +++++++++++++++++++++++++++---------------
2 files changed, 28 insertions(+), 15 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 1c132df..c282f07 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -72,6 +72,7 @@
* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* SQL Rows are no longer flattened ([BEAM-5505](https://issues.apache.org/jira/browse/BEAM-5505)).
+* [Go SDK] beam.TryCrossLanguage's signature now matches beam.CrossLanguage. Like other Try functions it returns an error instead of panicking. ([BEAM-9918](https://issues.apache.org/jira/browse/BEAM-9918)).
## Deprecations
diff --git a/sdks/go/pkg/beam/xlang.go b/sdks/go/pkg/beam/xlang.go
index b9c6ec7..2d36265 100644
--- a/sdks/go/pkg/beam/xlang.go
+++ b/sdks/go/pkg/beam/xlang.go
@@ -143,6 +143,23 @@ func CrossLanguage(
namedInputs map[string]PCollection,
namedOutputTypes map[string]FullType,
) map[string]PCollection {
+ namedOutputs, err := TryCrossLanguage(s, urn, payload, expansionAddr, namedInputs, namedOutputTypes)
+ if err != nil {
+ panic(errors.WithContextf(err, "tried cross-language for %v against %v and failed", urn, expansionAddr))
+ }
+ return namedOutputs
+}
+
+// TryCrossLanguage coordinates the core functions required to execute the cross-language transform.
+// See CrossLanguage for user documentation.
+func TryCrossLanguage(
+ s Scope,
+ urn string,
+ payload []byte,
+ expansionAddr string,
+ namedInputs map[string]PCollection,
+ namedOutputTypes map[string]FullType,
+) (map[string]PCollection, error) {
if !s.IsValid() {
panic(errors.New("invalid scope"))
}
@@ -150,41 +167,36 @@ func CrossLanguage(
inputsMap, inboundLinks := graph.NamedInboundLinks(mapPCollectionToNode(namedInputs))
outputsMap, outboundLinks := graph.NamedOutboundLinks(s.real, namedOutputTypes)
+ // Set the coder for outbound links for downstream validation.
+ for n, i := range outputsMap {
+ c := NewCoder(namedOutputTypes[n])
+ outboundLinks[i].To.Coder = c.coder
+ }
+
ext := graph.ExternalTransform{
Urn: urn,
Payload: payload,
ExpansionAddr: expansionAddr,
}.WithNamedInputs(inputsMap).WithNamedOutputs(outputsMap)
- namedOutputs, err := TryCrossLanguage(s, &ext, inboundLinks, outboundLinks)
- if err != nil {
- panic(errors.WithContextf(err, "tried cross-language and failed"))
- }
- return mapNodeToPCollection(namedOutputs)
-}
-
-// TryCrossLanguage coordinates the core functions required to execute the cross-language transform.
-// This is mainly intended for internal use. For the general-use entry point, see
-// beam.CrossLanguage.
-func TryCrossLanguage(s Scope, ext *graph.ExternalTransform, ins []*graph.Inbound, outs []*graph.Outbound) (map[string]*graph.Node, error) {
// Adding an edge in the graph corresponding to the ExternalTransform
- edge, isBoundedUpdater := graph.NewCrossLanguage(s.real, s.scope, ext, ins, outs)
+ edge, isBoundedUpdater := graph.NewCrossLanguage(s.real, s.scope, &ext, inboundLinks, outboundLinks)
// Once the appropriate input and output nodes are added to the edge, a
// unique namespace can be requested.
ext.Namespace = graph.NewNamespace()
// Expand the transform into ext.Expanded.
- if err := xlangx.Expand(edge, ext); err != nil {
+ if err := xlangx.Expand(edge, &ext); err != nil {
return nil, errors.WithContext(err, "expanding external transform")
}
// Ensures the expected named outputs are present
- graphx.VerifyNamedOutputs(ext)
+ graphx.VerifyNamedOutputs(&ext)
// Using the expanded outputs, the graph's counterpart outputs are updated with bounded values
graphx.ResolveOutputIsBounded(edge, isBoundedUpdater)
- return graphx.ExternalOutputs(edge), nil
+ return mapNodeToPCollection(graphx.ExternalOutputs(edge)), nil
}
// Wrapper functions to handle beam <-> graph boundaries