Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/22 00:17:25 UTC

[GitHub] [beam] lostluck commented on a change in pull request #12667: [BEAM-9918] Adding tests and documentation to xlang components

lostluck commented on a change in pull request #12667:
URL: https://github.com/apache/beam/pull/12667#discussion_r475012244



##########
File path: sdks/go/examples/xlang/group_by/group_by.go
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// group_by exemplifies using a cross-language group by key transform from a test expansion service.
+//
+// Prerequisites to run group_by:
+// –> [Required] Job needs to be submitted to a portable runner (--runner=universal)
+// –> [Required] Endpoint of job service needs to be passed (--endpoint=<ip:port>)
+// –> [Required] Endpoint of expansion service needs to be passed (--expansion_addr=<ip:port>)
+// –> [Optional] Environment type can be LOOPBACK. Defaults to DOCKER. (--environment_type=LOOPBACK|DOCKER)
+package main
+
+import (
+	"fmt"
+	"reflect"
+	"sort"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+
+	"context"
+	"flag"
+	"log"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"

Review comment:
       Please consolidate these imports into two groups, standard library followed by the github imports.
   
   Separating out and commenting on the _ imports, however, is good practice.
   
   This comment applies to all of these example files.
   

##########
File path: sdks/go/examples/xlang/multi_input_output/multi.go
##########
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// multi exemplifies using a cross-language transform with multiple inputs and
+// outputs from a test expansion service.
+//
+// Prerequisites to run multi:
+// –> [Required] Job needs to be submitted to a portable runner (--runner=universal)
+// –> [Required] Endpoint of job service needs to be passed (--endpoint=<ip:port>)
+// –> [Required] Endpoint of expansion service needs to be passed (--expansion_addr=<ip:port>)
+// –> [Optional] Environment type can be LOOPBACK. Defaults to DOCKER. (--environment_type=LOOPBACK|DOCKER)
+package main
+
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+
+	"context"
+	"flag"
+	"log"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	expansionAddr = flag.String("expansion_addr", "", "Address of Expansion Service")
+)
+
+func init() {

Review comment:
       There's no need for an empty init block, please remove.

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/namespace_test.go
##########
@@ -0,0 +1,584 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xlangx
+
+import (
+	"strings"
+	"testing"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/protobuf/testing/protocmp"
+)
+
+func assertPanic(t *testing.T, f func(), err string) {

Review comment:
       Please convert this to the expect panic format used in the other tests.
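The format used in the other tests isn't shown in this thread. Assuming it is the usual defer/recover pattern, a table-driven version might look like this sketch; `panicMessage` is a hypothetical helper, and the checks run in `main` rather than under a `*testing.T` so the block stays self-contained.

```go
package main

import (
	"fmt"
	"strings"
)

// panicMessage runs f and reports whether it panicked, along with the
// recovered value formatted as a string. (Hypothetical helper.)
func panicMessage(f func()) (msg string, panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			msg, panicked = fmt.Sprint(r), true
		}
	}()
	f()
	return "", false
}

func main() {
	tests := []struct {
		name string
		f    func()
		want string // substring expected in the panic message; "" means no panic
	}{
		{name: "ok", f: func() {}, want: ""},
		{name: "panics", f: func() { panic("invalid namespace") }, want: "invalid namespace"},
	}
	for _, test := range tests {
		msg, panicked := panicMessage(test.f)
		wantPanic := test.want != ""
		if panicked != wantPanic || !strings.Contains(msg, test.want) {
			fmt.Printf("%s: got panic=%v %q, want panic=%v containing %q\n",
				test.name, panicked, msg, wantPanic, test.want)
			continue
		}
		fmt.Printf("%s: ok\n", test.name)
	}
}
```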

##########
File path: sdks/go/pkg/beam/xlang.go
##########
@@ -21,91 +21,154 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/xlangx"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 )
 
-// This is an experimetnal API and subject to change
-func CrossLanguage(s Scope, urn string, payload []byte, expansionAddr string, inputs map[string]PCollection, outputTypes map[string]FullType) map[string]PCollection {
+// xlang exposes an API to execute cross-language transforms within the Go SDK.
+// It is experimental and likely to change. It exposes convenient wrappers
+// around the core functions to pass in any combination of named/unnamed
+// inputs/outputs.
+
+// CrossLanguage executes a cross-language transform that uses named inputs and
+// returns named outputs.
+func CrossLanguage(
+	s Scope,
+	urn string,
+	payload []byte,
+	expansionAddr string,
+	namedInputs map[string]PCollection,
+	namedOutputTypes map[string]FullType,
+) map[string]PCollection {
 	if !s.IsValid() {
 		panic(errors.New("invalid scope"))
 	}
 
-	namedInputNodes := mapPCollectionToNode(inputs)
-
-	inputsMap, inboundLinks := graph.NewNamedInboundLinks(namedInputNodes)
-	outputsMap, outboundLinks := graph.NewNamedOutboundLinks(s.real, outputTypes)
+	inputsMap, inboundLinks := graph.NewNamedInboundLinks(mapPCollectionToNode(namedInputs))
+	outputsMap, outboundLinks := graph.NewNamedOutboundLinks(s.real, namedOutputTypes)
 
 	ext := graph.ExternalTransform{
 		Urn:           urn,
 		Payload:       payload,
 		ExpansionAddr: expansionAddr,
 	}.WithNamedInputs(inputsMap).WithNamedOutputs(outputsMap)
 
-	outputNodes, err := TryCrossLanguage(s, &ext, inboundLinks, outboundLinks)
+	namedOutputs, err := TryCrossLanguage(s, &ext, inboundLinks, outboundLinks)
 	if err != nil {
 		panic(errors.WithContextf(err, "tried cross-language and failed"))
 	}
-	return mapNodeToPCollection(outputNodes)
+	return mapNodeToPCollection(namedOutputs)
 }
 
-/*
-func CrossLanguageWithSink(s Scope, urn string, payload []byte, expansionAddr string, inputs map[string]PCollection, outputType FullType) PCollection {
-	inputNodes := mapPCollectionToNode(inputs)
+// CrossLanguageWithSingleInputOutput executes a cross-language transform that
+// uses a single unnamed input and returns a single unnamed output.
+func CrossLanguageWithSingleInputOutput(
+	s Scope,
+	urn string,
+	payload []byte,
+	expansionAddr string,
+	input PCollection,
+	outputType FullType,
+) PCollection {
+	if !s.IsValid() {
+		panic(errors.New("invalid scope"))
+	}
+
+	// Adding dummy SourceInputTag to process it as a named input
+	namedInput := mapPCollectionToNode(map[string]PCollection{graph.SourceInputTag: input})
+	// Adding dummy SinkOutputTag to process it as a named output
+	namedOutputType := map[string]typex.FullType{graph.SinkOutputTag: outputType}
 
-	inputsMap, inboundLinks := graph.NewNamedInboundLinks(inputNodes)
-	outputsMap, outboundLinks := graph.NewNamedOutboundLinks(s.real, map[string]typex.FullType{graph.SinkOutputTag: outputType})
+	inputsMap, inboundLinks := graph.NewNamedInboundLinks(namedInput)
+	outputsMap, outboundLinks := graph.NewNamedOutboundLinks(s.real, namedOutputType)
 
 	ext := graph.ExternalTransform{
 		Urn:           urn,
 		Payload:       payload,
 		ExpansionAddr: expansionAddr,
-	}.WithNamedInputs(inputNodes).WithNamedOutputs(outputTypes)
+	}.WithNamedInputs(inputsMap).WithNamedOutputs(outputsMap)
 
-	outputNodes, err := TryCrossLanguage(s, &ext, inboundLinks, outboundLinks)
+	namedOutput, err := TryCrossLanguage(s, &ext, inboundLinks, outboundLinks)
 	if err != nil {
 		panic(errors.WithContextf(err, "tried cross-language and failed"))
 	}
-	namedOutputNode := mapNodeToPCollection(outputNodes)
+	return nodeToPCollection(namedOutput[graph.SinkOutputTag])
+}
 
-	outputNode, exists := namedOutputNode[graph.SinkOutputTag]
-	if !exists {
-		panic("a")
+// CrossLanguageWithSink executes a cross-language transform that uses named
+// inputs and returns a single unnamed output.

Review comment:
       The naming here is a bit confusing. A sink would be expected to have 1 input and 0 outputs, but this function takes many named inputs and returns 1 output. Similarly, a source would be expected to have 0 inputs and 1 output, rather than 1 input with many outputs.
   
   It's certainly out of scope given the time remaining and the number of places that would need to change, but we will probably consider replacing these with helper functions that generate the input or output maps with the magic tags, so that users always call the top-level CrossLanguage function instead.
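A rough sketch of that direction, using stand-in types so it compiles without the SDK. `PCollection`, `FullType`, the two tag constants, `unnamedInput`, `unnamedOutput`, and `crossLanguage` are all hypothetical placeholders for `beam.PCollection`, `beam.FullType`, `graph.SourceInputTag`/`graph.SinkOutputTag`, and the top-level `CrossLanguage` call; only the calling shape is the point.

```go
package main

import "fmt"

// Stand-ins for beam.PCollection and beam.FullType; NOT the real types.
type PCollection struct{ Name string }
type FullType string

// Placeholder tag values; the real graph.SourceInputTag and
// graph.SinkOutputTag values are not shown in this review.
const (
	sourceInputTag = "<source-input-tag>"
	sinkOutputTag  = "<sink-output-tag>"
)

// unnamedInput wraps a single input under the magic input tag. (Hypothetical.)
func unnamedInput(in PCollection) map[string]PCollection {
	return map[string]PCollection{sourceInputTag: in}
}

// unnamedOutput wraps a single output type under the magic output tag. (Hypothetical.)
func unnamedOutput(t FullType) map[string]FullType {
	return map[string]FullType{sinkOutputTag: t}
}

// crossLanguage is a stub for the single top-level entry point. It performs
// no expansion; it returns one placeholder output per requested output tag.
func crossLanguage(inputs map[string]PCollection, outputTypes map[string]FullType) map[string]PCollection {
	outs := make(map[string]PCollection, len(outputTypes))
	for tag, t := range outputTypes {
		outs[tag] = PCollection{Name: fmt.Sprintf("out(%s)", t)}
	}
	return outs
}

func main() {
	in := PCollection{Name: "words"}
	// A single unnamed input and output, expressed through the top-level call.
	outs := crossLanguage(unnamedInput(in), unnamedOutput("KV<string,int64>"))
	fmt.Println(outs[sinkOutputTag].Name)
}
```

With this shape, the sink/source/single variants collapse into one entry point, and the magic tags stay an internal detail of the helpers.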

##########
File path: sdks/go/examples/xlang/group_by/group_by.go
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// group_by exemplifies using a cross-language group by key transform from a test expansion service.
+//
+// Prerequisites to run group_by:
+// –> [Required] Job needs to be submitted to a portable runner (--runner=universal)
+// –> [Required] Endpoint of job service needs to be passed (--endpoint=<ip:port>)
+// –> [Required] Endpoint of expansion service needs to be passed (--expansion_addr=<ip:port>)
+// –> [Optional] Environment type can be LOOPBACK. Defaults to DOCKER. (--environment_type=LOOPBACK|DOCKER)
+package main
+
+import (
+	"fmt"
+	"reflect"
+	"sort"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+
+	"context"
+	"flag"
+	"log"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	expansionAddr = flag.String("expansion_addr", "", "Address of Expansion Service")
+)
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c []int) string {
+	sort.Ints(c)
+	return fmt.Sprintf("%v:%v", w, c)
+}
+
+// KV used to represent KV PCollection values
+type KV struct {
+	X string
+	Y int64
+}
+
+func getKV(kv KV, emit func(string, int64)) {
+	emit(kv.X, kv.Y)
+}
+
+func collectValues(key string, iter func(*int64) bool) (string, []int) {
+	var count int64
+	var values []int
+	for iter(&count) {
+		values = append(values, int(count))
+	}
+	return key, values
+}
+
+func init() {
+	beam.RegisterType(reflect.TypeOf((*KV)(nil)).Elem())
+	beam.RegisterFunction(formatFn)
+	beam.RegisterFunction(getKV)
+	beam.RegisterFunction(collectValues)
+

Review comment:
       There's no need to have an extra blank line at the end of init blocks.

##########
File path: sdks/go/pkg/beam/core/graph/xlang.go
##########
@@ -71,8 +93,10 @@ func (ext ExternalTransform) WithNamedOutputs(outputsMap map[string]int) Externa
 	return ext
 }
 
-// TODO(pskevin): Credit one of the best stackoverflow answers @ https://stackoverflow.com/questions/22892120/how-to-generate-a-random-string-of-a-fixed-length-in-go
-
+// NewNamespaceGenerator returns a function that generates a random string of n letters.
+//
+// Adapted from icza's stackoverflow answer @
+// https://stackoverflow.com/questions/22892120/how-to-generate-a-random-string-of-a-fixed-length-in-go

Review comment:
       I'd consider not including the credit in the function's public documentation. It doesn't help a caller use the function; it describes an implementation detail.
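To illustrate the suggestion, here is a sketch where the doc comment describes only behavior and implementation notes sit in an ordinary comment inside the body. `namespaceGenerator` is a hypothetical stand-in, not the SDK's `NewNamespaceGenerator`, and it uses a plain `math/rand` loop rather than the technique from the linked answer.

```go
package main

import (
	"fmt"
	"math/rand"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

// namespaceGenerator returns a function that generates random strings of n
// ASCII letters.
func namespaceGenerator(n int) func() string {
	// Implementation notes, including credit for any borrowed technique,
	// belong here: readers of the source see them, godoc does not.
	return func() string {
		b := make([]byte, n)
		for i := range b {
			b[i] = letters[rand.Intn(len(letters))]
		}
		return string(b)
	}
}

func main() {
	gen := namespaceGenerator(10)
	fmt.Println(gen(), gen())
}
```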

##########
File path: sdks/go/pkg/beam/core/graph/edge.go
##########
@@ -303,11 +304,22 @@ func NewCrossLanguage(g *Graph, s *Scope, ext *ExternalTransform, ins []*Inbound
 	return edge, isBoundedUpdater
 }
 
+// NewNamedInboundLinks returns an array of new Inbound links and a map (tag ->

Review comment:
       Just a style comment: in Go, a New prefix implies that the function creates and initializes a single new object and returns a pointer to it. That isn't the case here, so the prefix doesn't add clarity. Although this function creates new Inbound instances and a map, it does much more than create a single value, so I would drop the prefix.
   
   The same applies to NewNamedOutboundLinks below.
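A small illustration of the convention, with simplified stand-ins for `graph.Node` and `graph.Inbound` (not the real types). `NewInbound` builds one value and keeps the prefix, while `namedInboundLinks`, a hypothetical rename, builds a slice of links plus a tag-to-index map and drops it. Sorting the tags so the indices are deterministic is a choice made for this sketch, not something the review specifies.

```go
package main

import (
	"fmt"
	"sort"
)

// Node and Inbound are simplified stand-ins for graph.Node and graph.Inbound.
type Node struct{ ID int }
type Inbound struct{ From *Node }

// NewInbound follows the New convention: it creates and initializes a single value.
func NewInbound(n *Node) *Inbound { return &Inbound{From: n} }

// namedInboundLinks builds one Inbound per named node and a map from each tag
// to that link's index in the returned slice. It does more than construct a
// single value, so it carries no New prefix.
func namedInboundLinks(ins map[string]*Node) (map[string]int, []*Inbound) {
	tags := make([]string, 0, len(ins))
	for tag := range ins {
		tags = append(tags, tag)
	}
	sort.Strings(tags) // deterministic indices despite random map order

	index := make(map[string]int, len(tags))
	links := make([]*Inbound, 0, len(tags))
	for i, tag := range tags {
		index[tag] = i
		links = append(links, NewInbound(ins[tag]))
	}
	return index, links
}

func main() {
	index, links := namedInboundLinks(map[string]*Node{"b": {ID: 2}, "a": {ID: 1}})
	fmt.Println(index, links[index["b"]].From.ID)
}
```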




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org