You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/19 22:44:55 UTC

[GitHub] [beam] lostluck commented on a change in pull request #12632: [BEAM-9919] Refining xlang types and wrappers

lostluck commented on a change in pull request #12632:
URL: https://github.com/apache/beam/pull/12632#discussion_r473205897



##########
File path: sdks/go/pkg/beam/core/graph/edge.go
##########
@@ -283,31 +283,51 @@ func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) {
 }
 
 // NewCrossLanguage inserts a Cross-langugae External transform.
-func NewCrossLanguage(g *Graph, s *Scope, ext *ExternalTransform) *MultiEdge {
+func NewCrossLanguage(g *Graph, s *Scope, ext *ExternalTransform, ins []*Inbound, outs []*Outbound) (*MultiEdge, func(*Node, bool)) {
 	edge := g.NewEdge(s)
 	edge.Op = External
 	edge.External = ext
 
-	for _, n := range ext.Inputs() {
-		edge.Input = append(edge.Input, &Inbound{Kind: Main, From: n, Type: n.Type()})
+	windowingStrategy := inputWindow([]*Node{ins[0].From})
+	for _, o := range outs {
+		o.To.w = windowingStrategy
 	}
-	return edge
+
+	isBoundedUpdater := func(n *Node, bounded bool) {
+		n.bounded = bounded
+	}
+
+	edge.Input = ins
+	edge.Output = outs
+
+	return edge, isBoundedUpdater
+}
+
+func NewNamedInboundLinks(ins map[string]*Node) (map[string]int, []*Inbound) {
+	inputsMap := make(map[string]int)
+	var inboundLinks []*Inbound
+
+	for tag, node := range ins {
+		id := len(inboundLinks)
+		inputsMap[tag] = id
+		inboundLinks = append(inboundLinks, &Inbound{Kind: Main, From: node, Type: node.Type()})
+	}
+
+	return inputsMap, inboundLinks
 }
 
-// AddOutboundLinks adds Outbound links to existing MultiEdge
-func AddOutboundLinks(g *Graph, e *MultiEdge) {

Review comment:
       Just an educational note: as a rule, it's very bad form to make breaking changes (like deleting them entirely or changing their API surface) to exported functions, because doing so breaks the builds of users who call them and forces those users to change their code. Breaking changes shouldn't be made lightly. Similarly, adding new user-visible surfaces (like methods and types) shouldn't be done lightly either, as they represent a new maintenance burden.
   
   In this case it's OK since this function was only added to support XLang super recently and under the same experimental umbrella of use at own risk. Just letting you know.

##########
File path: sdks/go/pkg/beam/core/graph/edge.go
##########
@@ -283,31 +283,51 @@ func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) {
 }
 
 // NewCrossLanguage inserts a Cross-langugae External transform.
-func NewCrossLanguage(g *Graph, s *Scope, ext *ExternalTransform) *MultiEdge {

Review comment:
       Note that none of the other graph functions require the Inbounds and Outbounds to be passed in. They're usually constructed inside these methods and attached to the node that way.

##########
File path: sdks/go/pkg/beam/core/graph/edge.go
##########
@@ -283,31 +283,51 @@ func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) {
 }
 
 // NewCrossLanguage inserts a Cross-langugae External transform.
-func NewCrossLanguage(g *Graph, s *Scope, ext *ExternalTransform) *MultiEdge {
+func NewCrossLanguage(g *Graph, s *Scope, ext *ExternalTransform, ins []*Inbound, outs []*Outbound) (*MultiEdge, func(*Node, bool)) {
 	edge := g.NewEdge(s)
 	edge.Op = External
 	edge.External = ext
 
-	for _, n := range ext.Inputs() {
-		edge.Input = append(edge.Input, &Inbound{Kind: Main, From: n, Type: n.Type()})
+	windowingStrategy := inputWindow([]*Node{ins[0].From})
+	for _, o := range outs {
+		o.To.w = windowingStrategy
 	}
-	return edge
+
+	isBoundedUpdater := func(n *Node, bounded bool) {
+		n.bounded = bounded
+	}
+
+	edge.Input = ins
+	edge.Output = outs
+
+	return edge, isBoundedUpdater
+}
+
+func NewNamedInboundLinks(ins map[string]*Node) (map[string]int, []*Inbound) {
+	inputsMap := make(map[string]int)
+	var inboundLinks []*Inbound
+
+	for tag, node := range ins {
+		id := len(inboundLinks)
+		inputsMap[tag] = id
+		inboundLinks = append(inboundLinks, &Inbound{Kind: Main, From: node, Type: node.Type()})
+	}
+
+	return inputsMap, inboundLinks
 }
 
-// AddOutboundLinks adds Outbound links to existing MultiEdge
-func AddOutboundLinks(g *Graph, e *MultiEdge) {
-	windowingStrategy := inputWindow([]*Node{e.Input[0].From})
-	outputTypes := e.External.OutputTypes()
-	boundedOutputs := e.External.Expanded().BoundedOutputs()
-	outputs := make(map[string]*Node)
+func NewNamedOutboundLinks(g *Graph, outs map[string]typex.FullType) (map[string]int, []*Outbound) {
+	outputsMap := make(map[string]int)
+	var outboundLinks []*Outbound
 
-	for tag, fullType := range outputTypes {
-		n := g.NewNode(fullType, windowingStrategy, boundedOutputs[tag])
-		outputs[tag] = n
-		e.Output = append(e.Output, &Outbound{To: n, Type: fullType})
+	for tag, fullType := range outs {

Review comment:
       Map iteration order isn't deterministic, and can change from call to call.
   
   Here and in your other helper, I recommend extracting the keys to a []string first and then calling sort.Strings(keys) on it before iterating. This makes the iteration order deterministic, which helps with testing and debugging since the results will always be the same.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go
##########
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package graphx
+
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+func ExpandedComponents(exp *graph.ExpandedTransform) *pipepb.Components {

Review comment:
       I'd quibble over the necessity to type assert and then panic, but I do like the additional clarity that the panic message will provide. No change necessary, just a note.

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/translate.go
##########
@@ -89,27 +89,28 @@ func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
 	}
 }
 
+// TODO(pskevin): handle sourceInput and sinkOutput
 func VerifyNamedOutputs(ext *graph.ExternalTransform) {

Review comment:
       Just a note that VerifyNamedOutputs and ResolveOutputIsBounded are excellent candidates for unit tests.

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/translate.go
##########
@@ -32,32 +32,32 @@ func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
 	for _, e := range edges {
 		if e.Op == graph.External {
 			id := fmt.Sprintf("e%v", e.ID())
-			exp := e.External.Expanded()
+			exp := e.External.Expanded
 
-			p.Requirements = append(p.Requirements, exp.Requirements()...)
+			p.Requirements = append(p.Requirements, exp.Requirements...)
 
 			// Adding components of the Expanded Transforms to the current Pipeline
-			for k, v := range exp.Components().GetTransforms() {
+			for k, v := range graphx.ExpandedComponents(exp).GetTransforms() {

Review comment:
       Instead of repeatedly calling the helper function, consider extracting the components once to a `var comps *pipepb.Components` and getting the other fields from it. It's totally OK for this package to import those protos.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org