Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/01 03:09:05 UTC

[GitHub] [beam] pskevin opened a new pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

pskevin opened a new pull request #12445:
URL: https://github.com/apache/beam/pull/12445


   This PR makes it possible to execute an External (a.k.a. cross-language) transform within pipelines authored using the Go SDK.
   It also includes a word count example run in LOOPBACK mode against the test Python expansion service and executed by Python's portable runner.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465479332



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph

Review comment:
       Thanks for clarifying in detail. Reflected the changes.







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r471794239



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph
+	*/
+	// Validating scope and inputs
+	if !s.IsValid() {
+		// return nil, errors.New("invalid scope")
+		fmt.Println("invalid scope")
+	}
+	for i, col := range e.In {
+		if !col.IsValid() {
+			// return nil, errors.Errorf("invalid pcollection to external: index %v", i)
+			fmt.Printf("\ninvalid pcollection to external: index %v", i)
+
+		}
+	}
+
+	// Using exisiting MultiEdge format to represent ExternalTransform (already backwards compatible)
+	payload := &graph.Payload{
+		URN:  e.Urn,
+		Data: e.Payload,
+	}
+	var ins []*graph.Node
+	for _, col := range e.In {
+		ins = append(ins, col.n)
+	}
+	edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+	// TODO(pskevin): There needs to be a better way of associating this ExternalTransform to the pipeline
+	// Adding ExternalTransform to pipeline referenced by MultiEdge ID
+	if p.ExpandedTransforms == nil {
+		p.ExpandedTransforms = make(map[string]*ExternalTransform)
+	}
+	p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e
+
+	/*
+		Build the ExpansionRequest
+	*/
+	// Obtaining the components and transform proto representing this transform
+	pipeline, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{})
+	if err != nil {
+		panic(err)
+	}
+
+	// Adding fake impulses to each input as required for correct expansion
+	// TODO(pskevin): Remove these fake impulses from final Pipeline since multiple producers of the same PCollections is logically wrong
+	transforms := pipeline.Components.Transforms
+	rootTransformID := pipeline.RootTransformIds[0]
+	for tag, id := range transforms[rootTransformID].Inputs {
+		key := fmt.Sprintf("%s_%s", "impulse", tag)
+
+		output := map[string]string{"out": id}
+		impulse := &pipepb.PTransform{
+			UniqueName: key,
+			Spec: &pipepb.FunctionSpec{
+				Urn: graphx.URNImpulse,
+			},
+			Outputs: output,
+		}
+
+		transforms[key] = impulse
+	}
+
+	// Assembling ExpansionRequest proto
+	req := &jobpb.ExpansionRequest{
+		Components: pipeline.Components,
+		Transform:  transforms[rootTransformID],
+		Namespace:  s.String(),
+	}
+
+	/*
+		Querying Expansion Service
+	*/
+	// Setting grpc client
+	conn, err := grpc.Dial(e.ExpansionAddr, grpc.WithInsecure())
+	if err != nil {
+		panic(err)
+	}
+	defer conn.Close()
+	client := jobpb.NewExpansionServiceClient(conn)
+
+	// Handling ExpansionResponse
+	res, err := client.Expand(context.Background(), req)
+	if err != nil {
+		panic(err)
+	}
+	e.Components = res.GetComponents()

Review comment:
       That's a great suggestion. I've updated it to be in line with it.







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r471792581



##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// xlang_wordcount exemplifies using a cross language transform from Python to count words
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"regexp"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	// Set this option to choose a different input file or glob.
+	input = flag.String("input", "./input", "File(s) to read.")
+
+	// Set this required option to specify where to write the output.
+	output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+	empty   = beam.NewCounter("extract", "emptyLines")
+	lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+	lineLen.Update(ctx, int64(len(line)))
+	if len(strings.TrimSpace(line)) == 0 {
+		empty.Inc(ctx, 1)
+	}
+	for _, word := range wordRE.FindAllString(line, -1) {
+		emit(word)
+	}
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+	return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+	beam.RegisterFunction(extractFn)
+	beam.RegisterFunction(formatFn)
+}
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	if *output == "" {
+		log.Fatal("No output provided")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	lines := textio.Read(s, *input)
+	col := beam.ParDo(s, extractFn, lines)
+
+	// Using Cross-language Count from Python's test expansion service
+	// TODO(pskevin): Cleaner using-face API
+	outputType := typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64))
+	external := &beam.ExternalTransform{
+		In:            []beam.PCollection{col},
+		Urn:           "beam:transforms:xlang:count",
+		ExpansionAddr: "localhost:8118",

Review comment:
       Thanks for that suggestion @youngoli! I've updated it.







[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r466552912



##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// xlang_wordcount exemplifies using a cross language transform from Python to count words
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"regexp"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	// Set this option to choose a different input file or glob.
+	input = flag.String("input", "./input", "File(s) to read.")
+
+	// Set this required option to specify where to write the output.
+	output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+	empty   = beam.NewCounter("extract", "emptyLines")
+	lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+	lineLen.Update(ctx, int64(len(line)))
+	if len(strings.TrimSpace(line)) == 0 {
+		empty.Inc(ctx, 1)
+	}
+	for _, word := range wordRE.FindAllString(line, -1) {
+		emit(word)
+	}
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+	return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+	beam.RegisterFunction(extractFn)
+	beam.RegisterFunction(formatFn)
+}
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	if *output == "" {
+		log.Fatal("No output provided")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	lines := textio.Read(s, *input)
+	col := beam.ParDo(s, extractFn, lines)
+
+	// Using Cross-language Count from Python's test expansion service
+	// TODO(pskevin): Cleaner using-face API
+	outputType := typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64))
+	external := &beam.ExternalTransform{
+		In:            []beam.PCollection{col},
+		Urn:           "beam:transforms:xlang:count",
+		ExpansionAddr: "localhost:8118",

Review comment:
       Alas, it's got to be a parameter. Users may request transforms from multiple independent expansion services (e.g. a Java one, a Python one, a Dataflow one...), so the address cannot be a single pipeline-wide setting.
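
A minimal sketch of why the address works best as a per-call parameter, assuming a hypothetical `expansionTarget` type and `newExpansionTarget` helper that are not part of the Beam Go SDK. The second address and URN are made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// expansionTarget is a hypothetical stand-in that pairs a transform URN with
// the address of the expansion service that should expand it.
type expansionTarget struct {
	Addr string
	URN  string
}

// newExpansionTarget takes the address on every call, so two transforms in
// the same pipeline can be expanded by different services.
func newExpansionTarget(addr, urn string) (expansionTarget, error) {
	if addr == "" {
		return expansionTarget{}, errors.New("expansion address is required")
	}
	if urn == "" {
		return expansionTarget{}, errors.New("urn is required")
	}
	return expansionTarget{Addr: addr, URN: urn}, nil
}

func main() {
	// One call per external transform, each with its own service address.
	calls := []struct{ addr, urn string }{
		{"localhost:8118", "beam:transforms:xlang:count"},
		{"localhost:8097", "example:hypothetical:java_transform"},
	}
	for _, c := range calls {
		target, err := newExpansionTarget(c.addr, c.urn)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("expand %s via %s\n", target.URN, target.Addr)
	}
}
```

Rejecting an empty address up front also avoids relying on a zero-value check to detect whether the field was ever set.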







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465361018



##########
File path: sdks/go/examples/xlang/wordcount/input
##########
@@ -0,0 +1,5 @@
+Lorem ipsum dolor sit amet, consectetur adipiscing elit. Mauris id tellus vehicula, rutrum turpis quis, suscipit est. Quisque vehicula nec ex a interdum. Phasellus vulputate nunc sit amet nisl dapibus tincidunt ut ullamcorper nisi. Mauris gravida porta leo vel congue. Duis sit amet arcu eu nisl pharetra interdum a eget enim. Nulla facilisis massa ut egestas interdum. Nunc elit dui, hendrerit at pharetra a, pellentesque non turpis. Integer auctor vulputate congue. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Ut sagittis convallis lorem non semper. Ut ultrices elit a enim pulvinar fermentum.

Review comment:
       Makes sense! Ack.







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465361243



##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main

Review comment:
       Ack.







[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465879720



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,144 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {

Review comment:
       Per the other thread, please move these to parameters on CrossLanguage and TryCrossLanguage instead. Do not try to force in compatibility with the legacy External; it's OK for them to have two separate calls and paths.
   With a struct, it's not clear which fields are required and which are not, and the compiler can't help the user by failing at compile time when one is missing.
   
   An aside: the other issue here is that you've mixed user-side parameters with internal implementation details and made them part of the API surface. APIs are easiest to use when the user knows how to fill in everything and what is required or not. The Components, ExpandedTransform, and Requirements fields, for example, are not something that users would be filling in. Types are cheap. Make a new type instead of trying to reuse something that almost fits.
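
A minimal sketch of the parameter-based shape requested above, assuming stand-in `PCollection` and `FullType` types so the example compiles on its own. The parameter list and the function bodies are hypothetical, not the signatures that were eventually merged.

```go
package main

import (
	"errors"
	"fmt"
)

// PCollection and FullType are hypothetical stand-ins for the Beam types.
type PCollection struct{ valid bool }
type FullType string

// TryCrossLanguage takes every required value as an explicit parameter, so
// leaving one out is a compile-time error instead of a silent zero value.
// Failures are returned to the caller rather than printed.
func TryCrossLanguage(expansionAddr, urn string, payload []byte, in []PCollection, out []FullType) ([]PCollection, error) {
	if expansionAddr == "" {
		return nil, errors.New("empty expansion address")
	}
	for i, col := range in {
		if !col.valid {
			return nil, fmt.Errorf("invalid pcollection to external: index %v", i)
		}
	}
	// A real implementation would query the expansion service here.
	outs := make([]PCollection, len(out))
	for i := range outs {
		outs[i] = PCollection{valid: true}
	}
	return outs, nil
}

// CrossLanguage is the Must-style wrapper: same parameters, panics on error.
func CrossLanguage(expansionAddr, urn string, payload []byte, in []PCollection, out []FullType) []PCollection {
	outs, err := TryCrossLanguage(expansionAddr, urn, payload, in, out)
	if err != nil {
		panic(err)
	}
	return outs
}

func main() {
	in := []PCollection{{valid: true}}
	out := []FullType{"KV<string,int64>"}
	outs := CrossLanguage("localhost:8118", "beam:transforms:xlang:count", nil, in, out)
	fmt.Println(len(outs))
}
```

Internal results such as the expanded components stay out of this signature entirely, which keeps the user-facing surface limited to values the user can actually supply.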

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set

Review comment:
       Note: the intent of my earlier comment was for you to remove this dead code, as it's unnecessary.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,144 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string

Review comment:
       Move these to a graph.CrossLanguage struct, but declare the proto-typed fields as interface{} instead, with a comment stating what the concrete types should be. Given that those fields are only used by Beam framework internals, there's little risk in using type assertions for them in the right places, such as the graphx package.
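
A minimal sketch of this suggestion, assuming stand-in `Components` and `PTransform` types in place of the pipepb proto messages. The `CrossLanguage` struct layout and the `expandedName` helper are hypothetical illustrations, not the code that was merged.

```go
package main

import "fmt"

// Components and PTransform are hypothetical stand-ins for the pipepb protos.
type Components struct{ Transforms map[string]string }
type PTransform struct{ UniqueName string }

// CrossLanguage holds framework-internal expansion results. The proto-typed
// fields are declared as interface{} so the declaring package needs no proto
// import; the comments record the concrete types callers must store.
type CrossLanguage struct {
	Components        interface{} // expected: *Components
	ExpandedTransform interface{} // expected: *PTransform
	Requirements      []string
}

// expandedName does a checked type assertion at the point of use, returning
// an error instead of panicking when the field holds the wrong type.
func expandedName(c *CrossLanguage) (string, error) {
	t, ok := c.ExpandedTransform.(*PTransform)
	if !ok || t == nil {
		return "", fmt.Errorf("ExpandedTransform has type %T, want non-nil *PTransform", c.ExpandedTransform)
	}
	return t.UniqueName, nil
}

func main() {
	c := &CrossLanguage{ExpandedTransform: &PTransform{UniqueName: "count"}}
	name, err := expandedName(c)
	fmt.Println(name, err)
}
```

The two-value form of the assertion is what keeps a mistyped field from crashing the framework at the point of use.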

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph
+	*/
+	// Validating scope and inputs
+	if !s.IsValid() {
+		// return nil, errors.New("invalid scope")
+		fmt.Println("invalid scope")
+	}
+	for i, col := range e.In {
+		if !col.IsValid() {
+			// return nil, errors.Errorf("invalid pcollection to external: index %v", i)
+			fmt.Printf("\ninvalid pcollection to external: index %v", i)
+
+		}
+	}
+
+	// Using exisiting MultiEdge format to represent ExternalTransform (already backwards compatible)
+	payload := &graph.Payload{
+		URN:  e.Urn,
+		Data: e.Payload,
+	}
+	var ins []*graph.Node
+	for _, col := range e.In {
+		ins = append(ins, col.n)
+	}
+	edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+	// TODO(pskevin): There needs to be a better way of associating this ExternalTransform to the pipeline
+	// Adding ExternalTransform to pipeline referenced by MultiEdge ID
+	if p.ExpandedTransforms == nil {
+		p.ExpandedTransforms = make(map[string]*ExternalTransform)
+	}
+	p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e
+
+	/*
+		Build the ExpansionRequest
+	*/
+	// Obtaining the components and transform proto representing this transform
+	pipeline, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{})
+	if err != nil {
+		panic(err)
+	}
+
+	// Adding fake impulses to each input as required for correct expansion
+	// TODO(pskevin): Remove these fake impulses from final Pipeline since multiple producers of the same PCollections is logically wrong
+	transforms := pipeline.Components.Transforms
+	rootTransformID := pipeline.RootTransformIds[0]
+	for tag, id := range transforms[rootTransformID].Inputs {
+		key := fmt.Sprintf("%s_%s", "impulse", tag)
+
+		output := map[string]string{"out": id}
+		impulse := &pipepb.PTransform{
+			UniqueName: key,
+			Spec: &pipepb.FunctionSpec{
+				Urn: graphx.URNImpulse,
+			},
+			Outputs: output,
+		}
+
+		transforms[key] = impulse
+	}
+
+	// Assembling ExpansionRequest proto
+	req := &jobpb.ExpansionRequest{
+		Components: pipeline.Components,
+		Transform:  transforms[rootTransformID],
+		Namespace:  s.String(),
+	}
+
+	/*
+		Querying Expansion Service
+	*/
+	// Setting grpc client
+	conn, err := grpc.Dial(e.ExpansionAddr, grpc.WithInsecure())
+	if err != nil {
+		panic(err)
+	}
+	defer conn.Close()
+	client := jobpb.NewExpansionServiceClient(conn)
+
+	// Handling ExpansionResponse
+	res, err := client.Expand(context.Background(), req)
+	if err != nil {
+		panic(err)
+	}
+	e.Components = res.GetComponents()

Review comment:
       I don't know why it didn't occur to me before, but if the deserialized proto object needs to be "stored" on an object in the graph package, that field can just be an interface{} and be commented that it's expected to be the proto type. As the fields are only usable by beam framework internals, it's fine to assume that they'll be the correct types by construction (validated by unit tests at some juncture).
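   A minimal sketch of that idea, under stated assumptions: `Components` below is a hypothetical stand-in for `*pipepb.Components`, and `Expanded` and `componentsOf` are illustrative names that do not exist in the SDK. The field is declared as `interface{}` with a comment naming the expected type, and the type assertion is confined to one helper.

```go
package main

import "fmt"

// Components is a hypothetical stand-in for *pipepb.Components, so this
// sketch compiles without the proto packages.
type Components struct {
	Transforms map[string]string
}

// Expanded keeps the deserialized expansion result as interface{}, so the
// package that declares it needs no proto import.
type Expanded struct {
	Components interface{} // expected to hold a *Components
}

// componentsOf performs the type assertion in one place and reports a
// mismatch as an error instead of panicking.
func componentsOf(e *Expanded) (*Components, error) {
	c, ok := e.Components.(*Components)
	if !ok {
		return nil, fmt.Errorf("components have type %T, want *Components", e.Components)
	}
	return c, nil
}

func main() {
	e := &Expanded{Components: &Components{Transforms: map[string]string{"e1": "count"}}}
	c, err := componentsOf(e)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(c.Transforms))
}
```

   Returning an error rather than panicking is a choice made for this sketch; inside framework internals a panic on a wrong type may be just as reasonable.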
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pskevin commented on pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on pull request #12445:
URL: https://github.com/apache/beam/pull/12445#issuecomment-667459681


   CC: @chamikaramj @robertwb @youngoli 





[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r471794342



##########
File path: sdks/go/pkg/beam/pipeline.go
##########
@@ -60,7 +60,8 @@ func (s Scope) String() string {
 // Pipelines can safely be executed concurrently.
 type Pipeline struct {
 	// real is the deferred execution Graph as it is being constructed.
-	real *graph.Graph
+	real               *graph.Graph
+	ExpandedTransforms map[string]*ExternalTransform

Review comment:
       Removed from pipeline.







[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r471837313



##########
File path: sdks/go/pkg/beam/core/graph/xlang.go
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package graph
+
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+type ExpandedTransform struct {
+	Components_     interface{} // *pipepb.Components
+	Transform_      interface{} //*pipepb.PTransform
+	Requirements_   []string
+	BoundedOutputs_ map[string]bool
+}
+
+func (exp *ExpandedTransform) Components() *pipepb.Components {
+	if c, ok := exp.Components_.(*pipepb.Components); ok {
+		return c
+	}
+	panic(errors.Errorf("malformed components; %v lacks a conforming pipeline component", exp))
+}
+
+func (exp *ExpandedTransform) Transform() *pipepb.PTransform {
+	if t, ok := exp.Transform_.(*pipepb.PTransform); ok {
+		return t
+	}
+	panic(errors.Errorf("malformed transform; %v lacks a conforming pipeline ptransform", exp))
+}
+
+func (exp *ExpandedTransform) Requirements() []string {
+	if exp.Requirements_ != nil {
+		return exp.Requirements_
+	}
+	return nil
+}
+
+func (exp *ExpandedTransform) BoundedOutputs() map[string]bool {
+	if exp.BoundedOutputs_ != nil {
+		return exp.BoundedOutputs_
+	}
+	return nil
+}
+
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	Urn           string
+	Payload       []byte
+	ExpansionAddr string
+
+	//replace all input/output fields with Inbound and Outbound id maps referencing the orginal Multiedge
+
+	inputs      map[string]*Node
+	Outputs     map[string]*Node
+	outputTypes map[string]typex.FullType
+
+	Expanded_ *ExpandedTransform
+}
+
+func (ext ExternalTransform) WithNamedInputs(inputs map[string]*Node) ExternalTransform {
+	if ext.inputs != nil {
+		panic(errors.Errorf("inputs already set as: \n%v", ext.inputs))
+	}
+	ext.inputs = inputs
+	return ext
+}
+
+func (ext ExternalTransform) WithNamedOutputs(outputTypes map[string]typex.FullType) ExternalTransform {

Review comment:
       Good use of a value method here and the similar functions. 
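   For readers unfamiliar with the pattern, here is a small self-contained sketch of why a value receiver suits these `With...` methods. The `transform` type and `withInputs` method are hypothetical simplifications, not the SDK's types: because the receiver is a copy, the method can set a field and return the copy without touching the caller's original value.

```go
package main

import "fmt"

// transform is a hypothetical, simplified stand-in for ExternalTransform.
type transform struct {
	urn    string
	inputs map[string]string
}

// withInputs has a value receiver: t is a copy of the caller's struct, so
// assigning to t.inputs leaves the original untouched.
func (t transform) withInputs(in map[string]string) transform {
	t.inputs = in
	return t
}

func main() {
	base := transform{urn: "beam:transforms:xlang:count"}
	configured := base.withInputs(map[string]string{"in": "words"})
	// base keeps its nil inputs; only the returned copy carries the map.
	fmt.Println(len(base.inputs), len(configured.inputs)) // prints: 0 1
}
```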







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465365506



##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"regexp"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	// Set this option to choose a different input file or glob.
+	input = flag.String("input", "./input", "File(s) to read.")
+
+	// Set this required option to specify where to write the output.
+	output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+	empty   = beam.NewCounter("extract", "emptyLines")
+	lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+	lineLen.Update(ctx, int64(len(line)))
+	if len(strings.TrimSpace(line)) == 0 {
+		empty.Inc(ctx, 1)
+	}
+	for _, word := range wordRE.FindAllString(line, -1) {
+		emit(word)
+	}
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+	fmt.Println(w, c)

Review comment:
       Thanks for catching that! Ack.







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465373567



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph
+	*/
+	// Validating scope and inputs
+	if !s.IsValid() {
+		// return nil, errors.New("invalid scope")
+		fmt.Println("invalid scope")
+	}
+	for i, col := range e.In {
+		if !col.IsValid() {
+			// return nil, errors.Errorf("invalid pcollection to external: index %v", i)
+			fmt.Printf("\ninvalid pcollection to external: index %v", i)
+
+		}
+	}
+
+	// Using exisiting MultiEdge format to represent ExternalTransform (already backwards compatible)
+	payload := &graph.Payload{
+		URN:  e.Urn,
+		Data: e.Payload,
+	}
+	var ins []*graph.Node
+	for _, col := range e.In {
+		ins = append(ins, col.n)
+	}
+	edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+	// TODO(pskevin): There needs to be a better way of associating this ExternalTransform to the pipeline
+	// Adding ExternalTransform to pipeline referenced by MultiEdge ID
+	if p.ExpandedTransforms == nil {
+		p.ExpandedTransforms = make(map[string]*ExternalTransform)
+	}
+	p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e
+
+	/*
+		Build the ExpansionRequest
+	*/
+	// Obtaining the components and transform proto representing this transform
+	pipeline, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{})
+	if err != nil {
+		panic(err)
+	}
+
+	// Adding fake impulses to each input as required for correct expansion
+	// TODO(pskevin): Remove these fake impulses from final Pipeline since multiple producers of the same PCollections is logically wrong
+	transforms := pipeline.Components.Transforms
+	rootTransformID := pipeline.RootTransformIds[0]
+	for tag, id := range transforms[rootTransformID].Inputs {
+		key := fmt.Sprintf("%s_%s", "impulse", tag)
+
+		output := map[string]string{"out": id}
+		impulse := &pipepb.PTransform{
+			UniqueName: key,
+			Spec: &pipepb.FunctionSpec{
+				Urn: graphx.URNImpulse,
+			},
+			Outputs: output,
+		}
+
+		transforms[key] = impulse
+	}
+
+	// Assembling ExpansionRequest proto
+	req := &jobpb.ExpansionRequest{
+		Components: pipeline.Components,
+		Transform:  transforms[rootTransformID],
+		Namespace:  s.String(),
+	}
+
+	/*
+		Querying Expansion Service
+	*/
+	// Setting grpc client
+	conn, err := grpc.Dial(e.ExpansionAddr, grpc.WithInsecure())
+	if err != nil {
+		panic(err)

Review comment:
       Ack! Thanks for also exemplifying the correct function to use.







[GitHub] [beam] pskevin closed pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin closed pull request #12445:
URL: https://github.com/apache/beam/pull/12445


   





[GitHub] [beam] pskevin commented on pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on pull request #12445:
URL: https://github.com/apache/beam/pull/12445#issuecomment-669026718


   I've addressed the initial comments with the exception of ones suggesting organizational changes. In general this discussion will dictate where the final (and cleaner) API will reside and how the user will make use of it. Since I won't be the stakeholder for this feature after a few weeks, I thought it'd be worthwhile bringing in more opinions – @lostluck @robertwb @youngoli 
   
   The challenges and possible solutions are as follows:
   
   - Where should all xlang code be?\
   There are multiple moving parts related to the execution of cross-language transforms which are now condensed into one `TryCrossLanguage` function in `external.go`  
   
   	- Allowing various combinations of inputs/outputs (n:n, 1:0, 0:1)
   	- Correctly adding `ExternalTransform` to `Graph` representing the unexpanded cross-language transform within the pipeline
   	- Building the `ExpansionRequest`
   	- Querying the expansion service for the corresponding `ExpansionResponse`
   	- Querying the expansion service for Artifact Resolution and Staging
   	- Handling the expanded components of the `ExternalTransform`
   
   	Ideally, these functions would be decomposed into package(s) dedicated to `ExternalTransform`, whether as `beam/.../xlangx`, `beam/.../xlang`, or both. That separates responsibilities so that each function can mature independently, and lets me make incremental progress on different fronts without compromising the overall functionality this PR has already achieved.  
   	Furthermore, `external.go` can then use the package(s) to expose generic APIs such as `beam.CrossLanguage` (for n:n inputs/outputs), `beam.CrossLanguageAsSource` (for 0:1 inputs/outputs) and `beam.CrossLanguageAsSink` (for 1:0 inputs/outputs).  
   	In the future, when the existing API is deprecated, these functions could be renamed to their `beam.ExternalTransform`... counterparts. The existing API could be renamed to `beam.LegacyExternalTransform` and implemented using the API that results from this PR (since it is already backwards compatible for the most part).  
   
   	@lostluck already pointed to a possible separation: moving the pipeline handling code from `universal.go` into `graphx/xlang.go`, to be used in `graphx/translate.go`. I could explore similar options in line with the outcome of this discussion.
   
   - How is `ExternalTransform` associated with pipeline construction?
   	- Adding to `Graph` and/or `MultiEdge`   
   	Currently, a `graph.Opcode` for external transforms already exists (with the value `"External"`) and is used in conjunction with `graph.Payload` by the existing external transform API. Adding `ExternalTransform` to `Graph` will pull proto and grpc dependencies into `beam/core/graph` regardless of whether the pipeline contains any cross-language transform. Adding `ExternalTransform` to `MultiEdge` will need additional logic to distinguish an external transform that uses the proposed API from one that uses the current API.  
   	One scenario in which I believe this method could be useful is if correct pipeline construction required explicit handling of `ExternalTransform` as each `MultiEdge` is added during the `graphx.Marshal` call. As of now I don't feel that is the case.
   
   	- Augmenting `Pipeline` and `Scope`  
   	As implemented in this PR, an `ExpandedTransforms map[string]*ExternalTransform` field is added to `Pipeline`. The key is the ID of the `MultiEdge` that represents the external transform within `Graph`. Using this map during pipeline construction, the Go proto representing the transform is swapped with the expanded `ExternalTransform`, and all components associated with the expanded `ExternalTransform` are added to the pipeline proto. This method does not conflict with any pipeline representation/construction code for strictly Go pipelines and may be more suitable for the future.  
   
   		An existing problem worth calling out is that the `Pipeline` reference must be passed to `beam.CrossLanguage` so that the new `ExternalTransform` can be added to the `ExpandedTransforms` map. A possible solution would be:
   		1. Adding `func (p *Pipeline) RegisterExternalTransform(k string, e *ExternalTransform)` function to `Pipeline`
   		2. Adding `AddMapEntry func(string, *ExternalTransform)` field to `Scope`
   		3. Changing `func (p *Pipeline) Root() Scope` to `return Scope{scope: p.real.Root(), real: p.real, AddMapEntry: p.RegisterExternalTransform}`  
   
   		`beam.CrossLanguage` should then be able to use `s.AddMapEntry(key, externalTransform)` instead of the current implementation.
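   The three steps above can be sketched as follows. This is an illustration of the proposal only, under stated assumptions: `Pipeline`, `Scope` and `ExternalTransform` are stripped-down stand-ins for the SDK types, the unexported `expanded` map replaces `ExpandedTransforms`, and `CrossLanguage` takes a hypothetical `edgeID` argument in place of a real `MultiEdge`.

```go
package main

import "fmt"

// ExternalTransform is a simplified stand-in for the type in this PR.
type ExternalTransform struct {
	Urn string
}

// Pipeline is a simplified stand-in holding the expanded transforms map.
type Pipeline struct {
	expanded map[string]*ExternalTransform
}

// RegisterExternalTransform (step 1) records e under key k.
func (p *Pipeline) RegisterExternalTransform(k string, e *ExternalTransform) {
	if p.expanded == nil {
		p.expanded = make(map[string]*ExternalTransform)
	}
	p.expanded[k] = e
}

// Scope (step 2) carries a callback instead of a *Pipeline reference.
type Scope struct {
	AddMapEntry func(string, *ExternalTransform)
}

// Root (step 3) wires the callback to the pipeline that created the scope.
func (p *Pipeline) Root() Scope {
	return Scope{AddMapEntry: p.RegisterExternalTransform}
}

// CrossLanguage no longer needs the pipeline as a parameter; edgeID is a
// hypothetical placeholder for the MultiEdge ID.
func CrossLanguage(s Scope, edgeID int, e *ExternalTransform) {
	s.AddMapEntry(fmt.Sprintf("e%v", edgeID), e)
}

func main() {
	p := &Pipeline{}
	CrossLanguage(p.Root(), 7, &ExternalTransform{Urn: "beam:transforms:xlang:count"})
	fmt.Println(p.expanded["e7"].Urn)
}
```

   The method value `p.RegisterExternalTransform` binds the pipeline pointer, which is what lets the scope reach the map without exposing the pipeline itself.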





[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465478490



##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main

Review comment:
       Ack.







[GitHub] [beam] pskevin commented on pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on pull request #12445:
URL: https://github.com/apache/beam/pull/12445#issuecomment-675183165


   Thanks a ton @lostluck!





[GitHub] [beam] pskevin commented on pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on pull request #12445:
URL: https://github.com/apache/beam/pull/12445#issuecomment-667459499


   R: @lostluck





[GitHub] [beam] youngoli commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r466758649



##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// xlang_wordcount exemplifies using a cross language transform from Python to count words
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"regexp"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	// Set this option to choose a different input file or glob.
+	input = flag.String("input", "./input", "File(s) to read.")
+
+	// Set this required option to specify where to write the output.
+	output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+	empty   = beam.NewCounter("extract", "emptyLines")
+	lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+	lineLen.Update(ctx, int64(len(line)))
+	if len(strings.TrimSpace(line)) == 0 {
+		empty.Inc(ctx, 1)
+	}
+	for _, word := range wordRE.FindAllString(line, -1) {
+		emit(word)
+	}
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+	return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+	beam.RegisterFunction(extractFn)
+	beam.RegisterFunction(formatFn)
+}
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	if *output == "" {
+		log.Fatal("No output provided")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	lines := textio.Read(s, *input)
+	col := beam.ParDo(s, extractFn, lines)
+
+	// Using Cross-language Count from Python's test expansion service
+	// TODO(pskevin): Cleaner using-face API
+	outputType := typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64))
+	external := &beam.ExternalTransform{
+		In:            []beam.PCollection{col},
+		Urn:           "beam:transforms:xlang:count",
+		ExpansionAddr: "localhost:8118",

Review comment:
       Gotcha, but it might still be a good idea in this example if the expansion servers' addresses can change. I'd expect that if you were a user writing and running cross language pipelines, you wouldn't want to change the address in your code each time, you'd want some flag you can change instead. Same if you have multiple different expansion servers.
   
   Of course, if expansion servers are expected to have a consistent address then this seems good as-is.
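   A minimal sketch of the flag-based alternative, using only the standard `flag` package. The flag name `expansion_addr` and the helper `parseExpansionAddr` are hypothetical; the default simply mirrors the address hard-coded in the example.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// parseExpansionAddr reads a hypothetical -expansion_addr flag from args,
// defaulting to the address currently hard-coded in the example.
func parseExpansionAddr(args []string) (string, error) {
	fs := flag.NewFlagSet("xlang_wordcount", flag.ContinueOnError)
	addr := fs.String("expansion_addr", "localhost:8118", "Address of the expansion service.")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	return *addr, nil
}

func main() {
	addr, err := parseExpansionAddr(os.Args[1:])
	if err != nil {
		os.Exit(2)
	}
	// The parsed value would be assigned to ExternalTransform.ExpansionAddr.
	fmt.Println("expansion service:", addr)
}
```

   A pipeline that talks to several expansion servers could define one such flag per server.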







[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r464721180



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph
+	*/
+	// Validating scope and inputs
+	if !s.IsValid() {
+		// return nil, errors.New("invalid scope")
+		fmt.Println("invalid scope")
+	}
+	for i, col := range e.In {
+		if !col.IsValid() {
+			// return nil, errors.Errorf("invalid pcollection to external: index %v", i)
+			fmt.Printf("\ninvalid pcollection to external: index %v", i)
+
+		}
+	}
+
+	// Using exisiting MultiEdge format to represent ExternalTransform (already backwards compatible)
+	payload := &graph.Payload{
+		URN:  e.Urn,
+		Data: e.Payload,
+	}
+	var ins []*graph.Node
+	for _, col := range e.In {
+		ins = append(ins, col.n)
+	}
+	edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+	// TODO(pskevin): There needs to be a better way of associating this ExternalTransform to the pipeline
+	// Adding ExternalTransform to pipeline referenced by MultiEdge ID
+	if p.ExpandedTransforms == nil {
+		p.ExpandedTransforms = make(map[string]*ExternalTransform)
+	}
+	p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e

Review comment:
       As discussed, this data can be part of the graph.External node (or a new graph.CrossLanguage struct if desired) which keeps it as part of the graph and can be handled appropriately in graphx/translate.go. There's absolutely no need to add a new way to pass information in through the pipeline OR the suggestion you have for scope.
   
   Use the existing abstraction. If it's not sufficient, please articulate why.
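   One way to picture the suggestion, as a rough sketch rather than the SDK's actual design: the cross-language data hangs off the edge itself, and the translation step branches on it. `MultiEdge`, `ExternalTransform` and `translate` below are simplified, hypothetical stand-ins for `graph.MultiEdge`, the proposed graph-level struct, and the logic in `graphx/translate.go`.

```go
package main

import "fmt"

// ExternalTransform is a hypothetical graph-level holder for the
// cross-language data.
type ExternalTransform struct {
	Urn      string
	Expanded interface{} // expected to hold the expanded proto once available
}

// MultiEdge is a simplified stand-in for graph.MultiEdge.
type MultiEdge struct {
	ID       int
	Op       string
	External *ExternalTransform // nil for ordinary Go transforms
}

// translate mimics the translation step: edges that carry external data are
// emitted under their URN, all others under their opcode. No side channel
// through Pipeline or Scope is needed because the data travels with the edge.
func translate(edges []*MultiEdge) map[string]string {
	out := make(map[string]string)
	for _, e := range edges {
		key := fmt.Sprintf("e%d", e.ID)
		if e.External != nil {
			out[key] = e.External.Urn
			continue
		}
		out[key] = e.Op
	}
	return out
}

func main() {
	edges := []*MultiEdge{
		{ID: 1, Op: "ParDo"},
		{ID: 2, Op: "External", External: &ExternalTransform{Urn: "beam:transforms:xlang:count"}},
	}
	fmt.Println(translate(edges))
}
```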







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465934590



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,144 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {

Review comment:
       The current version was never meant to be representative of what the final API would be. Only after discussing code organization was the API surface meant to be refined. 
   
   Thanks for the new type idea! It'll make separation of concerns much clearer.







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r471794063



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph
+	*/
+	// Validating scope and inputs
+	if !s.IsValid() {
+		// return nil, errors.New("invalid scope")
+		fmt.Println("invalid scope")
+	}
+	for i, col := range e.In {
+		if !col.IsValid() {
+			// return nil, errors.Errorf("invalid pcollection to external: index %v", i)
+			fmt.Printf("\ninvalid pcollection to external: index %v", i)
+
+		}
+	}
+
+	// Using exisiting MultiEdge format to represent ExternalTransform (already backwards compatible)
+	payload := &graph.Payload{
+		URN:  e.Urn,
+		Data: e.Payload,
+	}
+	var ins []*graph.Node
+	for _, col := range e.In {
+		ins = append(ins, col.n)
+	}
+	edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+	// TODO(pskevin): There needs to be a better way of associating this ExternalTransform to the pipeline
+	// Adding ExternalTransform to pipeline referenced by MultiEdge ID
+	if p.ExpandedTransforms == nil {
+		p.ExpandedTransforms = make(map[string]*ExternalTransform)
+	}
+	p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e

Review comment:
       Ack. Updated!







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r471794678



##########
File path: sdks/go/pkg/beam/runners/universal/universal.go
##########
@@ -82,6 +82,54 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 		return errors.WithContextf(err, "generating model pipeline")
 	}
 
+	// Adding Expanded transforms to their counterparts in the Pipeline
+	for id, external := range p.ExpandedTransforms {

Review comment:
       I've reorganized much of the code. Your input on which of these comments are no longer relevant would be helpful.







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465361934



##########
File path: sdks/python/Pipfile
##########
@@ -0,0 +1,12 @@
+[[source]]
+name = "pypi"

Review comment:
       Resolving since this was caused by botched git workflows and has since been corrected.

##########
File path: sdks/python/Pipfile.lock
##########
@@ -0,0 +1,946 @@
+{
+    "_meta": {
+        "hash": {
+            "sha256": "4d830f0b6c127288985ac4b35fef456c1bfcd8ffb2d771f32593d302e04bb80c"
+        },
+        "pipfile-spec": 6,
+        "requires": {

Review comment:
       Resolving since this was caused by botched git workflows and has since been corrected.

##########
File path: sdks/python/apache_beam/transforms/validate_runner_xlang_test.py
##########
@@ -14,6 +14,42 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+"""
+###########################################################
+Runner Validation Test Suite for Cross-language Transforms
+###########################################################
+ As per Beams's Portability Framework design, Cross-language transforms

Review comment:
       Resolving since this was caused by botched git workflows and has since been corrected.

##########
File path: sdks/python/apache_beam/examples/wordcount_docker.py
##########
@@ -0,0 +1,148 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.

Review comment:
       Resolving since this was caused by botched git workflows and has since been corrected.







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465373887



##########
File path: sdks/go/pkg/beam/runners/universal/universal.go
##########
@@ -82,6 +82,54 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 		return errors.WithContextf(err, "generating model pipeline")
 	}
 
+	// Adding Expanded transforms to their counterparts in the Pipeline
+	for id, external := range p.ExpandedTransforms {
+		pipeline.Requirements = append(pipeline.Requirements, external.Requirements...)
+
+		// Correct update of transform corresponding to the ExpandedTransform
+		// TODO(pskevin): Figure if there is a better way of supporting multiple outputs
+		transform := pipeline.Components.Transforms[id]
+		existingInput := ""
+		newInput := ""
+		for _, v := range transform.Outputs {
+			existingInput = v
+		}
+		for _, v := range external.ExpandedTransform.Outputs {
+			newInput = v
+		}
+
+		for _, t := range pipeline.Components.Transforms {
+			for idx, i := range t.Inputs {
+				if i == existingInput {
+					t.Inputs[idx] = newInput
+				}
+			}
+		}
+
+		// Adding components of the Expanded Transforms to the current Pipeline
+		for k, v := range external.Components.Transforms {

Review comment:
       That's an awesome helper. Thanks for pointing it out.







[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r464717996



##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"regexp"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	// Set this option to choose a different input file or glob.
+	input = flag.String("input", "./input", "File(s) to read.")
+
+	// Set this required option to specify where to write the output.
+	output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+	empty   = beam.NewCounter("extract", "emptyLines")
+	lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+	lineLen.Update(ctx, int64(len(line)))
+	if len(strings.TrimSpace(line)) == 0 {
+		empty.Inc(ctx, 1)
+	}
+	for _, word := range wordRE.FindAllString(line, -1) {
+		emit(word)
+	}
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+	fmt.Println(w, c)

Review comment:
       Arguably we can remove this debugging line.

##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"regexp"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	// Set this option to choose a different input file or glob.
+	input = flag.String("input", "./input", "File(s) to read.")
+
+	// Set this required option to specify where to write the output.
+	output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+	empty   = beam.NewCounter("extract", "emptyLines")
+	lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+	lineLen.Update(ctx, int64(len(line)))
+	if len(strings.TrimSpace(line)) == 0 {
+		empty.Inc(ctx, 1)
+	}
+	for _, word := range wordRE.FindAllString(line, -1) {
+		emit(word)
+	}
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+	fmt.Println(w, c)
+	return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+	beam.RegisterFunction(extractFn)
+	beam.RegisterFunction(formatFn)
+}
+
+func main() {
+	// If beamx or Go flags are used, flags must be parsed first.

Review comment:
       Feel free to delete the copy-pasted documentation from the original wordcount here. It doesn't need to be repeated, as it draws focus away from the important part: Cross Language.

##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main

Review comment:
       Go packages and binaries should have a doc comment, with a blank line between it and the Apache license.
   
   // xlang_wordcount uses a cross-language transform from Python to count words from a file.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph

Review comment:
       Either remove this block comment entirely, or convert it to a line comment and fix the typo.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph
+	*/
+	// Validating scope and inputs
+	if !s.IsValid() {
+		// return nil, errors.New("invalid scope")
+		fmt.Println("invalid scope")
+	}
+	for i, col := range e.In {
+		if !col.IsValid() {
+			// return nil, errors.Errorf("invalid pcollection to external: index %v", i)

Review comment:
       Same here: uncomment the error return and remove the fmt.Printf call.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph
+	*/
+	// Validating scope and inputs
+	if !s.IsValid() {
+		// return nil, errors.New("invalid scope")

Review comment:
       Please uncomment this validation and remove the printout.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {

Review comment:
       Have both a TryCrossLanguage, which has an error return (to handle all the newly uncommented validations), and a CrossLanguage without the error return. You can see how External passes the TryExternal results to a Must, so errors can be panicked if necessary but otherwise propagated properly if Try is used.
   
   https://github.com/apache/beam/blob/43a4a119bf0d95a1fc33c65842b99ef0ebbcf041/sdks/go/pkg/beam/external.go#L170
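   
   A self-contained sketch of the Try/Must split, under stated assumptions: `Scope`, `PCollection` and `mustN` below are hypothetical placeholders, not the real beam types, and the expansion step is stubbed out. It only illustrates how the commented-out validations become error returns that the panicking wrapper reuses.

```go
package main

import (
	"errors"
	"fmt"
)

// Scope and PCollection are hypothetical placeholders for the beam types.
type Scope struct{ valid bool }

func (s Scope) IsValid() bool { return s.valid }

type PCollection struct{ valid bool }

func (p PCollection) IsValid() bool { return p.valid }

// TryCrossLanguage returns an error for each validation failure instead of
// printing it and continuing.
func TryCrossLanguage(s Scope, in []PCollection) ([]PCollection, error) {
	if !s.IsValid() {
		return nil, errors.New("invalid scope")
	}
	for i, col := range in {
		if !col.IsValid() {
			return nil, fmt.Errorf("invalid pcollection to external: index %v", i)
		}
	}
	// The expansion request would be built and sent here; this sketch
	// simply passes the inputs through.
	return in, nil
}

// mustN panics on a non-nil error and otherwise returns the collections.
func mustN(cols []PCollection, err error) []PCollection {
	if err != nil {
		panic(err)
	}
	return cols
}

// CrossLanguage is the panicking convenience wrapper around TryCrossLanguage.
func CrossLanguage(s Scope, in []PCollection) []PCollection {
	return mustN(TryCrossLanguage(s, in))
}

func main() {
	_, err := TryCrossLanguage(Scope{valid: true}, []PCollection{{valid: true}, {valid: false}})
	fmt.Println(err)
}
```

   Callers that want to handle failures use the Try variant; callers building a pipeline in `main` can use the panicking one.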

##########
File path: sdks/go/examples/xlang/wordcount/input
##########
@@ -0,0 +1,5 @@
+Lorem ipsum dolor sit amet, consectetur adipiscing elit. Mauris id tellus vehicula, rutrum turpis quis, suscipit est. Quisque vehicula nec ex a interdum. Phasellus vulputate nunc sit amet nisl dapibus tincidunt ut ullamcorper nisi. Mauris gravida porta leo vel congue. Duis sit amet arcu eu nisl pharetra interdum a eget enim. Nulla facilisis massa ut egestas interdum. Nunc elit dui, hendrerit at pharetra a, pellentesque non turpis. Integer auctor vulputate congue. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Ut sagittis convallis lorem non semper. Ut ultrices elit a enim pulvinar fermentum.

Review comment:
       +1. For example/demo reasons, we can just do an "in memory" file instead. 
   
   That is, copy-paste this into a variable (note the backticks for Go's raw string literals, which preserve the newlines):
   const lorem = `
   Lorem ipsum...
   `
   
   and load that into beam.Create. One of the wordcount examples does this.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set

Review comment:
       As mentioned, don't worry about the legacy API at this point, focus on a clean implementation of the new API.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph
+	*/
+	// Validating scope and inputs
+	if !s.IsValid() {
+		// return nil, errors.New("invalid scope")
+		fmt.Println("invalid scope")
+	}
+	for i, col := range e.In {
+		if !col.IsValid() {
+			// return nil, errors.Errorf("invalid pcollection to external: index %v", i)
+			fmt.Printf("\ninvalid pcollection to external: index %v", i)
+
+		}
+	}
+
+	// Using exisiting MultiEdge format to represent ExternalTransform (already backwards compatible)
+	payload := &graph.Payload{
+		URN:  e.Urn,
+		Data: e.Payload,
+	}
+	var ins []*graph.Node
+	for _, col := range e.In {
+		ins = append(ins, col.n)
+	}
+	edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+	// TODO(pskevin): There needs to be a better way of associating this ExternalTransform to the pipeline
+	// Adding ExternalTransform to pipeline referenced by MultiEdge ID
+	if p.ExpandedTransforms == nil {
+		p.ExpandedTransforms = make(map[string]*ExternalTransform)
+	}
+	p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e
+
+	/*
+		Build the ExpansionRequest
+	*/
+	// Obtaining the components and transform proto representing this transform
+	pipeline, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{})
+	if err != nil {
+		panic(err)
+	}
+
+	// Adding fake impulses to each input as required for correct expansion
+	// TODO(pskevin): Remove these fake impulses from final Pipeline since multiple producers of the same PCollections is logically wrong
+	transforms := pipeline.Components.Transforms
+	rootTransformID := pipeline.RootTransformIds[0]
+	for tag, id := range transforms[rootTransformID].Inputs {
+		key := fmt.Sprintf("%s_%s", "impulse", tag)
+
+		output := map[string]string{"out": id}
+		impulse := &pipepb.PTransform{
+			UniqueName: key,
+			Spec: &pipepb.FunctionSpec{
+				Urn: graphx.URNImpulse,
+			},
+			Outputs: output,
+		}
+
+		transforms[key] = impulse
+	}
+
+	// Assembling ExpansionRequest proto
+	req := &jobpb.ExpansionRequest{
+		Components: pipeline.Components,
+		Transform:  transforms[rootTransformID],
+		Namespace:  s.String(),
+	}
+
+	/*
+		Querying Expansion Service
+	*/
+	// Setting grpc client
+	conn, err := grpc.Dial(e.ExpansionAddr, grpc.WithInsecure())
+	if err != nil {
+		panic(err)

Review comment:
       Change to a return once this can return errors. 
   return errors.Wrapf(err, "unable to connect to expansion service at %v", e.ExpansionAddr)

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph
+	*/
+	// Validating scope and inputs
+	if !s.IsValid() {
+		// return nil, errors.New("invalid scope")
+		fmt.Println("invalid scope")
+	}
+	for i, col := range e.In {
+		if !col.IsValid() {
+			// return nil, errors.Errorf("invalid pcollection to external: index %v", i)
+			fmt.Printf("\ninvalid pcollection to external: index %v", i)
+
+		}
+	}
+
+	// Using exisiting MultiEdge format to represent ExternalTransform (already backwards compatible)
+	payload := &graph.Payload{
+		URN:  e.Urn,
+		Data: e.Payload,
+	}
+	var ins []*graph.Node
+	for _, col := range e.In {
+		ins = append(ins, col.n)
+	}
+	edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+	// TODO(pskevin): There needs to be a better way of associating this ExternalTransform to the pipeline
+	// Adding ExternalTransform to pipeline referenced by MultiEdge ID
+	if p.ExpandedTransforms == nil {
+		p.ExpandedTransforms = make(map[string]*ExternalTransform)
+	}
+	p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e
+
+	/*
+		Build the ExpansionRequest
+	*/
+	// Obtaining the components and transform proto representing this transform
+	pipeline, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{})
+	if err != nil {
+		panic(err)
+	}
+
+	// Adding fake impulses to each input as required for correct expansion
+	// TODO(pskevin): Remove these fake impulses from final Pipeline since multiple producers of the same PCollections is logically wrong
+	transforms := pipeline.Components.Transforms
+	rootTransformID := pipeline.RootTransformIds[0]
+	for tag, id := range transforms[rootTransformID].Inputs {
+		key := fmt.Sprintf("%s_%s", "impulse", tag)
+
+		output := map[string]string{"out": id}
+		impulse := &pipepb.PTransform{
+			UniqueName: key,
+			Spec: &pipepb.FunctionSpec{
+				Urn: graphx.URNImpulse,
+			},
+			Outputs: output,
+		}
+
+		transforms[key] = impulse
+	}
+
+	// Assembling ExpansionRequest proto
+	req := &jobpb.ExpansionRequest{
+		Components: pipeline.Components,
+		Transform:  transforms[rootTransformID],
+		Namespace:  s.String(),
+	}
+
+	/*
+		Querying Expansion Service
+	*/
+	// Setting grpc client
+	conn, err := grpc.Dial(e.ExpansionAddr, grpc.WithInsecure())
+	if err != nil {
+		panic(err)
+	}
+	defer conn.Close()
+	client := jobpb.NewExpansionServiceClient(conn)
+
+	// Handling ExpansionResponse
+	res, err := client.Expand(context.Background(), req)
+	if err != nil {
+		panic(err)
+	}
+	e.Components = res.GetComponents()
+	e.ExpandedTransform = res.GetTransform()
+	e.Requirements = res.GetRequirements()
+
+	/*
+		Associating output PCollections of the expanded transform with correct internal outbound links and nodes
+	*/
+	// No information about the output types and bounded nature has been explicitly passed by the user
+	if len(e.Out) == 0 || cap(e.Out) == 0 {
+		// Infer output types from ExpansionResponse and update e.Out
+		if e.Out == nil {
+			// Use reverse schema encoding
+		} else {
+			// Use the coders list and map from coder id to internal FullType?
+		}
+	}

Review comment:
       Since the next change will handle PCollection mappings by user provided string keys, we can probably get rid of this for now. Do add a TODO(lostluck) to remind me to plug in the schema coders here instead of what we're doing for the output types.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph
+	*/
+	// Validating scope and inputs
+	if !s.IsValid() {
+		// return nil, errors.New("invalid scope")
+		fmt.Println("invalid scope")
+	}
+	for i, col := range e.In {
+		if !col.IsValid() {
+			// return nil, errors.Errorf("invalid pcollection to external: index %v", i)
+			fmt.Printf("\ninvalid pcollection to external: index %v", i)
+
+		}
+	}
+
+	// Using exisiting MultiEdge format to represent ExternalTransform (already backwards compatible)
+	payload := &graph.Payload{
+		URN:  e.Urn,
+		Data: e.Payload,
+	}
+	var ins []*graph.Node
+	for _, col := range e.In {
+		ins = append(ins, col.n)
+	}
+	edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+	// TODO(pskevin): There needs to be a better way of associating this ExternalTransform to the pipeline
+	// Adding ExternalTransform to pipeline referenced by MultiEdge ID
+	if p.ExpandedTransforms == nil {
+		p.ExpandedTransforms = make(map[string]*ExternalTransform)
+	}
+	p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e
+
+	/*
+		Build the ExpansionRequest
+	*/
+	// Obtaining the components and transform proto representing this transform
+	pipeline, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{})
+	if err != nil {
+		panic(err)
+	}
+
+	// Adding fake impulses to each input as required for correct expansion
+	// TODO(pskevin): Remove these fake impulses from final Pipeline since multiple producers of the same PCollections is logically wrong
+	transforms := pipeline.Components.Transforms
+	rootTransformID := pipeline.RootTransformIds[0]
+	for tag, id := range transforms[rootTransformID].Inputs {
+		key := fmt.Sprintf("%s_%s", "impulse", tag)
+
+		output := map[string]string{"out": id}
+		impulse := &pipepb.PTransform{
+			UniqueName: key,
+			Spec: &pipepb.FunctionSpec{
+				Urn: graphx.URNImpulse,
+			},
+			Outputs: output,
+		}
+
+		transforms[key] = impulse
+	}
+
+	// Assembling ExpansionRequest proto
+	req := &jobpb.ExpansionRequest{
+		Components: pipeline.Components,
+		Transform:  transforms[rootTransformID],
+		Namespace:  s.String(),
+	}
+
+	/*
+		Querying Expansion Service
+	*/
+	// Setting grpc client
+	conn, err := grpc.Dial(e.ExpansionAddr, grpc.WithInsecure())
+	if err != nil {
+		panic(err)
+	}
+	defer conn.Close()
+	client := jobpb.NewExpansionServiceClient(conn)
+
+	// Handling ExpansionResponse
+	res, err := client.Expand(context.Background(), req)
+	if err != nil {
+		panic(err)
+	}
+	e.Components = res.GetComponents()

Review comment:
       Note that these can be added (in one form or another) as exported fields on graph.MultiEdge.
   
   https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/edge.go#L143
   
   For now, disregard the restriction on dealing with protos in graph.
   
   Technically, one other approach to avoid the dependencies is to simply re-marshal the received protos back to []byte and unmarshal them in graphx/translate.go. As always, get it working first before trying to fix this dependency knot.
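
   A rough sketch of that byte-based approach, using only the standard library so it runs on its own. The edge and expandedTransform types are stand-ins for graph.MultiEdge and pipepb.PTransform, store and load are hypothetical helper names, and encoding/json stands in for the proto marshalling calls:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// expandedTransform stands in for the pipepb.PTransform returned by the
// expansion service.
type expandedTransform struct {
	UniqueName string
	Outputs    map[string]string
}

// edge stands in for graph.MultiEdge. It only holds opaque bytes, so the
// graph package would not need to import the proto types.
type edge struct {
	ExpandedTransform []byte
}

// store serializes the transform onto the edge. json.Marshal is a stand-in
// for the proto marshalling call.
func store(e *edge, t *expandedTransform) error {
	b, err := json.Marshal(t)
	if err != nil {
		return err
	}
	e.ExpandedTransform = b
	return nil
}

// load is what the translation layer would do: decode the bytes back into
// the typed message.
func load(e *edge) (*expandedTransform, error) {
	t := &expandedTransform{}
	if err := json.Unmarshal(e.ExpandedTransform, t); err != nil {
		return nil, err
	}
	return t, nil
}

func main() {
	e := &edge{}
	if err := store(e, &expandedTransform{UniqueName: "count", Outputs: map[string]string{"out": "pc1"}}); err != nil {
		panic(err)
	}
	t, err := load(e)
	if err != nil {
		panic(err)
	}
	fmt.Println(t.UniqueName, t.Outputs["out"])
}
```

   The trade-off is an extra encode/decode round trip in exchange for keeping the graph package free of the proto imports.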

##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"regexp"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	// Set this option to choose a different input file or glob.
+	input = flag.String("input", "./input", "File(s) to read.")
+
+	// Set this required option to specify where to write the output.
+	output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+	empty   = beam.NewCounter("extract", "emptyLines")
+	lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+	lineLen.Update(ctx, int64(len(line)))
+	if len(strings.TrimSpace(line)) == 0 {
+		empty.Inc(ctx, 1)
+	}
+	for _, word := range wordRE.FindAllString(line, -1) {
+		emit(word)
+	}
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+	fmt.Println(w, c)
+	return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+	beam.RegisterFunction(extractFn)
+	beam.RegisterFunction(formatFn)
+}
+
+func main() {
+	// If beamx or Go flags are used, flags must be parsed first.
+	flag.Parse()
+	// beam.Init() is an initialization hook that must be called on startup. On
+	// distributed runners, it is used to intercept control.
+	beam.Init()
+
+	// Input validation is done as usual. Note that it must be after Init().
+	if *output == "" {
+		log.Fatal("No output provided")
+	}
+
+	// Concepts #3 and #4: The pipeline uses the named transform and DoFn.
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	lines := textio.Read(s, *input)
+	// Convert lines of text into individual words.
+	col := beam.ParDo(s, extractFn, lines)
+
+	// Using Cross-language Count from Python's test expansion service
+	// TODO(pskevin): Cleaner using-face API
+	outputType := typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64))
+	external := &beam.ExternalTransform{
+		In:            []beam.PCollection{col},
+		Urn:           "beam:transforms:xlang:count",
+		ExpansionAddr: "localhost:8118",
+		Out:           []typex.FullType{outputType},
+		Bounded:       true, // TODO(pskevin): Infer this value from output PCollection(s) part of the expanded tranform
+	}
+	counted := beam.CrossLanguage(s, p, external) // TODO(pskevin): Add external transform to Pipeline without passing it to the transform
+
+	formatted := beam.ParDo(s, formatFn, counted[0])
+	textio.Write(s, *output, formatted)
+
+	// Concept #1: The beamx.Run convenience wrapper allows a number of

Review comment:
       Same comment here: Delete the unnecessary extra documentation.

##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"regexp"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	// Set this option to choose a different input file or glob.
+	input = flag.String("input", "./input", "File(s) to read.")
+
+	// Set this required option to specify where to write the output.
+	output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+	empty   = beam.NewCounter("extract", "emptyLines")
+	lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+	lineLen.Update(ctx, int64(len(line)))
+	if len(strings.TrimSpace(line)) == 0 {
+		empty.Inc(ctx, 1)
+	}
+	for _, word := range wordRE.FindAllString(line, -1) {
+		emit(word)
+	}
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+	fmt.Println(w, c)
+	return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+	beam.RegisterFunction(extractFn)
+	beam.RegisterFunction(formatFn)
+}
+
+func main() {
+	// If beamx or Go flags are used, flags must be parsed first.
+	flag.Parse()
+	// beam.Init() is an initialization hook that must be called on startup. On
+	// distributed runners, it is used to intercept control.
+	beam.Init()
+
+	// Input validation is done as usual. Note that it must be after Init().
+	if *output == "" {
+		log.Fatal("No output provided")
+	}
+
+	// Concepts #3 and #4: The pipeline uses the named transform and DoFn.
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	lines := textio.Read(s, *input)
+	// Convert lines of text into individual words.
+	col := beam.ParDo(s, extractFn, lines)
+
+	// Using Cross-language Count from Python's test expansion service
+	// TODO(pskevin): Cleaner using-face API
+	outputType := typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64))
+	external := &beam.ExternalTransform{
+		In:            []beam.PCollection{col},
+		Urn:           "beam:transforms:xlang:count",
+		ExpansionAddr: "localhost:8118",
+		Out:           []typex.FullType{outputType},
+		Bounded:       true, // TODO(pskevin): Infer this value from output PCollection(s) part of the expanded tranform
+	}
+	counted := beam.CrossLanguage(s, p, external) // TODO(pskevin): Add external transform to Pipeline without passing it to the transform

Review comment:
       +1 I would strongly prefer not having a separate ExternalTransform struct. Pass the arguments in. I wouldn't worry about trying to accommodate the previous External API.
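
   One possible shape for that, sketched with stand-in types instead of the real beam.PCollection and typex.FullType so it compiles by itself. The parameter names and order are assumptions, the Scope argument is left out for brevity, and the body only fabricates outputs where the real function would talk to the expansion service:

```go
package main

import (
	"errors"
	"fmt"
)

// PCollection and FullType are simplified stand-ins for beam.PCollection and
// typex.FullType so the sketch compiles on its own.
type PCollection struct{ Name string }
type FullType string

// TryCrossLanguage is a hypothetical signature: every input that the
// ExternalTransform struct carried is passed as a plain argument.
func TryCrossLanguage(expansionAddr, urn string, payload []byte, ins []PCollection, outs []FullType) ([]PCollection, error) {
	if expansionAddr == "" {
		return nil, errors.New("no expansion address provided")
	}
	if urn == "" {
		return nil, errors.New("no URN provided")
	}
	// The real function would add the edge to the graph and query the
	// expansion service here; the sketch only fabricates one output per type.
	results := make([]PCollection, 0, len(outs))
	for i, t := range outs {
		results = append(results, PCollection{Name: fmt.Sprintf("out%d<%s>", i, t)})
	}
	return results, nil
}

func main() {
	words := PCollection{Name: "words"}
	counted, err := TryCrossLanguage("localhost:8118", "beam:transforms:xlang:count", nil,
		[]PCollection{words}, []FullType{"KV<string,int64>"})
	if err != nil {
		panic(err)
	}
	fmt.Println(counted[0].Name)
}
```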

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph
+	*/
+	// Validating scope and inputs
+	if !s.IsValid() {
+		// return nil, errors.New("invalid scope")
+		fmt.Println("invalid scope")
+	}
+	for i, col := range e.In {
+		if !col.IsValid() {
+			// return nil, errors.Errorf("invalid pcollection to external: index %v", i)
+			fmt.Printf("\ninvalid pcollection to external: index %v", i)
+
+		}
+	}
+
+	// Using exisiting MultiEdge format to represent ExternalTransform (already backwards compatible)
+	payload := &graph.Payload{
+		URN:  e.Urn,
+		Data: e.Payload,
+	}
+	var ins []*graph.Node
+	for _, col := range e.In {
+		ins = append(ins, col.n)
+	}
+	edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+	// TODO(pskevin): There needs to be a better way of associating this ExternalTransform to the pipeline
+	// Adding ExternalTransform to pipeline referenced by MultiEdge ID
+	if p.ExpandedTransforms == nil {
+		p.ExpandedTransforms = make(map[string]*ExternalTransform)
+	}
+	p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e

Review comment:
       As discussed, this data can be part of the graph.External node (or a new node if desired) which keeps it as part of the graph and can be handled appropriately in graphx/translate.go

##########
File path: sdks/go/pkg/beam/runners/universal/universal.go
##########
@@ -82,6 +82,54 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 		return errors.WithContextf(err, "generating model pipeline")
 	}
 
+	// Adding Expanded transforms to their counterparts in the Pipeline
+	for id, external := range p.ExpandedTransforms {
+		pipeline.Requirements = append(pipeline.Requirements, external.Requirements...)
+
+		// Correct update of transform corresponding to the ExpandedTransform
+		// TODO(pskevin): Figure if there is a better way of supporting multiple outputs
+		transform := pipeline.Components.Transforms[id]
+		existingInput := ""
+		newInput := ""
+		for _, v := range transform.Outputs {
+			existingInput = v
+		}
+		for _, v := range external.ExpandedTransform.Outputs {
+			newInput = v
+		}
+
+		for _, t := range pipeline.Components.Transforms {
+			for idx, i := range t.Inputs {
+				if i == existingInput {
+					t.Inputs[idx] = newInput
+				}
+			}
+		}
+
+		// Adding components of the Expanded Transforms to the current Pipeline
+		for k, v := range external.Components.Transforms {

Review comment:
       You should be able to clean this up with pipelinex.Update
   
   https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go#L32
   
   Note that you'll need to delete the "go" entry from the environments first to do that safely for the loopback mode fix.
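
   The ordering in that note could look roughly like this. Plain maps stand in for the environments of the pipeline and of the expansion response, and mergeExpandedEnvs is a hypothetical helper standing in for the merge step (it is not pipelinex.Update itself). Dropping "go" first presumably keeps the merge from overwriting the pipeline's own entry:

```go
package main

import "fmt"

// mergeExpandedEnvs drops the "go" entry from the expanded environments and
// then copies the rest into the pipeline's environments. Values are plain
// strings here; in the SDK they would be environment protos.
func mergeExpandedEnvs(pipelineEnvs, expandedEnvs map[string]string) {
	delete(expandedEnvs, "go")
	for id, env := range expandedEnvs {
		pipelineEnvs[id] = env
	}
}

func main() {
	pipelineEnvs := map[string]string{"go": "loopback"}
	expandedEnvs := map[string]string{"go": "docker", "python": "docker"}
	mergeExpandedEnvs(pipelineEnvs, expandedEnvs)
	fmt.Println(pipelineEnvs["go"], pipelineEnvs["python"])
}
```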

##########
File path: sdks/go/pkg/beam/pipeline.go
##########
@@ -60,7 +60,8 @@ func (s Scope) String() string {
 // Pipelines can safely be executed concurrently.
 type Pipeline struct {
 	// real is the deferred execution Graph as it is being constructed.
-	real *graph.Graph
+	real               *graph.Graph
+	ExpandedTransforms map[string]*ExternalTransform

Review comment:
       As discussed, we don't want to have to hang the expanded transforms on the pipeline here.

##########
File path: sdks/go/pkg/beam/runners/universal/universal.go
##########
@@ -82,6 +82,54 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 		return errors.WithContextf(err, "generating model pipeline")
 	}
 
+	// Adding Expanded transforms to their counterparts in the Pipeline
+	for id, external := range p.ExpandedTransforms {

Review comment:
       Similarly, this code should be moved into graphx/translate.go (though TBH this is substantial enough that isolating it in functions in a graphx/xlang.go file, which are then called from translate.go, would be a good move).
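
   As an illustration of what one such isolated function might look like, here is the input-rewiring loop from the hunk above pulled into a helper. The transform type is a stand-in for pipepb.PTransform, and rewireInputs is a hypothetical name, not an existing graphx function:

```go
package main

import "fmt"

// transform is a stand-in for pipepb.PTransform, keeping only the inputs.
type transform struct {
	Inputs map[string]string
}

// rewireInputs points every input that referenced oldID at newID instead. It
// is a hypothetical helper of the kind that could live in graphx/xlang.go.
func rewireInputs(transforms map[string]*transform, oldID, newID string) {
	for _, t := range transforms {
		for tag, id := range t.Inputs {
			if id == oldID {
				t.Inputs[tag] = newID
			}
		}
	}
}

func main() {
	transforms := map[string]*transform{
		"format": {Inputs: map[string]string{"i0": "n3"}},
	}
	rewireInputs(transforms, "n3", "expanded_out")
	fmt.Println(transforms["format"].Inputs["i0"])
}
```

   Keeping the helper free of pipeline-level state would also make it straightforward to unit test separately from the runner.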




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465365206



##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"regexp"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	// Set this option to choose a different input file or glob.
+	input = flag.String("input", "./input", "File(s) to read.")
+
+	// Set this required option to specify where to write the output.
+	output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+	empty   = beam.NewCounter("extract", "emptyLines")
+	lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+	lineLen.Update(ctx, int64(len(line)))
+	if len(strings.TrimSpace(line)) == 0 {
+		empty.Inc(ctx, 1)
+	}
+	for _, word := range wordRE.FindAllString(line, -1) {
+		emit(word)
+	}
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+	fmt.Println(w, c)
+	return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+	beam.RegisterFunction(extractFn)
+	beam.RegisterFunction(formatFn)
+}
+
+func main() {
+	// If beamx or Go flags are used, flags must be parsed first.
+	flag.Parse()
+	// beam.Init() is an initialization hook that must be called on startup. On
+	// distributed runners, it is used to intercept control.
+	beam.Init()
+
+	// Input validation is done as usual. Note that it must be after Init().
+	if *output == "" {
+		log.Fatal("No output provided")
+	}
+
+	// Concepts #3 and #4: The pipeline uses the named transform and DoFn.
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	lines := textio.Read(s, *input)
+	// Convert lines of text into individual words.
+	col := beam.ParDo(s, extractFn, lines)
+
+	// Using Cross-language Count from Python's test expansion service
+	// TODO(pskevin): Cleaner using-face API
+	outputType := typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64))
+	external := &beam.ExternalTransform{
+		In:            []beam.PCollection{col},
+		Urn:           "beam:transforms:xlang:count",
+		ExpansionAddr: "localhost:8118",
+		Out:           []typex.FullType{outputType},
+		Bounded:       true, // TODO(pskevin): Infer this value from output PCollection(s) part of the expanded tranform
+	}
+	counted := beam.CrossLanguage(s, p, external) // TODO(pskevin): Add external transform to Pipeline without passing it to the transform

Review comment:
       Ack.
   
   All references to the previous API are just placeholders where (in the future) code to port the legacy API will go. They don't influence how the current API is being developed. In fact it's just a bonus that the previous API can be cleanly supported.







[GitHub] [beam] youngoli commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r466056636



##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// xlang_wordcount exemplifies using a cross language transform from Python to count words
+package main

Review comment:
       This example requires running an expansion service separately in order to work, right? I'd add instructions to the package comment on how to run that so people can run this example without existing knowledge of how xlang works. See the stringsplit example for an example of this. It requires running on a job service that supports splitting, so I included instructions for running an external job service.

##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// xlang_wordcount exemplifies using a cross language transform from Python to count words
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"regexp"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	// Set this option to choose a different input file or glob.
+	input = flag.String("input", "./input", "File(s) to read.")
+
+	// Set this required option to specify where to write the output.
+	output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+	empty   = beam.NewCounter("extract", "emptyLines")
+	lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+	lineLen.Update(ctx, int64(len(line)))
+	if len(strings.TrimSpace(line)) == 0 {
+		empty.Inc(ctx, 1)
+	}
+	for _, word := range wordRE.FindAllString(line, -1) {
+		emit(word)
+	}
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+	return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+	beam.RegisterFunction(extractFn)
+	beam.RegisterFunction(formatFn)
+}
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	if *output == "" {
+		log.Fatal("No output provided")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	lines := textio.Read(s, *input)
+	col := beam.ParDo(s, extractFn, lines)
+
+	// Using Cross-language Count from Python's test expansion service
+	// TODO(pskevin): Cleaner using-face API
+	outputType := typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64))
+	external := &beam.ExternalTransform{
+		In:            []beam.PCollection{col},
+		Urn:           "beam:transforms:xlang:count",
+		ExpansionAddr: "localhost:8118",

Review comment:
       Expansion address seems like a good candidate to be a flag instead.
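
   A minimal sketch of that change, following the flag style the example already uses for input and output. The flag name expansion_addr is a hypothetical choice, and the default simply mirrors the address currently hard-coded in the example:

```go
package main

import (
	"flag"
	"fmt"
)

// expansionAddr replaces the hard-coded address. The flag name is a
// hypothetical choice; the default is the address used in the example.
var expansionAddr = flag.String("expansion_addr", "localhost:8118", "Address of the expansion service.")

func main() {
	flag.Parse()
	// The value would be passed along as ExpansionAddr instead of the literal.
	fmt.Println("using expansion service at", *expansionAddr)
}
```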

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,144 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	return MustN(TryCrossLanguage(s, p, e))
+}
+
+func TryCrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) ([]PCollection, error) {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	// Add ExternalTransform to the Graph
+
+	// Validating scope and inputs
+	if !s.IsValid() {
+		return nil, errors.New("invalid scope")
+	}
+	for i, col := range e.In {
+		if !col.IsValid() {
+			return nil, errors.Errorf("invalid pcollection to external: index %v", i)
+		}
+	}
+
+	// Using exisiting MultiEdge format to represent ExternalTransform (already backwards compatible)

Review comment:
       Nit: Typo
   ```suggestion
   	// Using existing MultiEdge format to represent ExternalTransform (already backwards compatible)
   ```







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465934906



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,144 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string

Review comment:
       Awesome suggestion. Will work on it.







[GitHub] [beam] chamikaramj commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r464672379



##########
File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
##########
@@ -177,6 +232,14 @@ public void coGroupByKeyTest() {
     PAssert.that(col).containsInAnyOrder("0:1,2,4", "1:3,5,6");
   }
 
+  /**
+   * Motivation behind combineGloballyTest.

Review comment:
       Let's keep these unrelated documentation updates in a separate PR.

##########
File path: sdks/go/examples/xlang/wordcount/input
##########
@@ -0,0 +1,5 @@
+Lorem ipsum dolor sit amet, consectetur adipiscing elit. Mauris id tellus vehicula, rutrum turpis quis, suscipit est. Quisque vehicula nec ex a interdum. Phasellus vulputate nunc sit amet nisl dapibus tincidunt ut ullamcorper nisi. Mauris gravida porta leo vel congue. Duis sit amet arcu eu nisl pharetra interdum a eget enim. Nulla facilisis massa ut egestas interdum. Nunc elit dui, hendrerit at pharetra a, pellentesque non turpis. Integer auctor vulputate congue. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Ut sagittis convallis lorem non semper. Ut ultrices elit a enim pulvinar fermentum.

Review comment:
       Is it possible to use a generated input for tests instead of committing this file?
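
   For instance, the lines could be produced in memory and handed to the pipeline through an in-memory source (something like beam.CreateList) rather than read from a committed file. The generator below is only a sketch: generateLines and its three-word vocabulary are made up for illustration:

```go
package main

import (
	"fmt"
	"strings"
)

// generateLines returns n deterministic two-word lines built by cycling
// through a small vocabulary, so expected word counts are easy to derive.
func generateLines(n int) []string {
	vocab := []string{"lorem", "ipsum", "dolor"}
	lines := make([]string, n)
	for i := range lines {
		lines[i] = strings.Join([]string{vocab[i%len(vocab)], vocab[(i+1)%len(vocab)]}, " ")
	}
	return lines
}

func main() {
	for _, l := range generateLines(3) {
		fmt.Println(l)
	}
}
```

   Because the output is deterministic, a test can assert exact counts per word without keeping a fixture file in the repository.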

##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main

Review comment:
       You need to add the Apache license header to all new files.

##########
File path: sdks/python/Pipfile.lock
##########
@@ -0,0 +1,946 @@
+{
+    "_meta": {
+        "hash": {
+            "sha256": "4d830f0b6c127288985ac4b35fef456c1bfcd8ffb2d771f32593d302e04bb80c"
+        },
+        "pipfile-spec": 6,
+        "requires": {

Review comment:
       Ditto.

##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"regexp"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	// Set this option to choose a different input file or glob.
+	input = flag.String("input", "./input", "File(s) to read.")
+
+	// Set this required option to specify where to write the output.
+	output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+	empty   = beam.NewCounter("extract", "emptyLines")
+	lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+	lineLen.Update(ctx, int64(len(line)))
+	if len(strings.TrimSpace(line)) == 0 {
+		empty.Inc(ctx, 1)
+	}
+	for _, word := range wordRE.FindAllString(line, -1) {
+		emit(word)
+	}
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+	fmt.Println(w, c)
+	return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+	beam.RegisterFunction(extractFn)
+	beam.RegisterFunction(formatFn)
+}
+
+func main() {
+	// If beamx or Go flags are used, flags must be parsed first.
+	flag.Parse()
+	// beam.Init() is an initialization hook that must be called on startup. On
+	// distributed runners, it is used to intercept control.
+	beam.Init()
+
+	// Input validation is done as usual. Note that it must be after Init().
+	if *output == "" {
+		log.Fatal("No output provided")
+	}
+
+	// Concepts #3 and #4: The pipeline uses the named transform and DoFn.
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	lines := textio.Read(s, *input)
+	// Convert lines of text into individual words.
+	col := beam.ParDo(s, extractFn, lines)
+
+	// Using Cross-language Count from Python's test expansion service
+	// TODO(pskevin): Cleaner using-face API
+	outputType := typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64))
+	external := &beam.ExternalTransform{
+		In:            []beam.PCollection{col},
+		Urn:           "beam:transforms:xlang:count",
+		ExpansionAddr: "localhost:8118",
+		Out:           []typex.FullType{outputType},
+		Bounded:       true, // TODO(pskevin): Infer this value from output PCollection(s) part of the expanded tranform
+	}
+	counted := beam.CrossLanguage(s, p, external) // TODO(pskevin): Add external transform to Pipeline without passing it to the transform

Review comment:
       Can we directly set up the 'CrossLanguage' transform as the primary user API instead of having to set up a separate ExternalTransform struct?
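
   A rough sketch of what a function-first API might look like. Everything here is hypothetical: `Scope`, `PCollection`, and `FullType` are local stand-ins for the real Beam types so the snippet compiles on its own, and the `CrossLanguage` signature is only one possible shape, not what this PR implements. The struct becomes an unexported detail that the user never builds.

```go
package main

import "fmt"

// Stand-in types: the real beam.Scope, beam.PCollection, and typex.FullType
// are deliberately not imported so that the sketch stays self-contained.
type Scope struct{ name string }
type PCollection struct{ id string }
type FullType string

// externalTransform is unexported: callers never construct it themselves.
type externalTransform struct {
	urn           string
	payload       []byte
	expansionAddr string
	in            []PCollection
	out           []FullType
}

// CrossLanguage (hypothetical signature) takes everything the struct used to
// carry as plain arguments and returns one PCollection per output type.
func CrossLanguage(s Scope, urn string, payload []byte, expansionAddr string, in []PCollection, out []FullType) []PCollection {
	e := &externalTransform{urn: urn, payload: payload, expansionAddr: expansionAddr, in: in, out: out}

	// A real implementation would add e to the graph and query the expansion
	// service here. The sketch only fabricates placeholder outputs.
	outs := make([]PCollection, len(e.out))
	for i := range e.out {
		outs[i] = PCollection{id: fmt.Sprintf("%s/%s/out%d", s.name, e.urn, i)}
	}
	return outs
}
```

   With a shape like this, the call site in the example would reduce to a single `CrossLanguage(...)` call, and the pipeline would no longer need to be passed in.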

##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"regexp"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	// Set this option to choose a different input file or glob.
+	input = flag.String("input", "./input", "File(s) to read.")
+
+	// Set this required option to specify where to write the output.
+	output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+	empty   = beam.NewCounter("extract", "emptyLines")
+	lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+	lineLen.Update(ctx, int64(len(line)))
+	if len(strings.TrimSpace(line)) == 0 {
+		empty.Inc(ctx, 1)
+	}
+	for _, word := range wordRE.FindAllString(line, -1) {
+		emit(word)
+	}
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+	fmt.Println(w, c)
+	return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+	beam.RegisterFunction(extractFn)
+	beam.RegisterFunction(formatFn)
+}
+
+func main() {
+	// If beamx or Go flags are used, flags must be parsed first.
+	flag.Parse()
+	// beam.Init() is an initialization hook that must be called on startup. On
+	// distributed runners, it is used to intercept control.
+	beam.Init()
+
+	// Input validation is done as usual. Note that it must be after Init().
+	if *output == "" {
+		log.Fatal("No output provided")
+	}
+
+	// Concepts #3 and #4: The pipeline uses the named transform and DoFn.
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	lines := textio.Read(s, *input)
+	// Convert lines of text into individual words.
+	col := beam.ParDo(s, extractFn, lines)
+
+	// Using Cross-language Count from Python's test expansion service
+	// TODO(pskevin): Cleaner using-face API
+	outputType := typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64))
+	external := &beam.ExternalTransform{

Review comment:
       Woohoo!

##########
File path: sdks/python/Pipfile
##########
@@ -0,0 +1,12 @@
+[[source]]
+name = "pypi"

Review comment:
       Do we need this?

##########
File path: sdks/python/apache_beam/examples/wordcount_docker.py
##########
@@ -0,0 +1,148 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.

Review comment:
       Probably extract out these wordcount_xxx updates to a separate PR.

##########
File path: sdks/python/apache_beam/transforms/validate_runner_xlang_test.py
##########
@@ -14,6 +14,42 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+"""
+###########################################################
+Runner Validation Test Suite for Cross-language Transforms
+###########################################################
+ As per Beams's Portability Framework design, Cross-language transforms

Review comment:
       Ditto regarding keeping these documentation updates in the other PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465373296



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph

Review comment:
       I'm definitely missing something since I can't see any typo. Could you be more explicit?
   
   Also, the block comments were meant just to separate sections of independent logic which I expect to break down into functions during clean up. Should I remove all the block comments?

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph
+	*/
+	// Validating scope and inputs
+	if !s.IsValid() {
+		// return nil, errors.New("invalid scope")

Review comment:
       Ack.







[GitHub] [beam] pskevin commented on pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on pull request #12445:
URL: https://github.com/apache/beam/pull/12445#issuecomment-675178008


   @lostluck My updates are ready to review!





[GitHub] [beam] lostluck merged pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
lostluck merged pull request #12445:
URL: https://github.com/apache/beam/pull/12445


   





[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r464721180



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph
+	*/
+	// Validating scope and inputs
+	if !s.IsValid() {
+		// return nil, errors.New("invalid scope")
+		fmt.Println("invalid scope")
+	}
+	for i, col := range e.In {
+		if !col.IsValid() {
+			// return nil, errors.Errorf("invalid pcollection to external: index %v", i)
+			fmt.Printf("\ninvalid pcollection to external: index %v", i)
+
+		}
+	}
+
+	// Using exisiting MultiEdge format to represent ExternalTransform (already backwards compatible)
+	payload := &graph.Payload{
+		URN:  e.Urn,
+		Data: e.Payload,
+	}
+	var ins []*graph.Node
+	for _, col := range e.In {
+		ins = append(ins, col.n)
+	}
+	edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+	// TODO(pskevin): There needs to be a better way of associating this ExternalTransform to the pipeline
+	// Adding ExternalTransform to pipeline referenced by MultiEdge ID
+	if p.ExpandedTransforms == nil {
+		p.ExpandedTransforms = make(map[string]*ExternalTransform)
+	}
+	p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e

Review comment:
       As discussed, this data can be part of the graph.External node (or a new graph.CrossLanguage struct if desired) which keeps it as part of the graph and can be handled appropriately in graphx/translate.go. There's absolutely no need to add a new way to pass information in through the pipeline.
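
   To illustrate the idea, here is a minimal sketch that uses stand-in types rather than the real `graph` package. The `External` field on the edge and the `translate` helper are assumptions made for illustration; the point is only that translation can read the cross-language data off each edge, so no side map on the pipeline is needed.

```go
package main

import "fmt"

// ExternalTransform is a stand-in for the PR's struct; only two fields are kept.
type ExternalTransform struct {
	Urn           string
	ExpansionAddr string
}

// MultiEdge is a stand-in for graph.MultiEdge. The External field is the
// hypothetical slot that carries the cross-language data on the edge itself.
type MultiEdge struct {
	id       int
	External *ExternalTransform
}

// ID returns the "e<N>" style identifier used for edges in this sketch.
func (e *MultiEdge) ID() string { return fmt.Sprintf("e%d", e.id) }

// translate walks the edges and reads the external data directly from each
// one, returning a map from edge ID to URN for the external edges only.
func translate(edges []*MultiEdge) map[string]string {
	urns := make(map[string]string)
	for _, e := range edges {
		if e.External == nil {
			continue // an ordinary, non-external edge
		}
		urns[e.ID()] = e.External.Urn
	}
	return urns
}
```

   Because the data travels with the edge, anything that already receives the edge list sees it without a separate lookup keyed by edge ID.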







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465360901



##########
File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
##########
@@ -177,6 +232,14 @@ public void coGroupByKeyTest() {
     PAssert.that(col).containsInAnyOrder("0:1,2,4", "1:3,5,6");
   }
 
+  /**
+   * Motivation behind combineGloballyTest.

Review comment:
       Resolving, since this was caused by a botched git workflow and has since been corrected.







[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465380563



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph

Review comment:
       ExternalTranform should be ExternalTransform.
   
   I'd prefer no block comments. They're uncommon in Go code.  
   
   To be clear, describing what the next section of code does is what comments are for. They don't need to take up so much space though.
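
   For example, a short line comment above each section is enough. This is a sketch only: `buildRequest` and its arguments are hypothetical and exist just to show the comment style.

```go
package main

import (
	"errors"
	"fmt"
)

// buildRequest is a hypothetical helper used only to show comment style.
func buildRequest(urn, addr string) (string, error) {
	// Validate inputs.
	if urn == "" {
		return "", errors.New("empty URN")
	}
	if addr == "" {
		return "", errors.New("empty expansion address")
	}

	// Assemble the request description.
	return fmt.Sprintf("%s@%s", urn, addr), nil
}
```

   Each one-line comment marks a section without the three lines a block comment needs, and the sections remain easy to lift into their own functions later.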







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465370813



##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"regexp"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	// Set this option to choose a different input file or glob.
+	input = flag.String("input", "./input", "File(s) to read.")
+
+	// Set this required option to specify where to write the output.
+	output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+	empty   = beam.NewCounter("extract", "emptyLines")
+	lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+	lineLen.Update(ctx, int64(len(line)))
+	if len(strings.TrimSpace(line)) == 0 {
+		empty.Inc(ctx, 1)
+	}
+	for _, word := range wordRE.FindAllString(line, -1) {
+		emit(word)
+	}
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+	fmt.Println(w, c)
+	return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+	beam.RegisterFunction(extractFn)
+	beam.RegisterFunction(formatFn)
+}
+
+func main() {
+	// If beamx or Go flags are used, flags must be parsed first.

Review comment:
       Thanks for catching that! Will do.







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465370908



##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"regexp"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	// Set this option to choose a different input file or glob.
+	input = flag.String("input", "./input", "File(s) to read.")
+
+	// Set this required option to specify where to write the output.
+	output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+	empty   = beam.NewCounter("extract", "emptyLines")
+	lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+	lineLen.Update(ctx, int64(len(line)))
+	if len(strings.TrimSpace(line)) == 0 {
+		empty.Inc(ctx, 1)
+	}
+	for _, word := range wordRE.FindAllString(line, -1) {
+		emit(word)
+	}
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+	fmt.Println(w, c)
+	return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+	beam.RegisterFunction(extractFn)
+	beam.RegisterFunction(formatFn)
+}
+
+func main() {
+	// If beamx or Go flags are used, flags must be parsed first.
+	flag.Parse()
+	// beam.Init() is an initialization hook that must be called on startup. On
+	// distributed runners, it is used to intercept control.
+	beam.Init()
+
+	// Input validation is done as usual. Note that it must be after Init().
+	if *output == "" {
+		log.Fatal("No output provided")
+	}
+
+	// Concepts #3 and #4: The pipeline uses the named transform and DoFn.
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	lines := textio.Read(s, *input)
+	// Convert lines of text into individual words.
+	col := beam.ParDo(s, extractFn, lines)
+
+	// Using Cross-language Count from Python's test expansion service
+	// TODO(pskevin): Cleaner using-face API
+	outputType := typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64))
+	external := &beam.ExternalTransform{
+		In:            []beam.PCollection{col},
+		Urn:           "beam:transforms:xlang:count",
+		ExpansionAddr: "localhost:8118",
+		Out:           []typex.FullType{outputType},
+		Bounded:       true, // TODO(pskevin): Infer this value from output PCollection(s) part of the expanded tranform
+	}
+	counted := beam.CrossLanguage(s, p, external) // TODO(pskevin): Add external transform to Pipeline without passing it to the transform
+
+	formatted := beam.ParDo(s, formatFn, counted[0])
+	textio.Write(s, *output, formatted)
+
+	// Concept #1: The beamx.Run convenience wrapper allows a number of

Review comment:
       Ack.







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465373680



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph
+	*/
+	// Validating scope and inputs
+	if !s.IsValid() {
+		// return nil, errors.New("invalid scope")
+		fmt.Println("invalid scope")
+	}
+	for i, col := range e.In {
+		if !col.IsValid() {
+			// return nil, errors.Errorf("invalid pcollection to external: index %v", i)
+			fmt.Printf("\ninvalid pcollection to external: index %v", i)
+
+		}
+	}
+
+	// Using exisiting MultiEdge format to represent ExternalTransform (already backwards compatible)
+	payload := &graph.Payload{
+		URN:  e.Urn,
+		Data: e.Payload,
+	}
+	var ins []*graph.Node
+	for _, col := range e.In {
+		ins = append(ins, col.n)
+	}
+	edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+	// TODO(pskevin): There needs to be a better way of associating this ExternalTransform to the pipeline
+	// Adding ExternalTransform to pipeline referenced by MultiEdge ID
+	if p.ExpandedTransforms == nil {
+		p.ExpandedTransforms = make(map[string]*ExternalTransform)
+	}
+	p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e
+
+	/*
+		Build the ExpansionRequest
+	*/
+	// Obtaining the components and transform proto representing this transform
+	pipeline, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{})
+	if err != nil {
+		panic(err)
+	}
+
+	// Adding fake impulses to each input as required for correct expansion
+	// TODO(pskevin): Remove these fake impulses from final Pipeline since multiple producers of the same PCollections is logically wrong
+	transforms := pipeline.Components.Transforms
+	rootTransformID := pipeline.RootTransformIds[0]
+	for tag, id := range transforms[rootTransformID].Inputs {
+		key := fmt.Sprintf("%s_%s", "impulse", tag)
+
+		output := map[string]string{"out": id}
+		impulse := &pipepb.PTransform{
+			UniqueName: key,
+			Spec: &pipepb.FunctionSpec{
+				Urn: graphx.URNImpulse,
+			},
+			Outputs: output,
+		}
+
+		transforms[key] = impulse
+	}
+
+	// Assembling ExpansionRequest proto
+	req := &jobpb.ExpansionRequest{
+		Components: pipeline.Components,
+		Transform:  transforms[rootTransformID],
+		Namespace:  s.String(),
+	}
+
+	/*
+		Querying Expansion Service
+	*/
+	// Setting grpc client
+	conn, err := grpc.Dial(e.ExpansionAddr, grpc.WithInsecure())
+	if err != nil {
+		panic(err)
+	}
+	defer conn.Close()
+	client := jobpb.NewExpansionServiceClient(conn)
+
+	// Handling ExpansionResponse
+	res, err := client.Expand(context.Background(), req)
+	if err != nil {
+		panic(err)
+	}
+	e.Components = res.GetComponents()
+	e.ExpandedTransform = res.GetTransform()
+	e.Requirements = res.GetRequirements()
+
+	/*
+		Associating output PCollections of the expanded transform with correct internal outbound links and nodes
+	*/
+	// No information about the output types and bounded nature has been explicitly passed by the user
+	if len(e.Out) == 0 || cap(e.Out) == 0 {
+		// Infer output types from ExpansionResponse and update e.Out
+		if e.Out == nil {
+			// Use reverse schema encoding
+		} else {
+			// Use the coders list and map from coder id to internal FullType?
+		}
+	}

Review comment:
       Makes sense. Will do!







[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465380821



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+	if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+		// return Legacy External API
+	}
+
+	/*
+		Add ExternalTranform to the Graph

Review comment:
       To be pedantically clear: No block comments doesn't mean no comments. In my original comment, I suggested the line comment alternative.







[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

Posted by GitBox <gi...@apache.org>.
pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465478664



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+	"context"
+	"fmt"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+	id                int
+	Urn               string
+	Payload           []byte
+	In                []PCollection
+	Out               []FullType
+	Bounded           bool
+	ExpansionAddr     string
+	Components        *pipepb.Components
+	ExpandedTransform *pipepb.PTransform
+	Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {

Review comment:
       Ack.



