You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/10/28 21:40:12 UTC

[beam] branch beam23889 created (now 47e4ad0695a)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a change to branch beam23889
in repository https://gitbox.apache.org/repos/asf/beam.git


      at 47e4ad0695a Add CHANGES.md line for fix.

This branch includes the following new commits:

     new e45e366e33a Ensure iterated and emitted types are registered.
     new 47e4ad0695a Add CHANGES.md line for fix.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 02/02: Add CHANGES.md line for fix.

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch beam23889
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 47e4ad0695a50bdaa1dc7cce15c95bf2be6e3f9b
Author: lostluck <13...@users.noreply.github.com>
AuthorDate: Fri Oct 28 14:39:47 2022 -0700

    Add CHANGES.md line for fix.
---
 CHANGES.md | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index 5a7dad3e0a9..623afdcce5c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,6 +63,11 @@
 * `ParquetIO.withSplit` was removed since splittable reading has been the default behavior since 2.35.0. The effect of
   this change is to drop support for non-splittable reading ([#23832](https://github.com/apache/beam/issues/23832)).
 
+## Bugfixes
+
+*  [Go] Ensure iterated and emitted types are used with the generic register package are registered with the type and schema
+  registries. ([#23889](https://github.com/apache/beam/pull/23889))
+
 # [2.43.0] - Unreleased
 
 ## Highlights


[beam] 01/02: Ensure iterated and emitted types are registered.

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch beam23889
in repository https://gitbox.apache.org/repos/asf/beam.git

commit e45e366e33a28bd9d7ebc7b94a1dda18e88192c5
Author: lostluck <13...@users.noreply.github.com>
AuthorDate: Fri Oct 28 14:34:11 2022 -0700

    Ensure iterated and emitted types are registered.
---
 .../pkg/beam/core/runtime/graphx/schema/schema.go  |   1 +
 sdks/go/pkg/beam/register/emitter.go               |  23 ++-
 sdks/go/pkg/beam/register/emitter_test.go          | 154 +++++++++++++++
 sdks/go/pkg/beam/register/iter.go                  |  19 +-
 sdks/go/pkg/beam/register/iter_test.go             | 192 ++++++++++++++++++
 sdks/go/pkg/beam/register/register_test.go         | 216 ---------------------
 6 files changed, 380 insertions(+), 225 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index 7ca516b5694..b8c69007fbd 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -92,6 +92,7 @@ func getUUID(ut reflect.Type) string {
 // Registered returns whether the given type has been registered with
 // the schema package.
 func (r *Registry) Registered(ut reflect.Type) bool {
+	r.reconcileRegistrations()
 	_, ok := r.syntheticToUser[ut]
 	return ok
 }
diff --git a/sdks/go/pkg/beam/register/emitter.go b/sdks/go/pkg/beam/register/emitter.go
index 6c88d28d9fc..3b9cb9910d2 100644
--- a/sdks/go/pkg/beam/register/emitter.go
+++ b/sdks/go/pkg/beam/register/emitter.go
@@ -129,7 +129,9 @@ func Emitter1[T1 any]() {
 	registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter {
 		return &emit1[T1]{n: n}
 	}
-	exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+	eT := reflect.TypeOf(e).Elem()
+	registerType(eT.In(0))
+	exec.RegisterEmitter(eT, registerFunc)
 }
 
 // Emitter2 registers parameters from your DoFn with a
@@ -147,18 +149,25 @@ func Emitter2[T1, T2 any]() {
 			return &emit1WithTimestamp[T2]{n: n}
 		}
 	}
-	exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+	eT := reflect.TypeOf(e).Elem()
+	registerType(eT.In(0))
+	registerType(eT.In(1))
+	exec.RegisterEmitter(eT, registerFunc)
 }
 
 // Emitter3 registers parameters from your DoFn with a
-// signature func(T1, T2, T3) and optimizes their execution.
+// signature func(beam.EventTime, T2, T3) and optimizes their execution.
 // This must be done by passing in type parameters of all inputs as constraints,
 // aka: register.Emitter3[beam.EventTime, T1, T2](), where T1 is the type of
 // your key and T2 is the type of your value.
-func Emitter3[T1 typex.EventTime, T2, T3 any]() {
-	e := (*func(T1, T2, T3))(nil)
+func Emitter3[ET typex.EventTime, T1, T2 any]() {
+	e := (*func(ET, T1, T2))(nil)
 	registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter {
-		return &emit2WithTimestamp[T2, T3]{n: n}
+		return &emit2WithTimestamp[T1, T2]{n: n}
 	}
-	exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+	eT := reflect.TypeOf(e).Elem()
+	// No need to register event time.
+	registerType(eT.In(1))
+	registerType(eT.In(2))
+	exec.RegisterEmitter(eT, registerFunc)
 }
diff --git a/sdks/go/pkg/beam/register/emitter_test.go b/sdks/go/pkg/beam/register/emitter_test.go
new file mode 100644
index 00000000000..1855f830c3f
--- /dev/null
+++ b/sdks/go/pkg/beam/register/emitter_test.go
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package register
+
+import (
+	"context"
+	"reflect"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+type myTestTypeEmitter1 struct {
+	Int int
+}
+
+func TestEmitter1(t *testing.T) {
+	Emitter1[int]()
+	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(int))(nil)).Elem()) {
+		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(int))(nil)).Elem()) = false, want true")
+	}
+
+	Emitter1[myTestTypeEmitter1]()
+	rt := reflect.TypeOf((*myTestTypeEmitter1)(nil)).Elem()
+	checkRegisterations(t, rt)
+}
+
+type myTestTypeEmitter2A struct {
+	Int int
+}
+
+type myTestTypeEmitter2B struct {
+	String string
+}
+
+func TestEmitter2(t *testing.T) {
+	Emitter2[int, string]()
+	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(int, string))(nil)).Elem()) {
+		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(int, string))(nil)).Elem()) = false, want true")
+	}
+
+	Emitter2[myTestTypeEmitter2A, myTestTypeEmitter2B]()
+	tA := reflect.TypeOf((*myTestTypeEmitter2A)(nil)).Elem()
+	checkRegisterations(t, tA)
+	tB := reflect.TypeOf((*myTestTypeEmitter2B)(nil)).Elem()
+	checkRegisterations(t, tB)
+}
+
+func TestEmitter2_WithTimestamp(t *testing.T) {
+	Emitter2[typex.EventTime, string]()
+	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, string))(nil)).Elem()) {
+		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, string))(nil)).Elem()) = false, want true")
+	}
+}
+
+type myTestTypeEmitter3A struct {
+	Int int
+}
+
+type myTestTypeEmitter3B struct {
+	String string
+}
+
+func TestEmitter3(t *testing.T) {
+	Emitter3[typex.EventTime, int, string]()
+	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, int, string))(nil)).Elem()) {
+		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, int, string))(nil)).Elem()) = false, want true")
+	}
+
+	Emitter3[typex.EventTime, myTestTypeEmitter3A, myTestTypeEmitter3B]()
+	tA := reflect.TypeOf((*myTestTypeEmitter3A)(nil)).Elem()
+	checkRegisterations(t, tA)
+	tB := reflect.TypeOf((*myTestTypeEmitter3B)(nil)).Elem()
+	checkRegisterations(t, tB)
+}
+
+func TestEmit1(t *testing.T) {
+	e := &emit1[int]{n: &elementProcessor{}}
+	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
+	fn := e.Value().(func(int))
+	fn(3)
+	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
+		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
+	}
+	if got := e.n.(*elementProcessor).inFV.Elm2; got != nil {
+		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want nil", got)
+	}
+	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.ZeroTimestamp; got != want {
+		t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
+	}
+}
+
+func TestEmit2(t *testing.T) {
+	e := &emit2[int, string]{n: &elementProcessor{}}
+	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
+	fn := e.Value().(func(int, string))
+	fn(3, "hello")
+	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
+		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
+	}
+	if got, want := e.n.(*elementProcessor).inFV.Elm2, "hello"; got != want {
+		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want %v", got, want)
+	}
+	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.ZeroTimestamp; got != want {
+		t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
+	}
+}
+
+func TestEmit1WithTimestamp(t *testing.T) {
+	e := &emit1WithTimestamp[int]{n: &elementProcessor{}}
+	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
+	fn := e.Value().(func(typex.EventTime, int))
+	fn(mtime.MaxTimestamp, 3)
+	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
+		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
+	}
+	if got := e.n.(*elementProcessor).inFV.Elm2; got != nil {
+		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want nil", got)
+	}
+	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.MaxTimestamp; got != want {
+		t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
+	}
+}
+
+func TestEmit2WithTimestamp(t *testing.T) {
+	e := &emit2WithTimestamp[int, string]{n: &elementProcessor{}}
+	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
+	fn := e.Value().(func(typex.EventTime, int, string))
+	fn(mtime.MaxTimestamp, 3, "hello")
+	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
+		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
+	}
+	if got, want := e.n.(*elementProcessor).inFV.Elm2, "hello"; got != want {
+		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want %v", got, want)
+	}
+	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.MaxTimestamp; got != want {
+		t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
+	}
+}
diff --git a/sdks/go/pkg/beam/register/iter.go b/sdks/go/pkg/beam/register/iter.go
index 71d3f3df723..16bce66c143 100644
--- a/sdks/go/pkg/beam/register/iter.go
+++ b/sdks/go/pkg/beam/register/iter.go
@@ -20,7 +20,9 @@ import (
 	"io"
 	"reflect"
 
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
 )
 
 type iter1[T any] struct {
@@ -104,6 +106,14 @@ func (v *iter2[T1, T2]) invoke(key *T1, value *T2) bool {
 	return true
 }
 
+func registerType(t reflect.Type) {
+	if _, ok := runtime.TypeKey(t); !ok {
+		return
+	}
+	runtime.RegisterType(t)
+	schema.RegisterType(t)
+}
+
 // Iter1 registers parameters from your DoFn with a
 // signature func(*T) bool and optimizes their execution.
 // This must be done by passing in type parameters of all inputs as constraints,
@@ -113,7 +123,9 @@ func Iter1[T any]() {
 	registerFunc := func(s exec.ReStream) exec.ReusableInput {
 		return &iter1[T]{s: s}
 	}
-	exec.RegisterInput(reflect.TypeOf(i).Elem(), registerFunc)
+	itT := reflect.TypeOf(i).Elem()
+	registerType(itT.In(0).Elem())
+	exec.RegisterInput(itT, registerFunc)
 }
 
 // Iter1 registers parameters from your DoFn with a
@@ -125,5 +137,8 @@ func Iter2[T1, T2 any]() {
 	registerFunc := func(s exec.ReStream) exec.ReusableInput {
 		return &iter2[T1, T2]{s: s}
 	}
-	exec.RegisterInput(reflect.TypeOf(i).Elem(), registerFunc)
+	itT := reflect.TypeOf(i).Elem()
+	registerType(itT.In(0).Elem())
+	registerType(itT.In(1).Elem())
+	exec.RegisterInput(itT, registerFunc)
 }
diff --git a/sdks/go/pkg/beam/register/iter_test.go b/sdks/go/pkg/beam/register/iter_test.go
new file mode 100644
index 00000000000..7cbbf388d7e
--- /dev/null
+++ b/sdks/go/pkg/beam/register/iter_test.go
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package register
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
+)
+
+type myTestTypeIter1 struct {
+	Int int
+}
+
+func checkRegisterations(t *testing.T, rt reflect.Type) {
+	t.Helper()
+	key, ok := runtime.TypeKey(rt)
+	if !ok {
+		t.Fatalf("runtime.TypeKey(%v): no typekey for type", rt)
+	}
+	if _, ok := runtime.LookupType(key); !ok {
+		t.Errorf("want type %v to be available with key %q", rt, key)
+	}
+	if !schema.Registered(rt) {
+		t.Errorf("want type %v to be registered with schemas", rt)
+	}
+}
+
+func TestIter1(t *testing.T) {
+	Iter1[int]()
+	if !exec.IsInputRegistered(reflect.TypeOf((*func(*int) bool)(nil)).Elem()) {
+		t.Fatalf("exec.IsInputRegistered(reflect.TypeOf(((*func(*int) bool)(nil)).Elem()) = false, want true")
+	}
+
+	Iter1[myTestTypeIter1]()
+	if !exec.IsInputRegistered(reflect.TypeOf((*func(*int) bool)(nil)).Elem()) {
+		t.Fatalf("exec.IsInputRegistered(reflect.TypeOf(((*func(*int) bool)(nil)).Elem()) = false, want true")
+	}
+
+	ttrt := reflect.TypeOf((*myTestTypeIter1)(nil)).Elem()
+	checkRegisterations(t, ttrt)
+}
+
+type myTestTypeIter2A struct {
+	Int int
+}
+
+type myTestTypeIter2B struct {
+	Int int
+}
+
+func TestIter2(t *testing.T) {
+	Iter2[int, string]()
+	if !exec.IsInputRegistered(reflect.TypeOf((*func(*int, *string) bool)(nil)).Elem()) {
+		t.Fatalf("exec.IsInputRegistered(reflect.TypeOf((*func(*int, *string) bool)(nil)).Elem()) = false, want true")
+	}
+
+	Iter2[myTestTypeIter2A, myTestTypeIter2B]()
+	if !exec.IsInputRegistered(reflect.TypeOf((*func(*int) bool)(nil)).Elem()) {
+		t.Fatalf("exec.IsInputRegistered(reflect.TypeOf(((*func(*int) bool)(nil)).Elem()) = false, want true")
+	}
+
+	ttArt := reflect.TypeOf((*myTestTypeIter2A)(nil)).Elem()
+	checkRegisterations(t, ttArt)
+	ttBrt := reflect.TypeOf((*myTestTypeIter2B)(nil)).Elem()
+	checkRegisterations(t, ttBrt)
+}
+
+func TestIter1_Struct(t *testing.T) {
+	values := []exec.FullValue{{
+		Windows:   window.SingleGlobalWindow,
+		Timestamp: mtime.ZeroTimestamp,
+		Elm:       "one",
+	}, {
+		Windows:   window.SingleGlobalWindow,
+		Timestamp: mtime.ZeroTimestamp,
+		Elm:       "two",
+	}, {
+		Windows:   window.SingleGlobalWindow,
+		Timestamp: mtime.ZeroTimestamp,
+		Elm:       "three",
+	}}
+
+	i := iter1[string]{s: &exec.FixedReStream{Buf: values}}
+
+	i.Init()
+	fn := i.Value().(func(value *string) bool)
+
+	var s string
+	if ok := fn(&s); !ok {
+		t.Fatalf("First i.Value()(&s)=false, want true")
+	}
+	if got, want := s, "one"; got != want {
+		t.Fatalf("First iter value = %v, want %v", got, want)
+	}
+	if ok := fn(&s); !ok {
+		t.Fatalf("Second i.Value()(&s)=false, want true")
+	}
+	if got, want := s, "two"; got != want {
+		t.Fatalf("Second iter value = %v, want %v", got, want)
+	}
+	if ok := fn(&s); !ok {
+		t.Fatalf("Third i.Value()(&s)=false, want true")
+	}
+	if got, want := s, "three"; got != want {
+		t.Fatalf("Third iter value = %v, want %v", got, want)
+	}
+	if ok := fn(&s); ok {
+		t.Fatalf("Fourth i.Value()(&s)=true, want false")
+	}
+	if err := i.Reset(); err != nil {
+		t.Fatalf("i.Reset()=%v, want nil", err)
+	}
+}
+
+func TestIter2_Struct(t *testing.T) {
+	values := []exec.FullValue{{
+		Windows:   window.SingleGlobalWindow,
+		Timestamp: mtime.ZeroTimestamp,
+		Elm:       1,
+		Elm2:      "one",
+	}, {
+		Windows:   window.SingleGlobalWindow,
+		Timestamp: mtime.ZeroTimestamp,
+		Elm:       2,
+		Elm2:      "two",
+	}, {
+		Windows:   window.SingleGlobalWindow,
+		Timestamp: mtime.ZeroTimestamp,
+		Elm:       3,
+		Elm2:      "three",
+	}}
+
+	i := iter2[int, string]{s: &exec.FixedReStream{Buf: values}}
+
+	i.Init()
+	fn := i.Value().(func(key *int, value *string) bool)
+
+	var s string
+	var key int
+	if ok := fn(&key, &s); !ok {
+		t.Fatalf("First i.Value()(&s)=false, want true")
+	}
+	if got, want := key, 1; got != want {
+		t.Fatalf("First iter key = %v, want %v", got, want)
+	}
+	if got, want := s, "one"; got != want {
+		t.Fatalf("First iter value = %v, want %v", got, want)
+	}
+	if ok := fn(&key, &s); !ok {
+		t.Fatalf("Second i.Value()(&s)=false, want true")
+	}
+	if got, want := key, 2; got != want {
+		t.Fatalf("Second iter key = %v, want %v", got, want)
+	}
+	if got, want := s, "two"; got != want {
+		t.Fatalf("Second iter value = %v, want %v", got, want)
+	}
+	if ok := fn(&key, &s); !ok {
+		t.Fatalf("Third i.Value()(&s)=false, want true")
+	}
+	if got, want := key, 3; got != want {
+		t.Fatalf("Third iter key = %v, want %v", got, want)
+	}
+	if got, want := s, "three"; got != want {
+		t.Fatalf("Third iter value = %v, want %v", got, want)
+	}
+	if ok := fn(&key, &s); ok {
+		t.Fatalf("Fourth i.Value()(&s)=true, want false")
+	}
+	if err := i.Reset(); err != nil {
+		t.Fatalf("i.Reset()=%v, want nil", err)
+	}
+}
diff --git a/sdks/go/pkg/beam/register/register_test.go b/sdks/go/pkg/beam/register/register_test.go
index 39962ab3c42..8cab02122e9 100644
--- a/sdks/go/pkg/beam/register/register_test.go
+++ b/sdks/go/pkg/beam/register/register_test.go
@@ -21,13 +21,10 @@ import (
 	"testing"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 )
 
@@ -324,219 +321,6 @@ func TestCombiner_PartialCombiner2(t *testing.T) {
 	}
 }
 
-func TestEmitter1(t *testing.T) {
-	Emitter1[int]()
-	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(int))(nil)).Elem()) {
-		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(int))(nil)).Elem()) = false, want true")
-	}
-}
-
-func TestEmitter2(t *testing.T) {
-	Emitter2[int, string]()
-	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(int, string))(nil)).Elem()) {
-		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(int, string))(nil)).Elem()) = false, want true")
-	}
-}
-
-func TestEmitter2_WithTimestamp(t *testing.T) {
-	Emitter2[typex.EventTime, string]()
-	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, string))(nil)).Elem()) {
-		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, string))(nil)).Elem()) = false, want true")
-	}
-}
-
-func TestEmitter3(t *testing.T) {
-	Emitter3[typex.EventTime, int, string]()
-	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, int, string))(nil)).Elem()) {
-		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, int, string))(nil)).Elem()) = false, want true")
-	}
-}
-
-func TestEmit1(t *testing.T) {
-	e := &emit1[int]{n: &elementProcessor{}}
-	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
-	fn := e.Value().(func(int))
-	fn(3)
-	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
-	}
-	if got := e.n.(*elementProcessor).inFV.Elm2; got != nil {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want nil", got)
-	}
-	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.ZeroTimestamp; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
-	}
-}
-
-func TestEmit2(t *testing.T) {
-	e := &emit2[int, string]{n: &elementProcessor{}}
-	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
-	fn := e.Value().(func(int, string))
-	fn(3, "hello")
-	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
-	}
-	if got, want := e.n.(*elementProcessor).inFV.Elm2, "hello"; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want %v", got, want)
-	}
-	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.ZeroTimestamp; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
-	}
-}
-
-func TestEmit1WithTimestamp(t *testing.T) {
-	e := &emit1WithTimestamp[int]{n: &elementProcessor{}}
-	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
-	fn := e.Value().(func(typex.EventTime, int))
-	fn(mtime.MaxTimestamp, 3)
-	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
-	}
-	if got := e.n.(*elementProcessor).inFV.Elm2; got != nil {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want nil", got)
-	}
-	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.MaxTimestamp; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
-	}
-}
-
-func TestEmit2WithTimestamp(t *testing.T) {
-	e := &emit2WithTimestamp[int, string]{n: &elementProcessor{}}
-	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
-	fn := e.Value().(func(typex.EventTime, int, string))
-	fn(mtime.MaxTimestamp, 3, "hello")
-	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
-	}
-	if got, want := e.n.(*elementProcessor).inFV.Elm2, "hello"; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want %v", got, want)
-	}
-	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.MaxTimestamp; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
-	}
-}
-
-func TestIter1(t *testing.T) {
-	Iter1[int]()
-	if !exec.IsInputRegistered(reflect.TypeOf((*func(*int) bool)(nil)).Elem()) {
-		t.Fatalf("exec.IsInputRegistered(reflect.TypeOf(((*func(*int) bool)(nil)).Elem()) = false, want true")
-	}
-}
-
-func TestIter2(t *testing.T) {
-	Iter2[int, string]()
-	if !exec.IsInputRegistered(reflect.TypeOf((*func(*int, *string) bool)(nil)).Elem()) {
-		t.Fatalf("exec.IsInputRegistered(reflect.TypeOf((*func(*int, *string) bool)(nil)).Elem()) = false, want true")
-	}
-}
-
-func TestIter1_Struct(t *testing.T) {
-	values := []exec.FullValue{exec.FullValue{
-		Windows:   window.SingleGlobalWindow,
-		Timestamp: mtime.ZeroTimestamp,
-		Elm:       "one",
-	}, exec.FullValue{
-		Windows:   window.SingleGlobalWindow,
-		Timestamp: mtime.ZeroTimestamp,
-		Elm:       "two",
-	}, exec.FullValue{
-		Windows:   window.SingleGlobalWindow,
-		Timestamp: mtime.ZeroTimestamp,
-		Elm:       "three",
-	}}
-
-	i := iter1[string]{s: &exec.FixedReStream{Buf: values}}
-
-	i.Init()
-	fn := i.Value().(func(value *string) bool)
-
-	var s string
-	if ok := fn(&s); !ok {
-		t.Fatalf("First i.Value()(&s)=false, want true")
-	}
-	if got, want := s, "one"; got != want {
-		t.Fatalf("First iter value = %v, want %v", got, want)
-	}
-	if ok := fn(&s); !ok {
-		t.Fatalf("Second i.Value()(&s)=false, want true")
-	}
-	if got, want := s, "two"; got != want {
-		t.Fatalf("Second iter value = %v, want %v", got, want)
-	}
-	if ok := fn(&s); !ok {
-		t.Fatalf("Third i.Value()(&s)=false, want true")
-	}
-	if got, want := s, "three"; got != want {
-		t.Fatalf("Third iter value = %v, want %v", got, want)
-	}
-	if ok := fn(&s); ok {
-		t.Fatalf("Fourth i.Value()(&s)=true, want false")
-	}
-	if err := i.Reset(); err != nil {
-		t.Fatalf("i.Reset()=%v, want nil", err)
-	}
-}
-
-func TestIter2_Struct(t *testing.T) {
-	values := []exec.FullValue{exec.FullValue{
-		Windows:   window.SingleGlobalWindow,
-		Timestamp: mtime.ZeroTimestamp,
-		Elm:       1,
-		Elm2:      "one",
-	}, exec.FullValue{
-		Windows:   window.SingleGlobalWindow,
-		Timestamp: mtime.ZeroTimestamp,
-		Elm:       2,
-		Elm2:      "two",
-	}, exec.FullValue{
-		Windows:   window.SingleGlobalWindow,
-		Timestamp: mtime.ZeroTimestamp,
-		Elm:       3,
-		Elm2:      "three",
-	}}
-
-	i := iter2[int, string]{s: &exec.FixedReStream{Buf: values}}
-
-	i.Init()
-	fn := i.Value().(func(key *int, value *string) bool)
-
-	var s string
-	var key int
-	if ok := fn(&key, &s); !ok {
-		t.Fatalf("First i.Value()(&s)=false, want true")
-	}
-	if got, want := key, 1; got != want {
-		t.Fatalf("First iter key = %v, want %v", got, want)
-	}
-	if got, want := s, "one"; got != want {
-		t.Fatalf("First iter value = %v, want %v", got, want)
-	}
-	if ok := fn(&key, &s); !ok {
-		t.Fatalf("Second i.Value()(&s)=false, want true")
-	}
-	if got, want := key, 2; got != want {
-		t.Fatalf("Second iter key = %v, want %v", got, want)
-	}
-	if got, want := s, "two"; got != want {
-		t.Fatalf("Second iter value = %v, want %v", got, want)
-	}
-	if ok := fn(&key, &s); !ok {
-		t.Fatalf("Third i.Value()(&s)=false, want true")
-	}
-	if got, want := key, 3; got != want {
-		t.Fatalf("Third iter key = %v, want %v", got, want)
-	}
-	if got, want := s, "three"; got != want {
-		t.Fatalf("Third iter value = %v, want %v", got, want)
-	}
-	if ok := fn(&key, &s); ok {
-		t.Fatalf("Fourth i.Value()(&s)=true, want false")
-	}
-	if err := i.Reset(); err != nil {
-		t.Fatalf("i.Reset()=%v, want nil", err)
-	}
-}
-
 type CustomFunctionParameter struct {
 	key string
 	val int