You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2021/10/01 00:08:39 UTC

[beam] branch master updated: [BEAM-9918] Make TryCrossLanguage match non Try API (#15633)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dc4a6a9  [BEAM-9918] Make TryCrossLanguage match non Try API (#15633)
dc4a6a9 is described below

commit dc4a6a92c0ca31268352fbe12d8399c69dc1caf9
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Thu Sep 30 17:07:33 2021 -0700

    [BEAM-9918] Make TryCrossLanguage match non Try API (#15633)
---
 CHANGES.md                |  1 +
 sdks/go/pkg/beam/xlang.go | 42 +++++++++++++++++++++++++++---------------
 2 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1c132df..c282f07 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -72,6 +72,7 @@
 
 * X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * SQL Rows are no longer flattened ([BEAM-5505](https://issues.apache.org/jira/browse/BEAM-5505)).
+* [Go SDK] beam.TryCrossLanguage's signature now matches beam.CrossLanguage. Like other Try functions, it returns an error instead of panicking. ([BEAM-9918](https://issues.apache.org/jira/browse/BEAM-9918)).
 
 ## Deprecations
 
diff --git a/sdks/go/pkg/beam/xlang.go b/sdks/go/pkg/beam/xlang.go
index b9c6ec7..2d36265 100644
--- a/sdks/go/pkg/beam/xlang.go
+++ b/sdks/go/pkg/beam/xlang.go
@@ -143,6 +143,23 @@ func CrossLanguage(
 	namedInputs map[string]PCollection,
 	namedOutputTypes map[string]FullType,
 ) map[string]PCollection {
+	namedOutputs, err := TryCrossLanguage(s, urn, payload, expansionAddr, namedInputs, namedOutputTypes)
+	if err != nil {
+		panic(errors.WithContextf(err, "tried cross-language for %v against %v and failed", urn, expansionAddr))
+	}
+	return namedOutputs
+}
+
+// TryCrossLanguage coordinates the core functions required to execute the cross-language transform.
+// See CrossLanguage for user documentation.
+func TryCrossLanguage(
+	s Scope,
+	urn string,
+	payload []byte,
+	expansionAddr string,
+	namedInputs map[string]PCollection,
+	namedOutputTypes map[string]FullType,
+) (map[string]PCollection, error) {
 	if !s.IsValid() {
 		panic(errors.New("invalid scope"))
 	}
@@ -150,41 +167,36 @@ func CrossLanguage(
 	inputsMap, inboundLinks := graph.NamedInboundLinks(mapPCollectionToNode(namedInputs))
 	outputsMap, outboundLinks := graph.NamedOutboundLinks(s.real, namedOutputTypes)
 
+	// Set the coder for outbound links for downstream validation.
+	for n, i := range outputsMap {
+		c := NewCoder(namedOutputTypes[n])
+		outboundLinks[i].To.Coder = c.coder
+	}
+
 	ext := graph.ExternalTransform{
 		Urn:           urn,
 		Payload:       payload,
 		ExpansionAddr: expansionAddr,
 	}.WithNamedInputs(inputsMap).WithNamedOutputs(outputsMap)
 
-	namedOutputs, err := TryCrossLanguage(s, &ext, inboundLinks, outboundLinks)
-	if err != nil {
-		panic(errors.WithContextf(err, "tried cross-language and failed"))
-	}
-	return mapNodeToPCollection(namedOutputs)
-}
-
-// TryCrossLanguage coordinates the core functions required to execute the cross-language transform.
-// This is mainly intended for internal use. For the general-use entry point, see
-// beam.CrossLanguage.
-func TryCrossLanguage(s Scope, ext *graph.ExternalTransform, ins []*graph.Inbound, outs []*graph.Outbound) (map[string]*graph.Node, error) {
 	// Adding an edge in the graph corresponding to the ExternalTransform
-	edge, isBoundedUpdater := graph.NewCrossLanguage(s.real, s.scope, ext, ins, outs)
+	edge, isBoundedUpdater := graph.NewCrossLanguage(s.real, s.scope, &ext, inboundLinks, outboundLinks)
 
 	// Once the appropriate input and output nodes are added to the edge, a
 	// unique namespace can be requested.
 	ext.Namespace = graph.NewNamespace()
 
 	// Expand the transform into ext.Expanded.
-	if err := xlangx.Expand(edge, ext); err != nil {
+	if err := xlangx.Expand(edge, &ext); err != nil {
 		return nil, errors.WithContext(err, "expanding external transform")
 	}
 
 	// Ensures the expected named outputs are present
-	graphx.VerifyNamedOutputs(ext)
+	graphx.VerifyNamedOutputs(&ext)
 	// Using the expanded outputs, the graph's counterpart outputs are updated with bounded values
 	graphx.ResolveOutputIsBounded(edge, isBoundedUpdater)
 
-	return graphx.ExternalOutputs(edge), nil
+	return mapNodeToPCollection(graphx.ExternalOutputs(edge)), nil
 }
 
 // Wrapper functions to handle beam <-> graph boundaries