Posted to commits@beam.apache.org by lo...@apache.org on 2020/08/22 01:47:33 UTC

[beam] branch master updated: [BEAM-9918] Adding tests and documentation to xlang components (#12667)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bdb71bc  [BEAM-9918] Adding tests and documentation to xlang components (#12667)
bdb71bc is described below

commit bdb71bc7753f5043ad1195b310315161596a50a1
Author: Kevin Sijo Puthusseri <25...@users.noreply.github.com>
AuthorDate: Sat Aug 22 01:47:09 2020 +0000

    [BEAM-9918] Adding tests and documentation to xlang components (#12667)
---
 sdks/go/examples/xlang/cogroup_by/cogroup_by.go    | 107 ++++
 sdks/go/examples/xlang/combine/combine.go          | 101 ++++
 .../xlang/combine_globally/combine_globally.go     |  79 +++
 sdks/go/examples/xlang/flatten/flatten.go          |  81 +++
 sdks/go/examples/xlang/group_by/group_by.go        | 105 ++++
 sdks/go/examples/xlang/multi_input_output/multi.go |  74 +++
 sdks/go/examples/xlang/partition/partition.go      |  83 +++
 sdks/go/examples/xlang/wordcount/wordcount.go      |  50 +-
 sdks/go/pkg/beam/core/graph/edge.go                |  33 +-
 sdks/go/pkg/beam/core/graph/xlang.go               |  43 +-
 sdks/go/pkg/beam/core/graph/xlang_test.go          |  86 +++
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  |   5 +-
 sdks/go/pkg/beam/core/runtime/graphx/xlang.go      |  10 +
 sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go | 192 +++++++
 sdks/go/pkg/beam/core/runtime/xlangx/expand.go     |   1 +
 sdks/go/pkg/beam/core/runtime/xlangx/namespace.go  |  43 +-
 .../pkg/beam/core/runtime/xlangx/namespace_test.go | 579 +++++++++++++++++++++
 sdks/go/pkg/beam/core/runtime/xlangx/translate.go  |  60 ++-
 sdks/go/pkg/beam/runners/universal/universal.go    |   3 +
 sdks/go/pkg/beam/xlang.go                          | 186 +++++--
 20 files changed, 1794 insertions(+), 127 deletions(-)
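
For orientation, the examples added below exercise four beam.CrossLanguage* wrapper variants that differ only in whether their inputs and outputs are named. A condensed sketch of the call shapes follows; the scope s, expansion address addr, input PCollections, output types, and URNs are assumed to come from a pipeline like the examples in this commit.

package example

import (
	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
)

// callShapes sketches the four wrapper variants used by the examples below.
func callShapes(s beam.Scope, addr string, in beam.PCollection,
	namedIns map[string]beam.PCollection, outT typex.FullType,
	namedOutTypes map[string]typex.FullType) {

	// Named inputs and named outputs; returns map[string]beam.PCollection.
	outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:multi", nil, addr, namedIns, namedOutTypes)

	// Single unnamed input and output; returns the output PCollection directly.
	out := beam.CrossLanguageWithSingleInputOutput(s, "beam:transforms:xlang:test:gbk", nil, addr, in, outT)

	// Named inputs, single unnamed output.
	out = beam.CrossLanguageWithSink(s, "beam:transforms:xlang:test:flatten", nil, addr, namedIns, outT)

	// Single unnamed input, named outputs.
	outs = beam.CrossLanguageWithSource(s, "beam:transforms:xlang:test:partition", nil, addr, in, namedOutTypes)

	_, _ = outs, out
}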

diff --git a/sdks/go/examples/xlang/cogroup_by/cogroup_by.go b/sdks/go/examples/xlang/cogroup_by/cogroup_by.go
new file mode 100644
index 0000000..82d735f
--- /dev/null
+++ b/sdks/go/examples/xlang/cogroup_by/cogroup_by.go
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// cogroup_by exemplifies using a cross-language cogroup by key transform from a test expansion service.
+//
+// Prerequisites to run cogroup_by:
+// –> [Required] Job needs to be submitted to a portable runner (--runner=universal)
+// –> [Required] Endpoint of job service needs to be passed (--endpoint=<ip:port>)
+// –> [Required] Endpoint of expansion service needs to be passed (--expansion_addr=<ip:port>)
+// –> [Optional] Environment type can be LOOPBACK. Defaults to DOCKER. (--environment_type=LOOPBACK|DOCKER)
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"reflect"
+	"sort"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	expansionAddr = flag.String("expansion_addr", "", "Address of Expansion Service")
+)
+
+// formatFn is a DoFn that formats a key and its grouped values as a string.
+func formatFn(w int64, c []string) string {
+	sort.Strings(c)
+	return fmt.Sprintf("%v:%v", w, c)
+}
+
+// KV used to represent KV PCollection values
+type KV struct {
+	X int64
+	Y string
+}
+
+func getKV(kv KV, emit func(int64, string)) {
+	emit(kv.X, kv.Y)
+}
+
+func sumCounts(key int64, iter1 func(*string) bool) (int64, []string) {
+	var val string
+	var values []string
+
+	for iter1(&val) {
+		values = append(values, val)
+	}
+	return key, values
+}
+
+func init() {
+	beam.RegisterType(reflect.TypeOf((*KV)(nil)).Elem())
+	beam.RegisterFunction(formatFn)
+	beam.RegisterFunction(getKV)
+	beam.RegisterFunction(sumCounts)
+}
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	if *expansionAddr == "" {
+		log.Fatal("No expansion address provided")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	// Using the cross-language transform
+	col1 := beam.ParDo(s, getKV, beam.Create(s, KV{X: 0, Y: "1"}, KV{X: 0, Y: "2"}, KV{X: 1, Y: "3"}))
+	col2 := beam.ParDo(s, getKV, beam.Create(s, KV{X: 0, Y: "4"}, KV{X: 1, Y: "5"}, KV{X: 1, Y: "6"}))
+	namedInputs := map[string]beam.PCollection{"col1": col1, "col2": col2}
+	outputType := typex.NewCoGBK(typex.New(reflectx.Int64), typex.New(reflectx.String))
+	c := beam.CrossLanguageWithSink(s, "beam:transforms:xlang:test:cgbk", nil, *expansionAddr, namedInputs, outputType)
+
+	sums := beam.ParDo(s, sumCounts, c)
+	formatted := beam.ParDo(s, formatFn, sums)
+	passert.Equals(s, formatted, "0:[1 2 4]", "1:[3 5 6]")
+
+	if err := beamx.Run(context.Background(), p); err != nil {
+		log.Fatalf("Failed to execute job: %v", err)
+	}
+}
diff --git a/sdks/go/examples/xlang/combine/combine.go b/sdks/go/examples/xlang/combine/combine.go
new file mode 100644
index 0000000..7f77571
--- /dev/null
+++ b/sdks/go/examples/xlang/combine/combine.go
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// combine exemplifies using a cross-language combine per key transform from a test expansion service.
+//
+// Prerequisites to run combine:
+// –> [Required] Job needs to be submitted to a portable runner (--runner=universal)
+// –> [Required] Endpoint of job service needs to be passed (--endpoint=<ip:port>)
+// –> [Required] Endpoint of expansion service needs to be passed (--expansion_addr=<ip:port>)
+// –> [Optional] Environment type can be LOOPBACK. Defaults to DOCKER. (--environment_type=LOOPBACK|DOCKER)
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	expansionAddr = flag.String("expansion_addr", "", "Address of Expansion Service")
+)
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+	return fmt.Sprintf("%s:%v", w, c)
+}
+
+// KV used to represent KV PCollection values
+type KV struct {
+	X string
+	Y int64
+}
+
+func getKV(kv KV, emit func(string, int64)) {
+	emit(kv.X, kv.Y)
+}
+
+func sumCounts(key string, iter func(*int64) bool) (string, int64) {
+	var count, sum int64
+	for iter(&count) {
+		sum += count
+	}
+	return key, sum
+}
+
+func init() {
+	beam.RegisterType(reflect.TypeOf((*KV)(nil)).Elem())
+	beam.RegisterFunction(formatFn)
+	beam.RegisterFunction(getKV)
+	beam.RegisterFunction(sumCounts)
+}
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	if *expansionAddr == "" {
+		log.Fatal("No expansion address provided")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	// Using the cross-language transform
+	kvs := beam.Create(s, KV{X: "a", Y: 1}, KV{X: "a", Y: 2}, KV{X: "b", Y: 3})
+	ins := beam.ParDo(s, getKV, kvs)
+	outputType := typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64))
+	c := beam.CrossLanguageWithSingleInputOutput(s, "beam:transforms:xlang:test:compk", nil, *expansionAddr, ins, outputType)
+
+	formatted := beam.ParDo(s, formatFn, c)
+	passert.Equals(s, formatted, "a:3", "b:3")
+
+	if err := beamx.Run(context.Background(), p); err != nil {
+		log.Fatalf("Failed to execute job: %v", err)
+	}
+}
diff --git a/sdks/go/examples/xlang/combine_globally/combine_globally.go b/sdks/go/examples/xlang/combine_globally/combine_globally.go
new file mode 100644
index 0000000..6e6576c
--- /dev/null
+++ b/sdks/go/examples/xlang/combine_globally/combine_globally.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// combine_globally exemplifies using a cross-language global combine transform from a test expansion service.
+//
+// Prerequisites to run combine_globally:
+// –> [Required] Job needs to be submitted to a portable runner (--runner=universal)
+// –> [Required] Endpoint of job service needs to be passed (--endpoint=<ip:port>)
+// –> [Required] Endpoint of expansion service needs to be passed (--expansion_addr=<ip:port>)
+// –> [Optional] Environment type can be LOOPBACK. Defaults to DOCKER. (--environment_type=LOOPBACK|DOCKER)
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	expansionAddr = flag.String("expansion_addr", "", "Address of Expansion Service")
+)
+
+// formatFn is a DoFn that formats the globally combined value as a string.
+func formatFn(c int64) string {
+	return fmt.Sprintf("%v", c)
+}
+
+func init() {
+	beam.RegisterFunction(formatFn)
+}
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	if *expansionAddr == "" {
+		log.Fatal("No expansion address provided")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	in := beam.CreateList(s, []int64{1, 2, 3})
+
+	// Using the cross-language transform
+	outputType := typex.New(reflectx.Int64)
+	c := beam.CrossLanguageWithSingleInputOutput(s, "beam:transforms:xlang:test:comgl", nil, *expansionAddr, in, outputType)
+
+	formatted := beam.ParDo(s, formatFn, c)
+	passert.Equals(s, formatted, "6")
+
+	if err := beamx.Run(context.Background(), p); err != nil {
+		log.Fatalf("Failed to execute job: %v", err)
+	}
+}
diff --git a/sdks/go/examples/xlang/flatten/flatten.go b/sdks/go/examples/xlang/flatten/flatten.go
new file mode 100644
index 0000000..2c36d1c
--- /dev/null
+++ b/sdks/go/examples/xlang/flatten/flatten.go
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// flatten exemplifies using a cross-language flatten transform from a test expansion service.
+//
+// Prerequisites to run flatten:
+// –> [Required] Job needs to be submitted to a portable runner (--runner=universal)
+// –> [Required] Endpoint of job service needs to be passed (--endpoint=<ip:port>)
+// –> [Required] Endpoint of expansion service needs to be passed (--expansion_addr=<ip:port>)
+// –> [Optional] Environment type can be LOOPBACK. Defaults to DOCKER. (--environment_type=LOOPBACK|DOCKER)
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	expansionAddr = flag.String("expansion_addr", "", "Address of Expansion Service")
+)
+
+// formatFn is a DoFn that formats an int64 element as a string.
+func formatFn(c int64) string {
+	return fmt.Sprintf("%v", c)
+}
+
+func init() {
+	beam.RegisterFunction(formatFn)
+}
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	if *expansionAddr == "" {
+		log.Fatal("No expansion address provided")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	col1 := beam.CreateList(s, []int64{1, 2, 3})
+	col2 := beam.CreateList(s, []int64{4, 5, 6})
+
+	// Using the cross-language transform
+	namedInputs := map[string]beam.PCollection{"col1": col1, "col2": col2}
+	outputType := typex.New(reflectx.Int64)
+	c := beam.CrossLanguageWithSink(s, "beam:transforms:xlang:test:flatten", nil, *expansionAddr, namedInputs, outputType)
+
+	formatted := beam.ParDo(s, formatFn, c)
+	passert.Equals(s, formatted, "1", "2", "3", "4", "5", "6")
+
+	if err := beamx.Run(context.Background(), p); err != nil {
+		log.Fatalf("Failed to execute job: %v", err)
+	}
+}
diff --git a/sdks/go/examples/xlang/group_by/group_by.go b/sdks/go/examples/xlang/group_by/group_by.go
new file mode 100644
index 0000000..9d51a00
--- /dev/null
+++ b/sdks/go/examples/xlang/group_by/group_by.go
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// group_by exemplifies using a cross-language group by key transform from a test expansion service.
+//
+// Prerequisites to run group_by:
+// –> [Required] Job needs to be submitted to a portable runner (--runner=universal)
+// –> [Required] Endpoint of job service needs to be passed (--endpoint=<ip:port>)
+// –> [Required] Endpoint of expansion service needs to be passed (--expansion_addr=<ip:port>)
+// –> [Optional] Environment type can be LOOPBACK. Defaults to DOCKER. (--environment_type=LOOPBACK|DOCKER)
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"reflect"
+	"sort"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	expansionAddr = flag.String("expansion_addr", "", "Address of Expansion Service")
+)
+
+// formatFn is a DoFn that formats a key and its grouped values as a string.
+func formatFn(w string, c []int) string {
+	sort.Ints(c)
+	return fmt.Sprintf("%v:%v", w, c)
+}
+
+// KV used to represent KV PCollection values
+type KV struct {
+	X string
+	Y int64
+}
+
+func getKV(kv KV, emit func(string, int64)) {
+	emit(kv.X, kv.Y)
+}
+
+func collectValues(key string, iter func(*int64) bool) (string, []int) {
+	var count int64
+	var values []int
+	for iter(&count) {
+		values = append(values, int(count))
+	}
+	return key, values
+}
+
+func init() {
+	beam.RegisterType(reflect.TypeOf((*KV)(nil)).Elem())
+	beam.RegisterFunction(formatFn)
+	beam.RegisterFunction(getKV)
+	beam.RegisterFunction(collectValues)
+}
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	if *expansionAddr == "" {
+		log.Fatal("No expansion address provided")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	// Using the cross-language transform
+	kvs := beam.Create(s, KV{X: "0", Y: 1}, KV{X: "0", Y: 2}, KV{X: "1", Y: 3})
+	ins := beam.ParDo(s, getKV, kvs)
+	outputType := typex.NewCoGBK(typex.New(reflectx.String), typex.New(reflectx.Int64))
+	outs := beam.CrossLanguageWithSingleInputOutput(s, "beam:transforms:xlang:test:gbk", nil, *expansionAddr, ins, outputType)
+
+	vals := beam.ParDo(s, collectValues, outs)
+	formatted := beam.ParDo(s, formatFn, vals)
+	passert.Equals(s, formatted, "0:[1 2]", "1:[3]")
+
+	if err := beamx.Run(context.Background(), p); err != nil {
+		log.Fatalf("Failed to execute job: %v", err)
+	}
+}
diff --git a/sdks/go/examples/xlang/multi_input_output/multi.go b/sdks/go/examples/xlang/multi_input_output/multi.go
new file mode 100644
index 0000000..4f6e654
--- /dev/null
+++ b/sdks/go/examples/xlang/multi_input_output/multi.go
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// multi exemplifies using a cross-language transform with multiple inputs and
+// outputs from a test expansion service.
+//
+// Prerequisites to run multi:
+// –> [Required] Job needs to be submitted to a portable runner (--runner=universal)
+// –> [Required] Endpoint of job service needs to be passed (--endpoint=<ip:port>)
+// –> [Required] Endpoint of expansion service needs to be passed (--expansion_addr=<ip:port>)
+// –> [Optional] Environment type can be LOOPBACK. Defaults to DOCKER. (--environment_type=LOOPBACK|DOCKER)
+package main
+
+import (
+	"context"
+	"flag"
+	"log"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	expansionAddr = flag.String("expansion_addr", "", "Address of Expansion Service")
+)
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	if *expansionAddr == "" {
+		log.Fatal("No expansion address provided")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	main1 := beam.CreateList(s, []string{"a", "bb"})
+	main2 := beam.CreateList(s, []string{"x", "yy", "zzz"})
+	side := beam.CreateList(s, []string{"s"})
+	namedInputs := map[string]beam.PCollection{"main1": main1, "main2": main2, "side": side}
+
+	// Using the cross-language transform
+	outputType := typex.New(reflectx.String)
+	namedOutputs := map[string]typex.FullType{"main": outputType, "side": outputType}
+	multi := beam.CrossLanguage(s, "beam:transforms:xlang:test:multi", nil, *expansionAddr, namedInputs, namedOutputs)
+
+	passert.Equals(s, multi["main"], "as", "bbs", "xs", "yys", "zzzs")
+	passert.Equals(s, multi["side"], "ss")
+
+	if err := beamx.Run(context.Background(), p); err != nil {
+		log.Fatalf("Failed to execute job: %v", err)
+	}
+}
diff --git a/sdks/go/examples/xlang/partition/partition.go b/sdks/go/examples/xlang/partition/partition.go
new file mode 100644
index 0000000..8164118
--- /dev/null
+++ b/sdks/go/examples/xlang/partition/partition.go
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// partition exemplifies using a cross-language partition transform from a test expansion service.
+//
+// Prerequisites to run partition:
+// –> [Required] Job needs to be submitted to a portable runner (--runner=universal)
+// –> [Required] Endpoint of job service needs to be passed (--endpoint=<ip:port>)
+// –> [Required] Endpoint of expansion service needs to be passed (--expansion_addr=<ip:port>)
+// –> [Optional] Environment type can be LOOPBACK. Defaults to DOCKER. (--environment_type=LOOPBACK|DOCKER)
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+	expansionAddr = flag.String("expansion_addr", "", "Address of Expansion Service")
+)
+
+// formatFn is a DoFn that formats an int64 element as a string.
+func formatFn(c int64) string {
+	return fmt.Sprintf("%v", c)
+}
+
+func init() {
+	beam.RegisterFunction(formatFn)
+}
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	if *expansionAddr == "" {
+		log.Fatal("No expansion address provided")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	col := beam.CreateList(s, []int64{1, 2, 3, 4, 5, 6})
+
+	// Using the cross-language transform
+	outputType := typex.New(reflectx.Int64)
+	namedOutputs := map[string]typex.FullType{"0": outputType, "1": outputType}
+	c := beam.CrossLanguageWithSource(s, "beam:transforms:xlang:test:partition", nil, *expansionAddr, col, namedOutputs)
+
+	formatted0 := beam.ParDo(s, formatFn, c["0"])
+	formatted1 := beam.ParDo(s, formatFn, c["1"])
+
+	passert.Equals(s, formatted0, "2", "4", "6")
+	passert.Equals(s, formatted1, "1", "3", "5")
+
+	if err := beamx.Run(context.Background(), p); err != nil {
+		log.Fatalf("Failed to execute job: %v", err)
+	}
+}
diff --git a/sdks/go/examples/xlang/wordcount/wordcount.go b/sdks/go/examples/xlang/wordcount/wordcount.go
index c14e2bd..373a12c 100644
--- a/sdks/go/examples/xlang/wordcount/wordcount.go
+++ b/sdks/go/examples/xlang/wordcount/wordcount.go
@@ -13,7 +13,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// xlang_wordcount exemplifies using a cross language transform from Python to count words
+// wordcount exemplifies using a cross-language Count transform from a test
+// expansion service to count words.
+//
+// Prerequisites to run wordcount:
+// –> [Required] Job needs to be submitted to a portable runner (--runner=universal)
+// –> [Required] Endpoint of job service needs to be passed (--endpoint=<ip:port>)
+// –> [Required] Endpoint of expansion service needs to be passed (--expansion_addr=<ip:port>)
+// –> [Optional] Environment type can be LOOPBACK. Defaults to DOCKER. (--environment_type=LOOPBACK|DOCKER)
 package main
 
 import (
@@ -24,11 +31,10 @@ import (
 	"regexp"
 	"strings"
 
+	"github.com/apache/beam/sdks/go/pkg/beam"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
-
-	"github.com/apache/beam/sdks/go/pkg/beam"
-	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
 	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
 
 	// Imports to enable correct filesystem access and runner setup in LOOPBACK mode
@@ -38,10 +44,7 @@ import (
 )
 
 var (
-	// Set this required option to specify where to write the output.
-	output = flag.String("output", "./output", "Output file (required).")
-
-	expansionAddr = flag.String("expansion-addr", "", "Address of Expansion Service")
+	expansionAddr = flag.String("expansion_addr", "", "Address of Expansion Service")
 )
 
 var (
@@ -63,7 +66,7 @@ func extractFn(ctx context.Context, line string, emit func(string)) {
 
 // formatFn is a DoFn that formats a word and its count as a string.
 func formatFn(w string, c int64) string {
-	return fmt.Sprintf("%s: %v", w, c)
+	return fmt.Sprintf("%s:%v", w, c)
 }
 
 func init() {
@@ -75,10 +78,6 @@ func main() {
 	flag.Parse()
 	beam.Init()
 
-	if *output == "" {
-		log.Fatal("No output provided")
-	}
-
 	if *expansionAddr == "" {
 		log.Fatal("No expansion address provided")
 	}
@@ -89,27 +88,22 @@ func main() {
 	lines := beam.CreateList(s, strings.Split(lorem, "\n"))
 	col := beam.ParDo(s, extractFn, lines)
 
-	// Using Cross-language Count from Python's test expansion service
+	// Using the cross-language transform
 	outputType := typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64))
-	counted := beam.CrossLanguage(s,
-		"beam:transforms:xlang:count",
-		nil,
-		*expansionAddr,
-		map[string]beam.PCollection{"xlang-in": col},
-		map[string]typex.FullType{"output": outputType}, // Should be "None" when submitting to Python Expansion service
-	)
+	counted := beam.CrossLanguageWithSingleInputOutput(s, "beam:transforms:xlang:count", nil, *expansionAddr, col, outputType)
 
-	formatted := beam.ParDo(s, formatFn, counted["output"])
-	textio.Write(s, *output, formatted)
+	formatted := beam.ParDo(s, formatFn, counted)
+	passert.Equals(s, formatted, "a:4", "b:4", "c:5")
 
 	if err := beamx.Run(context.Background(), p); err != nil {
 		log.Fatalf("Failed to execute job: %v", err)
 	}
 }
 
-var lorem = `Lorem ipsum dolor sit amet, consectetur adipiscing elit. Mauris id tellus vehicula, rutrum turpis quis, suscipit est. Quisque vehicula nec ex a interdum. Phasellus vulputate nunc sit amet nisl dapibus tincidunt ut ullamcorper nisi. Mauris gravida porta leo vel congue. Duis sit amet arcu eu nisl pharetra interdum a eget enim. Nulla facilisis massa ut egestas interdum. Nunc elit dui, hendrerit at pharetra a, pellentesque non turpis. Integer auctor vulputate congue. Pellentesqu [...]
-In rhoncus, diam sit amet laoreet ullamcorper, turpis lorem ornare tortor, eget eleifend purus risus id justo. Ut eget tortor vel elit aliquet egestas. Integer iaculis ipsum at nunc condimentum accumsan. Ut tristique felis ut metus tincidunt, quis ullamcorper diam convallis. Donec mattis ultrices lorem, id placerat tellus venenatis at. Donec quis nulla dui. Pellentesque at semper nunc. Aenean orci dui, dictum id urna a, luctus consequat augue. Nulla hendrerit mi ut quam iaculis euismod.  [...]
-Suspendisse tempus vestibulum magna ac sollicitudin. Pellentesque id consequat lorem. Curabitur laoreet at velit a laoreet. Donec justo lectus, elementum eu elit a, placerat elementum turpis. Aliquam pretium ipsum eros, quis ultricies metus lobortis ut. Cras porta congue luctus. Curabitur vestibulum lacus est, quis eleifend metus euismod id. Duis vel ante ipsum. Proin sed posuere nulla. Sed nisi mauris, consequat ut eros vel, mollis bibendum dolor. Aenean laoreet lacus a eros iaculis eleifend.
-Fusce tempor tortor vel eleifend ornare. Maecenas euismod vitae nunc vel congue. Sed in tristique felis, at venenatis arcu. Etiam egestas sem quis accumsan aliquet. Ut arcu lorem, auctor et metus venenatis, ullamcorper pharetra turpis. Ut gravida, eros ac tristique faucibus, lorem quam sollicitudin lacus, tempus porta tortor risus non lectus. Nunc consequat in magna sed tincidunt. Nullam lacus mi, vulputate et erat eu, rutrum fermentum leo. Sed blandit lobortis nisl et auctor. Curabitur  [...]
-Mauris felis urna, tincidunt quis fermentum ut, consequat eu mauris. Nulla placerat venenatis molestie. Suspendisse vitae bibendum ante. Nulla lacinia hendrerit diam non feugiat. Curabitur efficitur risus in porttitor condimentum. Pellentesque tincidunt tincidunt diam, et mollis nibh consequat id. Nulla ultrices, ligula interdum convallis varius, mi odio posuere metus, id congue risus nisl at erat. Sed rhoncus, eros eget ullamcorper interdum, leo nibh condimentum neque, eu eleifend metus [...]
+var lorem = `a b b c
+b c a
+a b c
+c
+a
+c
 `
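
The wordcount change above swaps the fully named CrossLanguage call for the single input/output convenience wrapper. A minimal sketch of the two equivalent forms, assuming the scope s, input col, output type outT, and expansion address addr from the wordcount pipeline:

package example

import (
	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
)

// oldAndNew contrasts the removed map-based call with the added wrapper.
func oldAndNew(s beam.Scope, col beam.PCollection, outT typex.FullType, addr string) (beam.PCollection, beam.PCollection) {
	// Removed form: explicit input/output tags, output fetched from the returned map.
	outs := beam.CrossLanguage(s, "beam:transforms:xlang:count", nil, addr,
		map[string]beam.PCollection{"xlang-in": col},
		map[string]typex.FullType{"output": outT})

	// Added form: the wrapper tags the single unnamed input and output internally
	// and returns the output PCollection directly.
	counted := beam.CrossLanguageWithSingleInputOutput(s, "beam:transforms:xlang:count", nil, addr, col, outT)

	return outs["output"], counted
}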
diff --git a/sdks/go/pkg/beam/core/graph/edge.go b/sdks/go/pkg/beam/core/graph/edge.go
index c4f259e..8016f31 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -18,6 +18,7 @@ package graph
 import (
 	"fmt"
 	"reflect"
+	"sort"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
@@ -282,7 +283,7 @@ func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) {
 	return edge, nil
 }
 
-// NewCrossLanguage inserts a Cross-langugae External transform.
+// NewCrossLanguage inserts a cross-language External transform using initialized input and output nodes.
 func NewCrossLanguage(g *Graph, s *Scope, ext *ExternalTransform, ins []*Inbound, outs []*Outbound) (*MultiEdge, func(*Node, bool)) {
 	edge := g.NewEdge(s)
 	edge.Op = External
@@ -303,11 +304,22 @@ func NewCrossLanguage(g *Graph, s *Scope, ext *ExternalTransform, ins []*Inbound
 	return edge, isBoundedUpdater
 }
 
-func NewNamedInboundLinks(ins map[string]*Node) (map[string]int, []*Inbound) {
+// NamedInboundLinks returns a map from input tag to the index of the
+// corresponding Inbound in MultiEdge.Input, along with the new Inbound links
+// themselves.
+func NamedInboundLinks(ins map[string]*Node) (map[string]int, []*Inbound) {
 	inputsMap := make(map[string]int)
 	var inboundLinks []*Inbound
 
-	for tag, node := range ins {
+	// Ensuring deterministic order of Nodes
+	var tags []string
+	for tag := range ins {
+		tags = append(tags, tag)
+	}
+	sort.Strings(tags)
+
+	for _, tag := range tags {
+		node := ins[tag]
 		id := len(inboundLinks)
 		inputsMap[tag] = id
 		inboundLinks = append(inboundLinks, &Inbound{Kind: Main, From: node, Type: node.Type()})
@@ -316,11 +328,22 @@ func NewNamedInboundLinks(ins map[string]*Node) (map[string]int, []*Inbound) {
 	return inputsMap, inboundLinks
 }
 
-func NewNamedOutboundLinks(g *Graph, outs map[string]typex.FullType) (map[string]int, []*Outbound) {
+// NamedOutboundLinks returns a map from output tag to the index of the
+// corresponding Outbound in MultiEdge.Output, along with the new Outbound
+// links themselves.
+func NamedOutboundLinks(g *Graph, outs map[string]typex.FullType) (map[string]int, []*Outbound) {
 	outputsMap := make(map[string]int)
 	var outboundLinks []*Outbound
 
-	for tag, fullType := range outs {
+	// Ensuring deterministic order of Nodes
+	var tags []string
+	for tag := range outs {
+		tags = append(tags, tag)
+	}
+	sort.Strings(tags)
+
+	for _, tag := range tags {
+		fullType := outs[tag]
 		node := g.NewNode(fullType, nil, true)
 		id := len(outboundLinks)
 		outputsMap[tag] = id
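
The renamed NamedInboundLinks and NamedOutboundLinks helpers above now sort tags before building the links, so the returned tag -> index map is deterministic. A small sketch of how the map and links are meant to line up (a fresh graph and string-typed outputs are assumed here):

package example

import (
	"fmt"

	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
)

// namedLinksDemo builds Outbound links for two named outputs and records the
// resulting tag -> index map on an ExternalTransform via WithNamedOutputs.
func namedLinksDemo() {
	g := graph.New()
	outTypes := map[string]typex.FullType{
		"main": typex.New(reflectx.String),
		"side": typex.New(reflectx.String),
	}

	// Tags are sorted, so "main" -> 0 and "side" -> 1 regardless of map iteration order.
	outputsMap, outboundLinks := graph.NamedOutboundLinks(g, outTypes)

	ext := graph.ExternalTransform{}.WithNamedOutputs(outputsMap)
	fmt.Println(ext.OutputsMap, len(outboundLinks)) // map[main:0 side:1] 2
}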
diff --git a/sdks/go/pkg/beam/core/graph/xlang.go b/sdks/go/pkg/beam/core/graph/xlang.go
index 617474b..f78913f 100644
--- a/sdks/go/pkg/beam/core/graph/xlang.go
+++ b/sdks/go/pkg/beam/core/graph/xlang.go
@@ -24,18 +24,42 @@ import (
 )
 
 var (
+	// SourceInputTag is a constant random string used when an ExternalTransform
+	// expects a single unnamed input. xlangx and graphx use it to explicitly
+	// bypass steps in pipeline construction meant for named inputs.
 	SourceInputTag string
-	SinkOutputTag  string
-	NewNamespace   func() string
+
+	// SinkOutputTag is a constant random string used when an ExternalTransform
+	// expects a single unnamed output. xlangx and graphx use it to explicitly
+	// bypass steps in pipeline construction meant for named outputs.
+	SinkOutputTag string
+
+	// NewNamespace is a utility random string generator used by the xlang code
+	// to scope each ExternalTransform with a unique namespace.
+	NewNamespace func() string
 )
 
+func init() {
+	NewNamespace = NewNamespaceGenerator(10)
+	SourceInputTag = NewNamespace()
+	SinkOutputTag = NewNamespace()
+}
+
+// ExpandedTransform stores the expansion response associated with each
+// ExternalTransform.
+//
+// The Components and Transform fields are purposely typed as interface{} to
+// avoid unnecessary proto-related imports into graph.
 type ExpandedTransform struct {
 	Components   interface{} // *pipepb.Components
 	Transform    interface{} //*pipepb.PTransform
 	Requirements []string
 }
 
-// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+// ExternalTransform represents a cross-language transform on its way into and
+// out of the pipeline graph. It is associated with a MultiEdge and its Inbound
+// and Outbound links, and stores the associated expansion response within the
+// Expanded field.
 type ExternalTransform struct {
 	Namespace string
 
@@ -49,12 +73,8 @@ type ExternalTransform struct {
 	Expanded *ExpandedTransform
 }
 
-func init() {
-	NewNamespace = NewNamespaceGenerator(10)
-	SourceInputTag = NewNamespace()
-	SinkOutputTag = NewNamespace()
-}
-
+// WithNamedInputs sets the ExternalTransform's InputsMap to the given map
+// (tag -> index of the Inbound in MultiEdge.Input) of named inputs.
 func (ext ExternalTransform) WithNamedInputs(inputsMap map[string]int) ExternalTransform {
 	if ext.InputsMap != nil {
 		panic(errors.Errorf("inputs already set as: \n%v", ext.InputsMap))
@@ -63,6 +83,8 @@ func (ext ExternalTransform) WithNamedInputs(inputsMap map[string]int) ExternalT
 	return ext
 }
 
+// WithNamedOutputs sets the ExternalTransform's OutputsMap to the given map
+// (tag -> index of the Outbound in MultiEdge.Output) of named outputs.
 func (ext ExternalTransform) WithNamedOutputs(outputsMap map[string]int) ExternalTransform {
 	if ext.OutputsMap != nil {
 		panic(errors.Errorf("outputTypes already set as: \n%v", ext.OutputsMap))
@@ -71,8 +93,7 @@ func (ext ExternalTransform) WithNamedOutputs(outputsMap map[string]int) Externa
 	return ext
 }
 
-// TODO(pskevin): Credit one of the best stackoverflow answers @ https://stackoverflow.com/questions/22892120/how-to-generate-a-random-string-of-a-fixed-length-in-go
-
+// NewNamespaceGenerator returns a function that generates a random alphabetic string of length n.
 func NewNamespaceGenerator(n int) func() string {
 	const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
 	const (
diff --git a/sdks/go/pkg/beam/core/graph/xlang_test.go b/sdks/go/pkg/beam/core/graph/xlang_test.go
new file mode 100644
index 0000000..80a8339
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/xlang_test.go
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package graph
+
+import (
+	"testing"
+)
+
+func expectPanic(t *testing.T, err string) {
+	if r := recover(); r == nil {
+		t.Errorf("expected panic; %v", err)
+	}
+}
+
+func TestWithInputs(t *testing.T) {
+	inputsMap := map[string]int{"x": 1}
+
+	t.Run("InputsMap initialized", func(t *testing.T) {
+		defer expectPanic(t, "inserting into initialized map should fail")
+		e := ExternalTransform{InputsMap: make(map[string]int)}
+		e.WithNamedInputs(inputsMap)
+	})
+
+	t.Run("InputsMap nil", func(t *testing.T) {
+		e := ExternalTransform{}
+		newE := e.WithNamedInputs(inputsMap)
+		for tag, idx := range inputsMap {
+			if v, exists := newE.InputsMap[tag]; !exists || v != idx {
+				t.Errorf("expected inputs map %v; got inputs map %v", inputsMap, newE.InputsMap)
+			}
+		}
+	})
+}
+
+func TestWithOutputs(t *testing.T) {
+	outputsMap := map[string]int{"x": 1}
+
+	t.Run("OutputsMap initialized", func(t *testing.T) {
+		defer expectPanic(t, "inserting into initialized map should fail")
+		e := ExternalTransform{OutputsMap: make(map[string]int)}
+		e.WithNamedOutputs(outputsMap)
+	})
+
+	t.Run("OutputsMap nil", func(t *testing.T) {
+		e := ExternalTransform{}
+		newE := e.WithNamedOutputs(outputsMap)
+		for tag, idx := range outputsMap {
+			if v, exists := newE.OutputsMap[tag]; !exists || v != idx {
+				t.Errorf("expected outputs map %v; got outputs map %v", outputsMap, newE.OutputsMap)
+			}
+		}
+	})
+}
+
+func TestNewNamespaceGenerator(t *testing.T) {
+	seen := make(map[string]bool)
+	runs := 1000000
+	n := 10
+
+	gen := NewNamespaceGenerator(n)
+
+	for i := 0; i < runs; i++ {
+		k := gen()
+		if len(k) < n {
+			t.Errorf("expected string of length %v; got string of length %v", n, len(k))
+		}
+		seen[k] = true
+	}
+
+	if len(seen) < runs {
+		t.Errorf("repeated random strings generated; could cause namespace collision")
+	}
+}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 0a0bb53..d427e9f 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -395,7 +395,10 @@ func (m *marshaller) expandCrossLanguage(namedEdge NamedEdge) string {
 
 	for tag, n := range ExternalInputs(edge) {
 		m.addNode(n)
-		// TODO(pskevin): switch based on graph.SourceInputTag
+		// Replace a dummy SourceInputTag with a positional name based on the input's index
+		if tag == graph.SourceInputTag {
+			tag = fmt.Sprintf("i%v", edge.External.InputsMap[tag])
+		}
 		inputs[tag] = nodeID(n)
 	}
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/xlang.go b/sdks/go/pkg/beam/core/runtime/graphx/xlang.go
index 5ee3012..cc1bb71 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/xlang.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/xlang.go
@@ -21,6 +21,8 @@ import (
 	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
+// ExpandedComponents type-asserts the interface{}-typed Components field and
+// returns it as a pipeline Components proto.
 func ExpandedComponents(exp *graph.ExpandedTransform) *pipepb.Components {
 	if c, ok := exp.Components.(*pipepb.Components); ok {
 		return c
@@ -28,6 +30,8 @@ func ExpandedComponents(exp *graph.ExpandedTransform) *pipepb.Components {
 	panic(errors.Errorf("malformed components; %v lacks a conforming pipeline component", exp))
 }
 
+// ExpandedTransform type-asserts the interface{}-typed Transform field and
+// returns it as a pipeline PTransform proto.
 func ExpandedTransform(exp *graph.ExpandedTransform) *pipepb.PTransform {
 	if t, ok := exp.Transform.(*pipepb.PTransform); ok {
 		return t
@@ -35,6 +39,9 @@ func ExpandedTransform(exp *graph.ExpandedTransform) *pipepb.PTransform {
 	panic(errors.Errorf("malformed transform; %v lacks a conforming pipeline ptransform", exp))
 }
 
+// ExternalInputs returns a map (tag -> graph node representing the
+// PCollection) of input nodes, resolved from the edge's map (tag -> index of
+// the Inbound in MultiEdge.Input) of named inputs.
 func ExternalInputs(e *graph.MultiEdge) map[string]*graph.Node {
 	inputs := make(map[string]*graph.Node)
 
@@ -44,6 +51,9 @@ func ExternalInputs(e *graph.MultiEdge) map[string]*graph.Node {
 	return inputs
 }
 
+// ExternalOutputs returns a map (tag -> graph node representing the
+// PCollection) of output nodes, resolved from the edge's map (tag -> index of
+// the Outbound in MultiEdge.Output) of named outputs.
 func ExternalOutputs(e *graph.MultiEdge) map[string]*graph.Node {
 	outputs := make(map[string]*graph.Node)
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go b/sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
new file mode 100644
index 0000000..98e415b
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package graphx
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/protobuf/testing/protocmp"
+)
+
+func newNode(g *graph.Graph) *graph.Node {
+	n := g.NewNode(typex.New(reflectx.Int), window.DefaultWindowingStrategy(), true)
+	return n
+}
+
+func newIn(g *graph.Graph) *graph.Inbound {
+	return &graph.Inbound{
+		From: newNode(g),
+	}
+}
+
+func newIns(g *graph.Graph, n int) []*graph.Inbound {
+	var ins []*graph.Inbound
+	for i := 0; i < n; i++ {
+		ins = append(ins, newIn(g))
+	}
+	return ins
+}
+
+func newOut(g *graph.Graph) *graph.Outbound {
+	return &graph.Outbound{
+		To: newNode(g),
+	}
+}
+
+func newOuts(g *graph.Graph, n int) []*graph.Outbound {
+	var outs []*graph.Outbound
+	for i := 0; i < n; i++ {
+		outs = append(outs, newOut(g))
+	}
+	return outs
+}
+
+func newEdge(g *graph.Graph, ins, outs int) *graph.MultiEdge {
+	return &graph.MultiEdge{
+		Input:  newIns(g, ins),
+		Output: newOuts(g, outs),
+	}
+}
+
+func newExternal(ins, outs map[string]int) *graph.ExternalTransform {
+	return &graph.ExternalTransform{
+		InputsMap:  ins,
+		OutputsMap: outs,
+	}
+}
+
+type testExternalConf struct {
+	i    int
+	o    int
+	iMap map[string]int
+	oMap map[string]int
+}
+
+func TestExternalInputs(t *testing.T) {
+	tt := testExternalConf{i: 2, o: 3, iMap: map[string]int{"x": 1}, oMap: map[string]int{"y": 1}}
+
+	g := graph.New()
+	e := newEdge(g, tt.i, tt.o)
+	e.External = newExternal(tt.iMap, tt.oMap)
+
+	i := ExternalInputs(e)
+
+	for tag, idx := range tt.iMap {
+		got, exists := i[tag]
+		want := e.Input[idx].From
+
+		if !exists {
+			t.Errorf("input absent for key %v; expected %v", tag, want)
+		}
+
+		if got.ID() != want.ID() {
+			t.Errorf("wrong input associated with key %v; want %v but got %v", tag, want, got)
+		}
+	}
+}
+
+func TestExternalOutputs(t *testing.T) {
+	tt := testExternalConf{i: 2, o: 3, iMap: map[string]int{"x": 1}, oMap: map[string]int{"y": 1}}
+
+	g := graph.New()
+	e := newEdge(g, tt.i, tt.o)
+	e.External = newExternal(tt.iMap, tt.oMap)
+
+	o := ExternalOutputs(e)
+
+	for tag, idx := range tt.oMap {
+		got, exists := o[tag]
+		want := e.Output[idx].To
+
+		if !exists {
+			t.Errorf("output absent for key %v; expected %v", tag, want)
+		}
+
+		if got.ID() != want.ID() {
+			t.Errorf("wrong output associated with key %v; want %v but got %v", tag, want, got)
+		}
+	}
+}
+
+func newTransform(name string) *pipepb.PTransform {
+	return &pipepb.PTransform{
+		UniqueName: name,
+	}
+}
+
+func newComponents(ts []string) *pipepb.Components {
+	components := &pipepb.Components{}
+
+	components.Transforms = make(map[string]*pipepb.PTransform)
+	for id, t := range ts {
+		components.Transforms[fmt.Sprint(id)] = newTransform(t)
+	}
+
+	return components
+}
+
+func expectPanic(t *testing.T, err string) {
+	if r := recover(); r == nil {
+		t.Errorf("expected panic; %v", err)
+	}
+}
+
+func TestExpandedTransform(t *testing.T) {
+	t.Run("Correct PTransform", func(t *testing.T) {
+		want := newTransform("x")
+		exp := &graph.ExpandedTransform{Transform: want}
+
+		got := ExpandedTransform(exp)
+
+		if d := cmp.Diff(want, got, protocmp.Transform()); d != "" {
+			t.Errorf("diff (-want, +got): %v", d)
+		}
+
+	})
+
+	t.Run("Malformed PTransform", func(t *testing.T) {
+		defer expectPanic(t, "string can't be type asserted into a pipeline PTransform")
+		exp := &graph.ExpandedTransform{Transform: "gibberish"}
+		ExpandedTransform(exp)
+	})
+}
+
+func TestExpandedComponents(t *testing.T) {
+	t.Run("Correct Components", func(t *testing.T) {
+		want := newComponents([]string{"x"})
+		exp := &graph.ExpandedTransform{Components: want}
+
+		got := ExpandedComponents(exp)
+
+		if d := cmp.Diff(want, got, protocmp.Transform()); d != "" {
+			t.Errorf("diff (-want, +got): %v", d)
+		}
+
+	})
+
+	t.Run("Malformed Components", func(t *testing.T) {
+		defer expectPanic(t, "string can't be type asserted into a pipeline Components")
+		exp := &graph.ExpandedTransform{Components: "gibberish"}
+		ExpandedComponents(exp)
+	})
+}
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
index 4887727..d7f435e 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
@@ -23,6 +23,7 @@ import (
 	"google.golang.org/grpc"
 )
 
+// Expand queries the expansion service at expansionAddr to resolve the given ExpansionRequest.
 func Expand(ctx context.Context, req *jobpb.ExpansionRequest, expansionAddr string) (*jobpb.ExpansionResponse, error) {
 	// Querying Expansion Service
 
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/namespace.go b/sdks/go/pkg/beam/core/runtime/xlangx/namespace.go
index d464ccc..b18995d 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/namespace.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/namespace.go
@@ -22,7 +22,7 @@ import (
 	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
-func AddCoderID(c *pipepb.Components, idMap map[string]string, cid string, newID func(string) string) string {
+func addCoderID(c *pipepb.Components, idMap map[string]string, cid string, newID func(string) string) string {
 	if _, exists := idMap[cid]; exists {
 		return idMap[cid]
 	}
@@ -38,7 +38,7 @@ func AddCoderID(c *pipepb.Components, idMap map[string]string, cid string, newID
 		updatedComponentCoderIDs = append(updatedComponentCoderIDs, coder.ComponentCoderIds...)
 
 		for i, ccid := range coder.ComponentCoderIds {
-			updatedComponentCoderIDs[i] = AddCoderID(c, idMap, ccid, newID)
+			updatedComponentCoderIDs[i] = addCoderID(c, idMap, ccid, newID)
 		}
 		coder.ComponentCoderIds = updatedComponentCoderIDs
 	}
@@ -52,7 +52,7 @@ func AddCoderID(c *pipepb.Components, idMap map[string]string, cid string, newID
 	return idMap[cid]
 }
 
-func AddWindowingStrategyID(c *pipepb.Components, idMap map[string]string, wid string, newID func(string) string) string {
+func addWindowingStrategyID(c *pipepb.Components, idMap map[string]string, wid string, newID func(string) string) string {
 	if _, exists := idMap[wid]; exists {
 		return idMap[wid]
 	}
@@ -64,12 +64,12 @@ func AddWindowingStrategyID(c *pipepb.Components, idMap map[string]string, wid s
 
 	// Updating WindowCoderID of WindowingStrategy
 	if windowingStrategy.WindowCoderId != "" {
-		windowingStrategy.WindowCoderId = AddCoderID(c, idMap, windowingStrategy.WindowCoderId, newID)
+		windowingStrategy.WindowCoderId = addCoderID(c, idMap, windowingStrategy.WindowCoderId, newID)
 	}
 
 	// Updating EnvironmentId of WindowingStrategy
 	if windowingStrategy.EnvironmentId != "" {
-		windowingStrategy.EnvironmentId = AddEnvironmentID(c, idMap, windowingStrategy.EnvironmentId, newID)
+		windowingStrategy.EnvironmentId = addEnvironmentID(c, idMap, windowingStrategy.EnvironmentId, newID)
 	}
 
 	idMap[wid] = newID(wid)
@@ -81,7 +81,7 @@ func AddWindowingStrategyID(c *pipepb.Components, idMap map[string]string, wid s
 	return idMap[wid]
 }
 
-func AddEnvironmentID(c *pipepb.Components, idMap map[string]string, eid string, newID func(string) string) string {
+func addEnvironmentID(c *pipepb.Components, idMap map[string]string, eid string, newID func(string) string) string {
 	if _, exists := idMap[eid]; exists {
 		return idMap[eid]
 	}
@@ -109,16 +109,41 @@ func AddNamespace(t *pipepb.PTransform, c *pipepb.Components, namespace string)
 
 	// Update Environment ID of PTransform
 	if t.EnvironmentId != "" {
-		t.EnvironmentId = AddEnvironmentID(c, idMap, t.EnvironmentId, newID)
+		t.EnvironmentId = addEnvironmentID(c, idMap, t.EnvironmentId, newID)
 	}
 	for _, pcolsMap := range []map[string]string{t.Inputs, t.Outputs} {
 		for _, pid := range pcolsMap {
 			if pcol, exists := c.Pcollections[pid]; exists {
 				// Update Coder ID of PCollection
-				pcol.CoderId = AddCoderID(c, idMap, pcol.CoderId, newID)
+				pcol.CoderId = addCoderID(c, idMap, pcol.CoderId, newID)
 
 				// Update WindowingStrategyID of PCollection
-				pcol.WindowingStrategyId = AddWindowingStrategyID(c, idMap, pcol.WindowingStrategyId, newID)
+				pcol.WindowingStrategyId = addWindowingStrategyID(c, idMap, pcol.WindowingStrategyId, newID)
+			}
+		}
+	}
+
+	// c.Transforms = make(map[string]*pipepb.PTransform)
+	sourceName := t.UniqueName
+	for _, t := range c.Transforms {
+		if t.UniqueName != sourceName {
+			if id, exists := idMap[t.EnvironmentId]; exists {
+				t.EnvironmentId = id
+			}
+			for _, pcolsMap := range []map[string]string{t.Inputs, t.Outputs} {
+				for _, pid := range pcolsMap {
+					if pcol, exists := c.Pcollections[pid]; exists {
+						// Update Coder ID of PCollection
+						if id, exists := idMap[pcol.CoderId]; exists {
+							pcol.CoderId = id
+						}
+
+						// Update WindowingStrategyID of PCollection
+						if id, exists := idMap[pcol.WindowingStrategyId]; exists {
+							pcol.WindowingStrategyId = id
+						}
+					}
+				}
 			}
 		}
 	}
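
The AddNamespace change above also propagates the namespaced IDs to the other transforms in the expanded components, as the new loop over c.Transforms shows. A rough sketch of the effect on a minimal set of components (the id@namespace format is inferred from the test expectations below):

package example

import (
	"fmt"

	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/xlangx"
	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
)

// namespaceDemo applies AddNamespace to a single transform and prints the
// renamed coder ID of its output PCollection.
func namespaceDemo() {
	c := &pipepb.Components{
		Transforms: map[string]*pipepb.PTransform{
			"t0": {UniqueName: "t0", Outputs: map[string]string{"o0": "p0"}, EnvironmentId: "e0"},
		},
		Pcollections: map[string]*pipepb.PCollection{
			"p0": {CoderId: "c0", WindowingStrategyId: "w0"},
		},
		WindowingStrategies: map[string]*pipepb.WindowingStrategy{
			"w0": {WindowCoderId: "c0", EnvironmentId: "e0"},
		},
		Coders:       map[string]*pipepb.Coder{"c0": {Spec: &pipepb.FunctionSpec{Urn: "c0"}}},
		Environments: map[string]*pipepb.Environment{"e0": {Urn: "e0"}},
	}

	xlangx.AddNamespace(c.Transforms["t0"], c, "ns")

	// Everything reachable from t0 gets the namespace suffix, e.g. "c0" -> "c0@ns".
	fmt.Println(c.Pcollections["p0"].CoderId)
}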
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/namespace_test.go b/sdks/go/pkg/beam/core/runtime/xlangx/namespace_test.go
new file mode 100644
index 0000000..af3d56c
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/namespace_test.go
@@ -0,0 +1,579 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xlangx
+
+import (
+	"strings"
+	"testing"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/protobuf/testing/protocmp"
+)
+
+func expectPanic(t *testing.T, err string) {
+	if r := recover(); r == nil {
+		t.Errorf("expected panic; %v", err)
+	}
+}
+
+func TestAddNamespace(t *testing.T) {
+	tests := []struct {
+		name        string
+		init        *pipepb.Components
+		namespace   string
+		transformID string
+		want        *pipepb.Components
+		err         string
+	}{
+		{
+			name: "[Correctness] Single Input Multi Output",
+			init: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"t0": &pipepb.PTransform{
+						UniqueName:    "t0",
+						Inputs:        map[string]string{"t0i0": "p0"},
+						Outputs:       map[string]string{"t0o0": "p1", "t0o1": "p2"},
+						EnvironmentId: "e0",
+					},
+					"t1": &pipepb.PTransform{
+						UniqueName:    "t1",
+						Inputs:        map[string]string{"t1i0": "p1"},
+						Outputs:       map[string]string{"t1o0": "p3"},
+						EnvironmentId: "e1",
+					},
+					"t2": &pipepb.PTransform{
+						UniqueName:    "t2",
+						Inputs:        map[string]string{"t2i0": "p2"},
+						Outputs:       map[string]string{"t2o0": "p4"},
+						EnvironmentId: "e0",
+					},
+					"t3": &pipepb.PTransform{
+						UniqueName:    "t3",
+						Inputs:        map[string]string{"t3i0": "p3", "t3i1": "p4"},
+						Outputs:       map[string]string{"t3o0": "p5"},
+						EnvironmentId: "e1",
+					},
+				},
+				Pcollections: map[string]*pipepb.PCollection{
+					"p0": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w1"},
+					"p1": &pipepb.PCollection{CoderId: "c1", WindowingStrategyId: "w1"},
+					"p2": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w1"},
+					"p3": &pipepb.PCollection{CoderId: "c3", WindowingStrategyId: "w1"},
+					"p4": &pipepb.PCollection{CoderId: "c2", WindowingStrategyId: "w0"},
+					"p5": &pipepb.PCollection{CoderId: "c2", WindowingStrategyId: "w1"},
+				},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+					"w0": &pipepb.WindowingStrategy{WindowCoderId: "c3", EnvironmentId: "e0"},
+					"w1": &pipepb.WindowingStrategy{WindowCoderId: "c4", EnvironmentId: "e1"},
+				},
+				Coders: map[string]*pipepb.Coder{
+					"c0": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c0"}},
+					"c1": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c1"}},
+					"c2": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c2"}},
+					"c3": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c3"}},
+					"c4": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c4"}},
+				},
+				Environments: map[string]*pipepb.Environment{
+					"e0": &pipepb.Environment{Urn: "e0"},
+					"e1": &pipepb.Environment{Urn: "e1"},
+				},
+			},
+			namespace:   "daASxQwenJ",
+			transformID: "t0",
+			want: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"t0": &pipepb.PTransform{
+						UniqueName:    "t0",
+						Inputs:        map[string]string{"t0i0": "p0"},
+						Outputs:       map[string]string{"t0o0": "p1", "t0o1": "p2"},
+						EnvironmentId: "e0@daASxQwenJ",
+					},
+					"t1": &pipepb.PTransform{
+						UniqueName:    "t1",
+						Inputs:        map[string]string{"t1i0": "p1"},
+						Outputs:       map[string]string{"t1o0": "p3"},
+						EnvironmentId: "e1@daASxQwenJ",
+					},
+					"t2": &pipepb.PTransform{
+						UniqueName:    "t2",
+						Inputs:        map[string]string{"t2i0": "p2"},
+						Outputs:       map[string]string{"t2o0": "p4"},
+						EnvironmentId: "e0@daASxQwenJ",
+					},
+					"t3": &pipepb.PTransform{
+						UniqueName:    "t3",
+						Inputs:        map[string]string{"t3i0": "p3", "t3i1": "p4"},
+						Outputs:       map[string]string{"t3o0": "p5"},
+						EnvironmentId: "e1@daASxQwenJ",
+					},
+				},
+				Pcollections: map[string]*pipepb.PCollection{
+					"p0": &pipepb.PCollection{CoderId: "c0@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
+					"p1": &pipepb.PCollection{CoderId: "c1@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
+					"p2": &pipepb.PCollection{CoderId: "c0@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
+					"p3": &pipepb.PCollection{CoderId: "c3", WindowingStrategyId: "w1@daASxQwenJ"},
+					"p4": &pipepb.PCollection{CoderId: "c2", WindowingStrategyId: "w0"},
+					"p5": &pipepb.PCollection{CoderId: "c2", WindowingStrategyId: "w1@daASxQwenJ"},
+				},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+					"w0":            &pipepb.WindowingStrategy{WindowCoderId: "c3", EnvironmentId: "e0"},
+					"w1@daASxQwenJ": &pipepb.WindowingStrategy{WindowCoderId: "c4@daASxQwenJ", EnvironmentId: "e1@daASxQwenJ"},
+				},
+				Coders: map[string]*pipepb.Coder{
+					"c0@daASxQwenJ": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c0"}},
+					"c1@daASxQwenJ": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c1"}},
+					"c2":            &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c2"}},
+					"c3":            &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c3"}},
+					"c4@daASxQwenJ": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c4"}},
+				},
+				Environments: map[string]*pipepb.Environment{
+					"e0@daASxQwenJ": &pipepb.Environment{Urn: "e0"},
+					"e1@daASxQwenJ": &pipepb.Environment{Urn: "e1"},
+				},
+			},
+		},
+		{
+			name: "[Correctness] Single Input Single Output",
+			init: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"t0": &pipepb.PTransform{
+						UniqueName:    "t0",
+						Inputs:        map[string]string{"t0i0": "p0"},
+						Outputs:       map[string]string{"t0o0": "p1", "t0o1": "p2"},
+						EnvironmentId: "e0",
+					},
+					"t1": &pipepb.PTransform{
+						UniqueName:    "t1",
+						Inputs:        map[string]string{"t1i0": "p1"},
+						Outputs:       map[string]string{"t1o0": "p3"},
+						EnvironmentId: "e1",
+					},
+					"t2": &pipepb.PTransform{
+						UniqueName:    "t2",
+						Inputs:        map[string]string{"t2i0": "p2"},
+						Outputs:       map[string]string{"t2o0": "p4"},
+						EnvironmentId: "e0",
+					},
+					"t3": &pipepb.PTransform{
+						UniqueName:    "t3",
+						Inputs:        map[string]string{"t3i0": "p3", "t3i1": "p4"},
+						Outputs:       map[string]string{"t3o0": "p5"},
+						EnvironmentId: "e1",
+					},
+				},
+				Pcollections: map[string]*pipepb.PCollection{
+					"p0": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w1"},
+					"p1": &pipepb.PCollection{CoderId: "c1", WindowingStrategyId: "w1"},
+					"p2": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w1"},
+					"p3": &pipepb.PCollection{CoderId: "c3", WindowingStrategyId: "w1"},
+					"p4": &pipepb.PCollection{CoderId: "c2", WindowingStrategyId: "w0"},
+					"p5": &pipepb.PCollection{CoderId: "c2", WindowingStrategyId: "w1"},
+				},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+					"w0": &pipepb.WindowingStrategy{WindowCoderId: "c3", EnvironmentId: "e0"},
+					"w1": &pipepb.WindowingStrategy{WindowCoderId: "c4", EnvironmentId: "e1"},
+				},
+				Coders: map[string]*pipepb.Coder{
+					"c0": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c0"}},
+					"c1": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c1"}},
+					"c2": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c2"}},
+					"c3": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c3"}},
+					"c4": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c4"}},
+				},
+				Environments: map[string]*pipepb.Environment{
+					"e0": &pipepb.Environment{Urn: "e0"},
+					"e1": &pipepb.Environment{Urn: "e1"},
+				},
+			},
+			namespace:   "daASxQwenJ",
+			transformID: "t1",
+			want: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"t0": &pipepb.PTransform{
+						UniqueName:    "t0",
+						Inputs:        map[string]string{"t0i0": "p0"},
+						Outputs:       map[string]string{"t0o0": "p1", "t0o1": "p2"},
+						EnvironmentId: "e0",
+					},
+					"t1": &pipepb.PTransform{
+						UniqueName:    "t1",
+						Inputs:        map[string]string{"t1i0": "p1"},
+						Outputs:       map[string]string{"t1o0": "p3"},
+						EnvironmentId: "e1@daASxQwenJ",
+					},
+					"t2": &pipepb.PTransform{
+						UniqueName:    "t2",
+						Inputs:        map[string]string{"t2i0": "p2"},
+						Outputs:       map[string]string{"t2o0": "p4"},
+						EnvironmentId: "e0",
+					},
+					"t3": &pipepb.PTransform{
+						UniqueName:    "t3",
+						Inputs:        map[string]string{"t3i0": "p3", "t3i1": "p4"},
+						Outputs:       map[string]string{"t3o0": "p5"},
+						EnvironmentId: "e1@daASxQwenJ",
+					},
+				},
+				Pcollections: map[string]*pipepb.PCollection{
+					"p0": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w1@daASxQwenJ"},
+					"p1": &pipepb.PCollection{CoderId: "c1@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
+					"p2": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w1@daASxQwenJ"},
+					"p3": &pipepb.PCollection{CoderId: "c3@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
+					"p4": &pipepb.PCollection{CoderId: "c2", WindowingStrategyId: "w0"},
+					"p5": &pipepb.PCollection{CoderId: "c2", WindowingStrategyId: "w1@daASxQwenJ"},
+				},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+					"w0":            &pipepb.WindowingStrategy{WindowCoderId: "c3", EnvironmentId: "e0"},
+					"w1@daASxQwenJ": &pipepb.WindowingStrategy{WindowCoderId: "c4@daASxQwenJ", EnvironmentId: "e1@daASxQwenJ"},
+				},
+				Coders: map[string]*pipepb.Coder{
+					"c0":            &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c0"}},
+					"c1@daASxQwenJ": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c1"}},
+					"c2":            &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c2"}},
+					"c3@daASxQwenJ": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c3"}},
+					"c4@daASxQwenJ": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c4"}},
+				},
+				Environments: map[string]*pipepb.Environment{
+					"e0":            &pipepb.Environment{Urn: "e0"},
+					"e1@daASxQwenJ": &pipepb.Environment{Urn: "e1"},
+				},
+			},
+		},
+		{
+			name: "[Correctness] Multi Input Single Output",
+			init: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"t0": &pipepb.PTransform{
+						UniqueName:    "t0",
+						Inputs:        map[string]string{"t0i0": "p0"},
+						Outputs:       map[string]string{"t0o0": "p1", "t0o1": "p2"},
+						EnvironmentId: "e0",
+					},
+					"t1": &pipepb.PTransform{
+						UniqueName:    "t1",
+						Inputs:        map[string]string{"t1i0": "p1"},
+						Outputs:       map[string]string{"t1o0": "p3"},
+						EnvironmentId: "e1",
+					},
+					"t2": &pipepb.PTransform{
+						UniqueName:    "t2",
+						Inputs:        map[string]string{"t2i0": "p2"},
+						Outputs:       map[string]string{"t2o0": "p4"},
+						EnvironmentId: "e0",
+					},
+					"t3": &pipepb.PTransform{
+						UniqueName:    "t3",
+						Inputs:        map[string]string{"t3i0": "p3", "t3i1": "p4"},
+						Outputs:       map[string]string{"t3o0": "p5"},
+						EnvironmentId: "e1",
+					},
+				},
+				Pcollections: map[string]*pipepb.PCollection{
+					"p0": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w1"},
+					"p1": &pipepb.PCollection{CoderId: "c1", WindowingStrategyId: "w1"},
+					"p2": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w1"},
+					"p3": &pipepb.PCollection{CoderId: "c3", WindowingStrategyId: "w1"},
+					"p4": &pipepb.PCollection{CoderId: "c2", WindowingStrategyId: "w0"},
+					"p5": &pipepb.PCollection{CoderId: "c2", WindowingStrategyId: "w1"},
+				},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+					"w0": &pipepb.WindowingStrategy{WindowCoderId: "c3", EnvironmentId: "e0"},
+					"w1": &pipepb.WindowingStrategy{WindowCoderId: "c4", EnvironmentId: "e1"},
+				},
+				Coders: map[string]*pipepb.Coder{
+					"c0": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c0"}},
+					"c1": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c1"}},
+					"c2": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c2"}},
+					"c3": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c3"}},
+					"c4": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c4"}},
+				},
+				Environments: map[string]*pipepb.Environment{
+					"e0": &pipepb.Environment{Urn: "e0"},
+					"e1": &pipepb.Environment{Urn: "e1"},
+				},
+			},
+			namespace:   "daASxQwenJ",
+			transformID: "t3",
+			want: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"t0": &pipepb.PTransform{
+						UniqueName:    "t0",
+						Inputs:        map[string]string{"t0i0": "p0"},
+						Outputs:       map[string]string{"t0o0": "p1", "t0o1": "p2"},
+						EnvironmentId: "e0@daASxQwenJ",
+					},
+					"t1": &pipepb.PTransform{
+						UniqueName:    "t1",
+						Inputs:        map[string]string{"t1i0": "p1"},
+						Outputs:       map[string]string{"t1o0": "p3"},
+						EnvironmentId: "e1@daASxQwenJ",
+					},
+					"t2": &pipepb.PTransform{
+						UniqueName:    "t2",
+						Inputs:        map[string]string{"t2i0": "p2"},
+						Outputs:       map[string]string{"t2o0": "p4"},
+						EnvironmentId: "e0@daASxQwenJ",
+					},
+					"t3": &pipepb.PTransform{
+						UniqueName:    "t3",
+						Inputs:        map[string]string{"t3i0": "p3", "t3i1": "p4"},
+						Outputs:       map[string]string{"t3o0": "p5"},
+						EnvironmentId: "e1@daASxQwenJ",
+					},
+				},
+				Pcollections: map[string]*pipepb.PCollection{
+					"p0": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w1@daASxQwenJ"},
+					"p1": &pipepb.PCollection{CoderId: "c1", WindowingStrategyId: "w1@daASxQwenJ"},
+					"p2": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w1@daASxQwenJ"},
+					"p3": &pipepb.PCollection{CoderId: "c3@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
+					"p4": &pipepb.PCollection{CoderId: "c2@daASxQwenJ", WindowingStrategyId: "w0@daASxQwenJ"},
+					"p5": &pipepb.PCollection{CoderId: "c2@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
+				},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+					"w0@daASxQwenJ": &pipepb.WindowingStrategy{WindowCoderId: "c3@daASxQwenJ", EnvironmentId: "e0@daASxQwenJ"},
+					"w1@daASxQwenJ": &pipepb.WindowingStrategy{WindowCoderId: "c4@daASxQwenJ", EnvironmentId: "e1@daASxQwenJ"},
+				},
+				Coders: map[string]*pipepb.Coder{
+					"c0":            &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c0"}},
+					"c1":            &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c1"}},
+					"c2@daASxQwenJ": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c2"}},
+					"c3@daASxQwenJ": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c3"}},
+					"c4@daASxQwenJ": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c4"}},
+				},
+				Environments: map[string]*pipepb.Environment{
+					"e0@daASxQwenJ": &pipepb.Environment{Urn: "e0"},
+					"e1@daASxQwenJ": &pipepb.Environment{Urn: "e1"},
+				},
+			},
+		},
+		{
+			name: "[Correctness] Component Coders",
+			init: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"t0": &pipepb.PTransform{
+						UniqueName:    "t0",
+						Inputs:        map[string]string{"t0i0": "p0"},
+						Outputs:       map[string]string{"t0o0": "p1"},
+						EnvironmentId: "e0",
+					},
+					"t1": &pipepb.PTransform{
+						UniqueName:    "t1",
+						Inputs:        map[string]string{"t1i0": "p1"},
+						Outputs:       map[string]string{"t1o0": "p2"},
+						EnvironmentId: "e1",
+					},
+				},
+				Pcollections: map[string]*pipepb.PCollection{
+					"p0": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w0"},
+					"p1": &pipepb.PCollection{CoderId: "c1", WindowingStrategyId: "w1"},
+					"p2": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w1"},
+				},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+					"w0": &pipepb.WindowingStrategy{WindowCoderId: "c3", EnvironmentId: "e0"},
+					"w1": &pipepb.WindowingStrategy{WindowCoderId: "c4", EnvironmentId: "e1"},
+				},
+				Coders: map[string]*pipepb.Coder{
+					"c0": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c0"}, ComponentCoderIds: []string{"c2"}},
+					"c1": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c1"}},
+					"c2": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c2"}},
+					"c3": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c3"}},
+					"c4": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c4"}},
+				},
+				Environments: map[string]*pipepb.Environment{
+					"e0": &pipepb.Environment{Urn: "e0"},
+					"e1": &pipepb.Environment{Urn: "e1"},
+				},
+			},
+			namespace:   "daASxQwenJ",
+			transformID: "t0",
+			want: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"t0": &pipepb.PTransform{
+						UniqueName:    "t0",
+						Inputs:        map[string]string{"t0i0": "p0"},
+						Outputs:       map[string]string{"t0o0": "p1"},
+						EnvironmentId: "e0@daASxQwenJ",
+					},
+					"t1": &pipepb.PTransform{
+						UniqueName:    "t1",
+						Inputs:        map[string]string{"t1i0": "p1"},
+						Outputs:       map[string]string{"t1o0": "p2"},
+						EnvironmentId: "e1@daASxQwenJ",
+					},
+				},
+				Pcollections: map[string]*pipepb.PCollection{
+					"p0": &pipepb.PCollection{CoderId: "c0@daASxQwenJ", WindowingStrategyId: "w0@daASxQwenJ"},
+					"p1": &pipepb.PCollection{CoderId: "c1@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
+					"p2": &pipepb.PCollection{CoderId: "c0@daASxQwenJ", WindowingStrategyId: "w1@daASxQwenJ"},
+				},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+					"w0@daASxQwenJ": &pipepb.WindowingStrategy{WindowCoderId: "c3@daASxQwenJ", EnvironmentId: "e0@daASxQwenJ"},
+					"w1@daASxQwenJ": &pipepb.WindowingStrategy{WindowCoderId: "c4@daASxQwenJ", EnvironmentId: "e1@daASxQwenJ"},
+				},
+				Coders: map[string]*pipepb.Coder{
+					"c0@daASxQwenJ": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c0"}, ComponentCoderIds: []string{"c2@daASxQwenJ"}},
+					"c1@daASxQwenJ": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c1"}},
+					"c2@daASxQwenJ": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c2"}},
+					"c3@daASxQwenJ": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c3"}},
+					"c4@daASxQwenJ": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c4"}},
+				},
+				Environments: map[string]*pipepb.Environment{
+					"e0@daASxQwenJ": &pipepb.Environment{Urn: "e0"},
+					"e1@daASxQwenJ": &pipepb.Environment{Urn: "e1"},
+				},
+			},
+		},
+		{
+			name: "[Consistency] Missing EnvironmentID",
+			init: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"t0": &pipepb.PTransform{
+						UniqueName:    "t0",
+						Inputs:        map[string]string{"t0i0": "p0"},
+						Outputs:       map[string]string{"t0o0": "p1"},
+						EnvironmentId: "e0",
+					},
+					"t1": &pipepb.PTransform{
+						UniqueName:    "t1",
+						Inputs:        map[string]string{"t1i0": "p1"},
+						Outputs:       map[string]string{"t1o0": "p2"},
+						EnvironmentId: "e1",
+					},
+				},
+				Pcollections: map[string]*pipepb.PCollection{
+					"p0": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w0"},
+					"p1": &pipepb.PCollection{CoderId: "c1", WindowingStrategyId: "w1"},
+					"p2": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w1"},
+				},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+					"w0": &pipepb.WindowingStrategy{WindowCoderId: "c3", EnvironmentId: "e0"},
+					"w1": &pipepb.WindowingStrategy{WindowCoderId: "c4", EnvironmentId: "e1"},
+				},
+				Coders: map[string]*pipepb.Coder{
+					"c0": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c0"}},
+					"c1": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c1"}},
+				},
+				Environments: map[string]*pipepb.Environment{
+					// Missing "e0"
+					"e1": &pipepb.Environment{Urn: "e1"},
+				},
+			},
+			namespace:   "daASxQwenJ",
+			transformID: "t0",
+			err:         "trying to add an Environment whose key is absent should panic",
+		},
+		{
+			name: "[Consistency] Missing WindowingStrategyID",
+			init: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"t0": &pipepb.PTransform{
+						UniqueName:    "t0",
+						Inputs:        map[string]string{"t0i0": "p0"},
+						Outputs:       map[string]string{"t0o0": "p1"},
+						EnvironmentId: "e0",
+					},
+					"t1": &pipepb.PTransform{
+						UniqueName:    "t1",
+						Inputs:        map[string]string{"t1i0": "p1"},
+						Outputs:       map[string]string{"t1o0": "p2"},
+						EnvironmentId: "e1",
+					},
+				},
+				Pcollections: map[string]*pipepb.PCollection{
+					"p0": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w0"},
+					"p1": &pipepb.PCollection{CoderId: "c1", WindowingStrategyId: "w1"},
+					"p2": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w1"},
+				},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+					// Missing w0
+					"w1": &pipepb.WindowingStrategy{WindowCoderId: "c4", EnvironmentId: "e1"},
+				},
+				Coders: map[string]*pipepb.Coder{
+					"c0": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c0"}},
+					"c1": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c1"}},
+				},
+				Environments: map[string]*pipepb.Environment{
+					"e0": &pipepb.Environment{Urn: "e0"},
+					"e1": &pipepb.Environment{Urn: "e1"},
+				},
+			},
+			namespace:   "daASxQwenJ",
+			transformID: "t0",
+			err:         "trying to add a WindowingStrategy whose key is absent should panic",
+		},
+		{
+			name: "[Consistency] Missing CoderID",
+			init: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"t0": &pipepb.PTransform{
+						UniqueName:    "t0",
+						Inputs:        map[string]string{"t0i0": "p0"},
+						Outputs:       map[string]string{"t0o0": "p1"},
+						EnvironmentId: "e0",
+					},
+					"t1": &pipepb.PTransform{
+						UniqueName:    "t1",
+						Inputs:        map[string]string{"t1i0": "p1"},
+						Outputs:       map[string]string{"t1o0": "p2"},
+						EnvironmentId: "e1",
+					},
+				},
+				Pcollections: map[string]*pipepb.PCollection{
+					"p0": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w0"},
+					"p1": &pipepb.PCollection{CoderId: "c1", WindowingStrategyId: "w1"},
+					"p2": &pipepb.PCollection{CoderId: "c0", WindowingStrategyId: "w1"},
+				},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+					"w0": &pipepb.WindowingStrategy{WindowCoderId: "c3", EnvironmentId: "e0"},
+					"w1": &pipepb.WindowingStrategy{WindowCoderId: "c4", EnvironmentId: "e1"},
+				},
+				Coders: map[string]*pipepb.Coder{
+					// Missing c0
+					"c1": &pipepb.Coder{Spec: &pipepb.FunctionSpec{Urn: "c1"}},
+				},
+				Environments: map[string]*pipepb.Environment{
+					"e0": &pipepb.Environment{Urn: "e0"},
+					"e1": &pipepb.Environment{Urn: "e1"},
+				},
+			},
+			namespace:   "daASxQwenJ",
+			transformID: "t0",
+			err:         "trying to add a Coder whose key is absent should panic",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if strings.Contains(tt.name, "Correctness") {
+				transform := tt.init.Transforms[tt.transformID]
+
+				AddNamespace(transform, tt.init, tt.namespace)
+
+				if d := cmp.Diff(tt.want, tt.init, protocmp.Transform()); d != "" {
+					t.Errorf("diff (-want, +got): %v", d)
+				}
+			}
+
+			if strings.Contains(tt.name, "Consistency") {
+				defer expectPanic(t, tt.err)
+				transform := tt.init.Transforms[tt.transformID]
+				AddNamespace(transform, tt.init, tt.namespace)
+			}
+
+		})
+	}
+}
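The table above covers the edge cases exhaustively; as a quicker orientation, here is a hedged, standalone usage sketch of AddNamespace on a deliberately tiny set of components. The IDs and the namespace string "myNs" are made up for illustration; every component referenced by t0 exists, so no panic is expected.

package main

import (
	"fmt"

	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/xlangx"
	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
)

func main() {
	// One transform reading one PCollection.
	comps := &pipepb.Components{
		Transforms: map[string]*pipepb.PTransform{
			"t0": {UniqueName: "t0", Inputs: map[string]string{"i0": "p0"}, EnvironmentId: "e0"},
		},
		Pcollections: map[string]*pipepb.PCollection{
			"p0": {CoderId: "c0", WindowingStrategyId: "w0"},
		},
		WindowingStrategies: map[string]*pipepb.WindowingStrategy{
			"w0": {WindowCoderId: "c1", EnvironmentId: "e0"},
		},
		Coders: map[string]*pipepb.Coder{
			"c0": {Spec: &pipepb.FunctionSpec{Urn: "c0"}},
			"c1": {Spec: &pipepb.FunctionSpec{Urn: "c1"}},
		},
		Environments: map[string]*pipepb.Environment{
			"e0": {Urn: "e0"},
		},
	}

	xlangx.AddNamespace(comps.Transforms["t0"], comps, "myNs")

	// Components reachable from t0 are re-keyed, e.g. "c0@myNs", "w0@myNs",
	// "e0@myNs"; printing the coder keys shows the rename.
	for id := range comps.Coders {
		fmt.Println(id)
	}
}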
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/translate.go b/sdks/go/pkg/beam/core/runtime/xlangx/translate.go
index 8d09e7f..5a456dc 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/translate.go
@@ -24,8 +24,7 @@ import (
 	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
-// TODO(BEAM-9919): add documentation to all helper methods
-
+// MergeExpandedWithPipeline adds the expanded components of all ExternalTransforms to the existing pipeline.
 func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
 	// Adding Expanded transforms to their counterparts in the Pipeline
 
@@ -37,21 +36,25 @@ func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
 			p.Requirements = append(p.Requirements, exp.Requirements...)
 
 			// Adding components of the Expanded Transforms to the current Pipeline
-			for k, v := range graphx.ExpandedComponents(exp).GetTransforms() {
+			components := graphx.ExpandedComponents(exp)
+			for k, v := range components.GetTransforms() {
 				p.Components.Transforms[k] = v
 			}
-			for k, v := range graphx.ExpandedComponents(exp).GetPcollections() {
+			for k, v := range components.GetPcollections() {
 				p.Components.Pcollections[k] = v
 			}
-			for k, v := range graphx.ExpandedComponents(exp).GetWindowingStrategies() {
+			for k, v := range components.GetWindowingStrategies() {
 				p.Components.WindowingStrategies[k] = v
 			}
-			for k, v := range graphx.ExpandedComponents(exp).GetCoders() {
+			for k, v := range components.GetCoders() {
 				p.Components.Coders[k] = v
 			}
-			for k, v := range graphx.ExpandedComponents(exp).GetEnvironments() {
-				// TODO(pskevin): Resolve temporary hack to enable LOOPBACK mode
+			for k, v := range components.GetEnvironments() {
 				if k == "go" {
+					// This case is not an anomaly: the "go" environment is
+					// always present, since any initial ExpansionRequest has a
+					// component that requires it. Scoping the remaining
+					// components with a unique namespace prevents collisions.
 					continue
 				}
 				p.Components.Environments[k] = v
@@ -63,22 +66,36 @@ func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
 	}
 }
 
+// PurgeOutputInput remaps the outputs of each edge corresponding to an
+// ExternalTransform to the correct expanded outputs. All consumers of the
+// previous outputs are updated to use the new inputs.
 func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
-
 	idxMap := make(map[string]string)
 	components := p.GetComponents()
 
+	// Generating map (oldID -> newID) of outputs to be purged
 	for _, e := range edges {
 		if e.Op == graph.External {
 			for tag, n := range graphx.ExternalOutputs(e) {
 				nodeID := fmt.Sprintf("n%v", n.ID())
-				pcolID := graphx.ExpandedTransform(e.External.Expanded).GetOutputs()[tag]
+
+				expandedOutputs := graphx.ExpandedTransform(e.External.Expanded).GetOutputs()
+				var pcolID string
+				if tag == graph.SinkOutputTag {
+					for _, pcolID = range expandedOutputs {
+						// ranging over the single-entry map is the simplest way to extract its only value
+					}
+				} else {
+					pcolID = expandedOutputs[tag]
+				}
+
 				idxMap[nodeID] = pcolID
-				components.GetPcollections()[nodeID] = nil // Will get purged while using pipelinex.Update
+				delete(components.Pcollections, nodeID)
 			}
 		}
 	}
 
+	// Updating all input IDs to reflect the correct sources
 	for _, t := range components.GetTransforms() {
 		inputs := t.GetInputs()
 		for tag, nodeID := range inputs {
@@ -87,23 +104,30 @@ func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
 			}
 		}
 	}
+
 }
 
-// TODO(pskevin): handle sourceInput and sinkOutput
+// VerifyNamedOutputs ensures that the expanded outputs correspond to the expected named outputs.
 func VerifyNamedOutputs(ext *graph.ExternalTransform) {
 	expandedOutputs := graphx.ExpandedTransform(ext.Expanded).GetOutputs()
 
 	if len(expandedOutputs) != len(ext.OutputsMap) {
-		panic(errors.Errorf("mismatched number of outputs:\nreceived - %v\nexpected - %v", len(expandedOutputs), len(ext.OutputsMap)))
+		panic(errors.Errorf("mismatched number of named outputs:\nreceived - %v\nexpected - %v", len(expandedOutputs), len(ext.OutputsMap)))
 	}
 
 	for tag := range ext.OutputsMap {
-		if _, exists := expandedOutputs[tag]; tag != "sinkOutput" && !exists {
+		_, exists := expandedOutputs[tag]
+		if tag != graph.SinkOutputTag && !exists {
 			panic(errors.Errorf("missing named output in expanded transform: %v is expected in %v", tag, expandedOutputs))
 		}
+		if tag == graph.SinkOutputTag && len(expandedOutputs) > 1 {
+			panic(errors.Errorf("mismatched number of unnamed outputs:\nreceived - %v\nexpected - 1", len(expandedOutputs)))
+		}
 	}
 }
 
+// ResolveOutputIsBounded updates each output node, based on the received
+// expanded components, to reflect whether it is bounded.
 func ResolveOutputIsBounded(e *graph.MultiEdge, isBoundedUpdater func(*graph.Node, bool)) {
 	ext := e.External
 	exp := ext.Expanded
@@ -115,7 +139,7 @@ func ResolveOutputIsBounded(e *graph.MultiEdge, isBoundedUpdater func(*graph.Nod
 		isBounded := true
 
 		switch tag {
-		case "sinkOutput":
+		case graph.SinkOutputTag:
 			for _, id = range expandedOutputs {
 				// easiest way to access map with one entry (key,value)
 			}
@@ -135,6 +159,8 @@ func ResolveOutputIsBounded(e *graph.MultiEdge, isBoundedUpdater func(*graph.Nod
 	}
 }
 
+// AddFakeImpulses adds an impulse transform as the producer for each input to
+// the root transform. Inputs need producers to form a correct pipeline.
 func AddFakeImpulses(p *pipepb.Pipeline) {
 	// For a pipeline consisting of only the external node edge, there will be
 	// single root transform which will be the external transform.
@@ -159,11 +185,13 @@ func AddFakeImpulses(p *pipepb.Pipeline) {
 
 }
 
+// RemoveFakeImpulses removes the fake impulse added for each input to the
+// transform, since an input cannot have multiple producers.
 func RemoveFakeImpulses(c *pipepb.Components, ext *pipepb.PTransform) {
 	transforms := c.GetTransforms()
 	var impulseIDs []string
 
-	for tag, _ := range ext.GetInputs() {
+	for tag := range ext.GetInputs() {
 		id := fmt.Sprintf("%s_%s", "impulse", tag)
 		impulseIDs = append(impulseIDs, id)
 	}
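A side note on the "easiest way to access map with one entry" pattern used above for the unnamed sink output: the expansion service generates the output key, so the key is not known in advance, and ranging over the single-entry map is the idiomatic way to pull out its only value. A standalone sketch follows; the helper name onlyValue is an assumption, not part of the commit.

package main

import "fmt"

// onlyValue returns the sole value of a single-entry map by ranging over it;
// the generated key is irrelevant, so it is discarded.
func onlyValue(m map[string]string) string {
	var v string
	for _, v = range m {
		// exactly one iteration when len(m) == 1
	}
	return v
}

func main() {
	expandedOutputs := map[string]string{"generated_output_tag": "p42"}
	fmt.Println(onlyValue(expandedOutputs)) // prints "p42"
}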
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go
index d8e54dd..940f852 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -83,10 +83,13 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 		return errors.WithContextf(err, "generating model pipeline")
 	}
 
+	// Fetch all dependencies for cross-language transforms
 	xlangx.ResolveArtifacts(ctx, edges, pipeline)
 
+	// Remap outputs of expanded transforms to be the inputs for all downstream consumers
 	xlangx.PurgeOutputInput(edges, pipeline)
 
+	// Merge the expanded components into the existing pipeline
 	xlangx.MergeExpandedWithPipeline(edges, pipeline)
 
 	log.Info(ctx, proto.MarshalTextString(pipeline))
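Before the xlang.go hunks below, a hedged sketch of how a pipeline author might call the CrossLanguage wrapper they introduce. The URN, expansion-service endpoint, tags, and output type here are illustrative placeholders, not values taken from this commit.

package example

import (
	"reflect"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
)

// applyExternal applies a hypothetical external transform to a PCollection
// and returns its single named output of int64s.
func applyExternal(s beam.Scope, words beam.PCollection) beam.PCollection {
	outputType := typex.New(reflect.TypeOf(int64(0)))
	outputs := beam.CrossLanguage(
		s,
		"beam:transforms:xlang:example_count", // assumed URN
		nil,                                   // no payload in this sketch
		"localhost:8097",                      // assumed expansion service endpoint
		map[string]beam.PCollection{"input": words},
		map[string]typex.FullType{"output": outputType},
	)
	return outputs["output"]
}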
diff --git a/sdks/go/pkg/beam/xlang.go b/sdks/go/pkg/beam/xlang.go
index 87d5356..578e227 100644
--- a/sdks/go/pkg/beam/xlang.go
+++ b/sdks/go/pkg/beam/xlang.go
@@ -1,17 +1,17 @@
 // Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
+// contributor license agreements.  See the NOTICE file distributed with this
+// work for additional information regarding copyright ownership. The ASF
+// licenses this file to You under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
 //
 //    http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
 
 package beam
 
@@ -21,20 +21,32 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/xlangx"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 )
 
-// This is an experimetnal API and subject to change
-func CrossLanguage(s Scope, urn string, payload []byte, expansionAddr string, inputs map[string]PCollection, outputTypes map[string]FullType) map[string]PCollection {
+// xlang.go exposes an experimental API, subject to change, for executing
+// cross-language transforms within the Go SDK. It provides convenience
+// wrappers around the core functions, covering any combination of named and
+// unnamed inputs and outputs.
+
+// CrossLanguage executes a cross-language transform that uses named inputs and
+// returns named outputs.
+func CrossLanguage(
+	s Scope,
+	urn string,
+	payload []byte,
+	expansionAddr string,
+	namedInputs map[string]PCollection,
+	namedOutputTypes map[string]FullType,
+) map[string]PCollection {
 	if !s.IsValid() {
 		panic(errors.New("invalid scope"))
 	}
 
-	namedInputNodes := mapPCollectionToNode(inputs)
-
-	inputsMap, inboundLinks := graph.NewNamedInboundLinks(namedInputNodes)
-	outputsMap, outboundLinks := graph.NewNamedOutboundLinks(s.real, outputTypes)
+	inputsMap, inboundLinks := graph.NamedInboundLinks(mapPCollectionToNode(namedInputs))
+	outputsMap, outboundLinks := graph.NamedOutboundLinks(s.real, namedOutputTypes)
 
 	ext := graph.ExternalTransform{
 		Urn:           urn,
@@ -42,70 +54,121 @@ func CrossLanguage(s Scope, urn string, payload []byte, expansionAddr string, in
 		ExpansionAddr: expansionAddr,
 	}.WithNamedInputs(inputsMap).WithNamedOutputs(outputsMap)
 
-	outputNodes, err := TryCrossLanguage(s, &ext, inboundLinks, outboundLinks)
+	namedOutputs, err := TryCrossLanguage(s, &ext, inboundLinks, outboundLinks)
 	if err != nil {
 		panic(errors.WithContextf(err, "tried cross-language and failed"))
 	}
-	return mapNodeToPCollection(outputNodes)
+	return mapNodeToPCollection(namedOutputs)
 }
 
-/*
-func CrossLanguageWithSink(s Scope, urn string, payload []byte, expansionAddr string, inputs map[string]PCollection, outputType FullType) PCollection {
-	inputNodes := mapPCollectionToNode(inputs)
+// CrossLanguageWithSingleInputOutput executes a cross-language transform that
+// uses a single unnamed input and returns a single unnamed output.
+func CrossLanguageWithSingleInputOutput(
+	s Scope,
+	urn string,
+	payload []byte,
+	expansionAddr string,
+	input PCollection,
+	outputType FullType,
+) PCollection {
+	if !s.IsValid() {
+		panic(errors.New("invalid scope"))
+	}
+
+	// Adding dummy SourceInputTag to process it as a named input
+	namedInput := mapPCollectionToNode(map[string]PCollection{graph.SourceInputTag: input})
+	// Adding dummy SinkOutputTag to process it as a named output
+	namedOutputType := map[string]typex.FullType{graph.SinkOutputTag: outputType}
 
-	inputsMap, inboundLinks := graph.NewNamedInboundLinks(inputNodes)
-	outputsMap, outboundLinks := graph.NewNamedOutboundLinks(s.real, map[string]typex.FullType{graph.SinkOutputTag: outputType})
+	inputsMap, inboundLinks := graph.NamedInboundLinks(namedInput)
+	outputsMap, outboundLinks := graph.NamedOutboundLinks(s.real, namedOutputType)
 
 	ext := graph.ExternalTransform{
 		Urn:           urn,
 		Payload:       payload,
 		ExpansionAddr: expansionAddr,
-	}.WithNamedInputs(inputNodes).WithNamedOutputs(outputTypes)
+	}.WithNamedInputs(inputsMap).WithNamedOutputs(outputsMap)
 
-	outputNodes, err := TryCrossLanguage(s, &ext, inboundLinks, outboundLinks)
+	namedOutput, err := TryCrossLanguage(s, &ext, inboundLinks, outboundLinks)
 	if err != nil {
 		panic(errors.WithContextf(err, "tried cross-language and failed"))
 	}
-	namedOutputNode := mapNodeToPCollection(outputNodes)
+	return nodeToPCollection(namedOutput[graph.SinkOutputTag])
+}
 
-	outputNode, exists := namedOutputNode[graph.SinkOutputTag]
-	if !exists {
-		panic("a")
+// CrossLanguageWithSink executes a cross-language transform that uses named
+// inputs and returns a single unnamed output.
+func CrossLanguageWithSink(
+	s Scope,
+	urn string,
+	payload []byte,
+	expansionAddr string,
+	namedInputs map[string]PCollection,
+	outputType FullType,
+) PCollection {
+	if !s.IsValid() {
+		panic(errors.New("invalid scope"))
 	}
-	return outputNode
-}
 
-func CrossLanguageWithSource(s Scope, urn string, payload []byte, expansionAddr string, input PCollection, outputs map[string]FullType) map[string]PCollection {
-	return MustN(
-		TryCrossLanguage(
-			&ExternalTransform{
-				Urn:           urn,
-				Payload:       payload,
-				ExpansionAddr: expansionAddr,
-			}.
-				withSource(input).
-				withNamedOutputs(outputs)))
+	// Adding dummy SinkOutputTag to process it as a named output
+	namedOutputType := map[string]typex.FullType{graph.SinkOutputTag: outputType}
+
+	inputsMap, inboundLinks := graph.NamedInboundLinks(mapPCollectionToNode(namedInputs))
+	outputsMap, outboundLinks := graph.NamedOutboundLinks(s.real, namedOutputType)
+
+	ext := graph.ExternalTransform{
+		Urn:           urn,
+		Payload:       payload,
+		ExpansionAddr: expansionAddr,
+	}.WithNamedInputs(inputsMap).WithNamedOutputs(outputsMap)
+
+	namedOutput, err := TryCrossLanguage(s, &ext, inboundLinks, outboundLinks)
+	if err != nil {
+		panic(errors.WithContextf(err, "tried cross-language and failed"))
+	}
+	return nodeToPCollection(namedOutput[graph.SinkOutputTag])
 }
 
-func CrossLanguageWithSingleInputOutput(s Scope, urn string, payload []byte, expansionAddr string, input PCollection, output FullType) PCollection {
-	return MustN(TryCrossLanguage(
-		&ExternalTransform{
-			Urn:           urn,
-			Payload:       payload,
-			ExpansionAddr: expansionAddr,
-		}.
-			withSource(input).
-			withSink(output)))
+// CrossLanguageWithSource executes a cross-language transform that uses a
+// single unnamed input and returns named outputs.
+func CrossLanguageWithSource(
+	s Scope,
+	urn string,
+	payload []byte,
+	expansionAddr string,
+	input PCollection,
+	namedOutputTypes map[string]FullType,
+) map[string]PCollection {
+	if !s.IsValid() {
+		panic(errors.New("invalid scope"))
+	}
+
+	// Adding dummy SourceInputTag to process it as a named input
+	namedInput := mapPCollectionToNode(map[string]PCollection{graph.SourceInputTag: input})
+
+	inputsMap, inboundLinks := graph.NamedInboundLinks(namedInput)
+	outputsMap, outboundLinks := graph.NamedOutboundLinks(s.real, namedOutputTypes)
+
+	ext := graph.ExternalTransform{
+		Urn:           urn,
+		Payload:       payload,
+		ExpansionAddr: expansionAddr,
+	}.WithNamedInputs(inputsMap).WithNamedOutputs(outputsMap)
+
+	namedOutputs, err := TryCrossLanguage(s, &ext, inboundLinks, outboundLinks)
+	if err != nil {
+		panic(errors.WithContextf(err, "tried cross-language and failed"))
+	}
+	return mapNodeToPCollection(namedOutputs)
 }
-*/
 
+// TryCrossLanguage coordinates the core functions required to execute the cross-language transform.
 func TryCrossLanguage(s Scope, ext *graph.ExternalTransform, ins []*graph.Inbound, outs []*graph.Outbound) (map[string]*graph.Node, error) {
-	// Add ExternalTransform to the Graph
-
-	// Using existing MultiEdge format to represent ExternalTransform (already backwards compatible)
+	// Adding an edge in the graph corresponding to the ExternalTransform
 	edge, isBoundedUpdater := graph.NewCrossLanguage(s.real, s.scope, ext, ins, outs)
 
-	// Once the edge with the appropriate input and output nodes are added, a unique namespace can be requested.
+	// Once the appropriate input and output nodes are added to the edge, a
+	// unique namespace can be requested.
 	ext.Namespace = graph.NewNamespace()
 
 	// Build the ExpansionRequest
@@ -121,10 +184,11 @@ func TryCrossLanguage(s Scope, ext *graph.ExternalTransform, ins []*graph.Inboun
 	rootTransformID := p.GetRootTransformIds()[0] // External transform is the only root transform
 	rootTransform := transforms[rootTransformID]
 
+	// Scoping the ExternalTransform with respect to its unique namespace, thus
+	// avoiding future collisions
 	xlangx.AddNamespace(rootTransform, p.GetComponents(), ext.Namespace)
 
-	xlangx.AddFakeImpulses(p)
-
+	xlangx.AddFakeImpulses(p) // Inputs need to have sources
 	delete(transforms, rootTransformID)
 
 	req := &jobpb.ExpansionRequest{
@@ -133,11 +197,16 @@ func TryCrossLanguage(s Scope, ext *graph.ExternalTransform, ins []*graph.Inboun
 		Namespace:  ext.Namespace,
 	}
 
+	// Querying the expansion service
 	res, err := xlangx.Expand(context.Background(), req, ext.ExpansionAddr)
 	if err != nil {
 		return nil, errors.WithContextf(err, "failed to expand external transform with error [%v] for ExpansionRequest: %v", res.GetError(), req)
 	}
 
+	// Handling ExpansionResponse
+
+	// The fake impulses added earlier need to be removed to avoid having
+	// multiple producers for the same PCollection in the graph
 	xlangx.RemoveFakeImpulses(res.GetComponents(), res.GetTransform())
 
 	exp := &graph.ExpandedTransform{
@@ -147,13 +216,16 @@ func TryCrossLanguage(s Scope, ext *graph.ExternalTransform, ins []*graph.Inboun
 	}
 	ext.Expanded = exp
 
+	// Ensures the expected named outputs are present
 	xlangx.VerifyNamedOutputs(ext)
-
+	// Using the expanded outputs, the corresponding output nodes in the graph are marked as bounded or unbounded
 	xlangx.ResolveOutputIsBounded(edge, isBoundedUpdater)
 
 	return graphx.ExternalOutputs(edge), nil
 }
 
+// Wrapper functions to handle beam <-> graph boundaries
+
 func pCollectionToNode(p PCollection) *graph.Node {
 	if !p.IsValid() {
 		panic("tried converting invalid PCollection")