You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/10/28 21:40:13 UTC

[beam] 01/02: Ensure iterated and emitted types are registered.

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch beam23889
in repository https://gitbox.apache.org/repos/asf/beam.git

commit e45e366e33a28bd9d7ebc7b94a1dda18e88192c5
Author: lostluck <13...@users.noreply.github.com>
AuthorDate: Fri Oct 28 14:34:11 2022 -0700

    Ensure iterated and emitted types are registered.
---
 .../pkg/beam/core/runtime/graphx/schema/schema.go  |   1 +
 sdks/go/pkg/beam/register/emitter.go               |  23 ++-
 sdks/go/pkg/beam/register/emitter_test.go          | 154 +++++++++++++++
 sdks/go/pkg/beam/register/iter.go                  |  19 +-
 sdks/go/pkg/beam/register/iter_test.go             | 192 ++++++++++++++++++
 sdks/go/pkg/beam/register/register_test.go         | 216 ---------------------
 6 files changed, 380 insertions(+), 225 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index 7ca516b5694..b8c69007fbd 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -92,6 +92,7 @@ func getUUID(ut reflect.Type) string {
 // Registered returns whether the given type has been registered with
 // the schema package.
 func (r *Registry) Registered(ut reflect.Type) bool {
+	r.reconcileRegistrations()
 	_, ok := r.syntheticToUser[ut]
 	return ok
 }
diff --git a/sdks/go/pkg/beam/register/emitter.go b/sdks/go/pkg/beam/register/emitter.go
index 6c88d28d9fc..3b9cb9910d2 100644
--- a/sdks/go/pkg/beam/register/emitter.go
+++ b/sdks/go/pkg/beam/register/emitter.go
@@ -129,7 +129,9 @@ func Emitter1[T1 any]() {
 	registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter {
 		return &emit1[T1]{n: n}
 	}
-	exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+	eT := reflect.TypeOf(e).Elem()
+	registerType(eT.In(0))
+	exec.RegisterEmitter(eT, registerFunc)
 }
 
 // Emitter2 registers parameters from your DoFn with a
@@ -147,18 +149,25 @@ func Emitter2[T1, T2 any]() {
 			return &emit1WithTimestamp[T2]{n: n}
 		}
 	}
-	exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+	eT := reflect.TypeOf(e).Elem()
+	registerType(eT.In(0))
+	registerType(eT.In(1))
+	exec.RegisterEmitter(eT, registerFunc)
 }
 
 // Emitter3 registers parameters from your DoFn with a
-// signature func(T1, T2, T3) and optimizes their execution.
+// signature func(beam.EventTime, T1, T2) and optimizes their execution.
 // This must be done by passing in type parameters of all inputs as constraints,
 // aka: register.Emitter3[beam.EventTime, T1, T2](), where T1 is the type of
 // your key and T2 is the type of your value.
-func Emitter3[T1 typex.EventTime, T2, T3 any]() {
-	e := (*func(T1, T2, T3))(nil)
+func Emitter3[ET typex.EventTime, T1, T2 any]() {
+	e := (*func(ET, T1, T2))(nil)
 	registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter {
-		return &emit2WithTimestamp[T2, T3]{n: n}
+		return &emit2WithTimestamp[T1, T2]{n: n}
 	}
-	exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+	eT := reflect.TypeOf(e).Elem()
+	// No need to register event time.
+	registerType(eT.In(1))
+	registerType(eT.In(2))
+	exec.RegisterEmitter(eT, registerFunc)
 }
diff --git a/sdks/go/pkg/beam/register/emitter_test.go b/sdks/go/pkg/beam/register/emitter_test.go
new file mode 100644
index 00000000000..1855f830c3f
--- /dev/null
+++ b/sdks/go/pkg/beam/register/emitter_test.go
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package register
+
+import (
+	"context"
+	"reflect"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+type myTestTypeEmitter1 struct {
+	Int int
+}
+
+func TestEmitter1(t *testing.T) {
+	Emitter1[int]()
+	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(int))(nil)).Elem()) {
+		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(int))(nil)).Elem()) = false, want true")
+	}
+
+	Emitter1[myTestTypeEmitter1]()
+	rt := reflect.TypeOf((*myTestTypeEmitter1)(nil)).Elem()
+	checkRegisterations(t, rt)
+}
+
+type myTestTypeEmitter2A struct {
+	Int int
+}
+
+type myTestTypeEmitter2B struct {
+	String string
+}
+
+func TestEmitter2(t *testing.T) {
+	Emitter2[int, string]()
+	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(int, string))(nil)).Elem()) {
+		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(int, string))(nil)).Elem()) = false, want true")
+	}
+
+	Emitter2[myTestTypeEmitter2A, myTestTypeEmitter2B]()
+	tA := reflect.TypeOf((*myTestTypeEmitter2A)(nil)).Elem()
+	checkRegisterations(t, tA)
+	tB := reflect.TypeOf((*myTestTypeEmitter2B)(nil)).Elem()
+	checkRegisterations(t, tB)
+}
+
+func TestEmitter2_WithTimestamp(t *testing.T) {
+	Emitter2[typex.EventTime, string]()
+	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, string))(nil)).Elem()) {
+		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, string))(nil)).Elem()) = false, want true")
+	}
+}
+
+type myTestTypeEmitter3A struct {
+	Int int
+}
+
+type myTestTypeEmitter3B struct {
+	String string
+}
+
+func TestEmitter3(t *testing.T) {
+	Emitter3[typex.EventTime, int, string]()
+	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, int, string))(nil)).Elem()) {
+		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, int, string))(nil)).Elem()) = false, want true")
+	}
+
+	Emitter3[typex.EventTime, myTestTypeEmitter3A, myTestTypeEmitter3B]()
+	tA := reflect.TypeOf((*myTestTypeEmitter3A)(nil)).Elem()
+	checkRegisterations(t, tA)
+	tB := reflect.TypeOf((*myTestTypeEmitter3B)(nil)).Elem()
+	checkRegisterations(t, tB)
+}
+
+func TestEmit1(t *testing.T) {
+	e := &emit1[int]{n: &elementProcessor{}}
+	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
+	fn := e.Value().(func(int))
+	fn(3)
+	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
+		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
+	}
+	if got := e.n.(*elementProcessor).inFV.Elm2; got != nil {
+		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want nil", got)
+	}
+	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.ZeroTimestamp; got != want {
+		t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
+	}
+}
+
+func TestEmit2(t *testing.T) { // Verifies a two-value emit forwards key, value and the Init timestamp.
+	e := &emit2[int, string]{n: &elementProcessor{}}
+	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
+	fn := e.Value().(func(int, string))
+	fn(3, "hello")
+	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
+		t.Errorf("e.Value().(func(int, string))(3, \"hello\").n.inFV.Elm=%v, want %v", got, want)
+	}
+	if got, want := e.n.(*elementProcessor).inFV.Elm2, "hello"; got != want {
+		t.Errorf("e.Value().(func(int, string))(3, \"hello\").n.inFV.Elm2=%v, want %v", got, want)
+	}
+	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.ZeroTimestamp; got != want {
+		t.Errorf("e.Value().(func(int, string))(3, \"hello\").n.inFV.Timestamp=%v, want %v", got, want)
+	}
+}
+
+func TestEmit1WithTimestamp(t *testing.T) { // Verifies the explicit event time overrides the Init timestamp.
+	e := &emit1WithTimestamp[int]{n: &elementProcessor{}}
+	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
+	fn := e.Value().(func(typex.EventTime, int))
+	fn(mtime.MaxTimestamp, 3)
+	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
+		t.Errorf("e.Value().(func(typex.EventTime, int))(mtime.MaxTimestamp, 3).n.inFV.Elm=%v, want %v", got, want)
+	}
+	if got := e.n.(*elementProcessor).inFV.Elm2; got != nil {
+		t.Errorf("e.Value().(func(typex.EventTime, int))(mtime.MaxTimestamp, 3).n.inFV.Elm2=%v, want nil", got)
+	}
+	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.MaxTimestamp; got != want {
+		t.Errorf("e.Value().(func(typex.EventTime, int))(mtime.MaxTimestamp, 3).n.inFV.Timestamp=%v, want %v", got, want)
+	}
+}
+
+func TestEmit2WithTimestamp(t *testing.T) { // Verifies key, value and explicit event time are all forwarded.
+	e := &emit2WithTimestamp[int, string]{n: &elementProcessor{}}
+	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
+	fn := e.Value().(func(typex.EventTime, int, string))
+	fn(mtime.MaxTimestamp, 3, "hello")
+	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
+		t.Errorf("e.Value().(func(typex.EventTime, int, string))(mtime.MaxTimestamp, 3, \"hello\").n.inFV.Elm=%v, want %v", got, want)
+	}
+	if got, want := e.n.(*elementProcessor).inFV.Elm2, "hello"; got != want {
+		t.Errorf("e.Value().(func(typex.EventTime, int, string))(mtime.MaxTimestamp, 3, \"hello\").n.inFV.Elm2=%v, want %v", got, want)
+	}
+	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.MaxTimestamp; got != want {
+		t.Errorf("e.Value().(func(typex.EventTime, int, string))(mtime.MaxTimestamp, 3, \"hello\").n.inFV.Timestamp=%v, want %v", got, want)
+	}
+}
diff --git a/sdks/go/pkg/beam/register/iter.go b/sdks/go/pkg/beam/register/iter.go
index 71d3f3df723..16bce66c143 100644
--- a/sdks/go/pkg/beam/register/iter.go
+++ b/sdks/go/pkg/beam/register/iter.go
@@ -20,7 +20,9 @@ import (
 	"io"
 	"reflect"
 
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
 )
 
 type iter1[T any] struct {
@@ -104,6 +106,14 @@ func (v *iter2[T1, T2]) invoke(key *T1, value *T2) bool {
 	return true
 }
 
+func registerType(t reflect.Type) {
+	if _, ok := runtime.TypeKey(t); !ok {
+		return
+	}
+	runtime.RegisterType(t)
+	schema.RegisterType(t)
+}
+
 // Iter1 registers parameters from your DoFn with a
 // signature func(*T) bool and optimizes their execution.
 // This must be done by passing in type parameters of all inputs as constraints,
@@ -113,7 +123,9 @@ func Iter1[T any]() {
 	registerFunc := func(s exec.ReStream) exec.ReusableInput {
 		return &iter1[T]{s: s}
 	}
-	exec.RegisterInput(reflect.TypeOf(i).Elem(), registerFunc)
+	itT := reflect.TypeOf(i).Elem()
+	registerType(itT.In(0).Elem())
+	exec.RegisterInput(itT, registerFunc)
 }
 
 // Iter1 registers parameters from your DoFn with a
@@ -125,5 +137,8 @@ func Iter2[T1, T2 any]() {
 	registerFunc := func(s exec.ReStream) exec.ReusableInput {
 		return &iter2[T1, T2]{s: s}
 	}
-	exec.RegisterInput(reflect.TypeOf(i).Elem(), registerFunc)
+	itT := reflect.TypeOf(i).Elem()
+	registerType(itT.In(0).Elem())
+	registerType(itT.In(1).Elem())
+	exec.RegisterInput(itT, registerFunc)
 }
diff --git a/sdks/go/pkg/beam/register/iter_test.go b/sdks/go/pkg/beam/register/iter_test.go
new file mode 100644
index 00000000000..7cbbf388d7e
--- /dev/null
+++ b/sdks/go/pkg/beam/register/iter_test.go
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package register
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
+)
+
+type myTestTypeIter1 struct {
+	Int int
+}
+
+func checkRegisterations(t *testing.T, rt reflect.Type) {
+	t.Helper()
+	key, ok := runtime.TypeKey(rt)
+	if !ok {
+		t.Fatalf("runtime.TypeKey(%v): no typekey for type", rt)
+	}
+	if _, ok := runtime.LookupType(key); !ok {
+		t.Errorf("want type %v to be available with key %q", rt, key)
+	}
+	if !schema.Registered(rt) {
+		t.Errorf("want type %v to be registered with schemas", rt)
+	}
+}
+
+func TestIter1(t *testing.T) { // Verifies Iter1 registers the iterator func and its element type.
+	Iter1[int]()
+	if !exec.IsInputRegistered(reflect.TypeOf((*func(*int) bool)(nil)).Elem()) {
+		t.Fatalf("exec.IsInputRegistered(reflect.TypeOf(((*func(*int) bool)(nil)).Elem()) = false, want true")
+	}
+
+	Iter1[myTestTypeIter1]() // Check the iterator over the user type itself, not the int one registered above.
+	if !exec.IsInputRegistered(reflect.TypeOf((*func(*myTestTypeIter1) bool)(nil)).Elem()) {
+		t.Fatalf("exec.IsInputRegistered(reflect.TypeOf((*func(*myTestTypeIter1) bool)(nil)).Elem()) = false, want true")
+	}
+
+	ttrt := reflect.TypeOf((*myTestTypeIter1)(nil)).Elem()
+	checkRegisterations(t, ttrt)
+}
+
+type myTestTypeIter2A struct {
+	Int int
+}
+
+type myTestTypeIter2B struct {
+	Int int
+}
+
+func TestIter2(t *testing.T) { // Verifies Iter2 registers the key/value iterator func and both element types.
+	Iter2[int, string]()
+	if !exec.IsInputRegistered(reflect.TypeOf((*func(*int, *string) bool)(nil)).Elem()) {
+		t.Fatalf("exec.IsInputRegistered(reflect.TypeOf((*func(*int, *string) bool)(nil)).Elem()) = false, want true")
+	}
+
+	Iter2[myTestTypeIter2A, myTestTypeIter2B]() // Check this exact two-argument signature so the test does not depend on TestIter1 having run.
+	if !exec.IsInputRegistered(reflect.TypeOf((*func(*myTestTypeIter2A, *myTestTypeIter2B) bool)(nil)).Elem()) {
+		t.Fatalf("exec.IsInputRegistered(reflect.TypeOf((*func(*myTestTypeIter2A, *myTestTypeIter2B) bool)(nil)).Elem()) = false, want true")
+	}
+
+	ttArt := reflect.TypeOf((*myTestTypeIter2A)(nil)).Elem()
+	checkRegisterations(t, ttArt)
+	ttBrt := reflect.TypeOf((*myTestTypeIter2B)(nil)).Elem()
+	checkRegisterations(t, ttBrt)
+}
+
+func TestIter1_Struct(t *testing.T) {
+	values := []exec.FullValue{{
+		Windows:   window.SingleGlobalWindow,
+		Timestamp: mtime.ZeroTimestamp,
+		Elm:       "one",
+	}, {
+		Windows:   window.SingleGlobalWindow,
+		Timestamp: mtime.ZeroTimestamp,
+		Elm:       "two",
+	}, {
+		Windows:   window.SingleGlobalWindow,
+		Timestamp: mtime.ZeroTimestamp,
+		Elm:       "three",
+	}}
+
+	i := iter1[string]{s: &exec.FixedReStream{Buf: values}}
+
+	i.Init()
+	fn := i.Value().(func(value *string) bool)
+
+	var s string
+	if ok := fn(&s); !ok {
+		t.Fatalf("First i.Value()(&s)=false, want true")
+	}
+	if got, want := s, "one"; got != want {
+		t.Fatalf("First iter value = %v, want %v", got, want)
+	}
+	if ok := fn(&s); !ok {
+		t.Fatalf("Second i.Value()(&s)=false, want true")
+	}
+	if got, want := s, "two"; got != want {
+		t.Fatalf("Second iter value = %v, want %v", got, want)
+	}
+	if ok := fn(&s); !ok {
+		t.Fatalf("Third i.Value()(&s)=false, want true")
+	}
+	if got, want := s, "three"; got != want {
+		t.Fatalf("Third iter value = %v, want %v", got, want)
+	}
+	if ok := fn(&s); ok {
+		t.Fatalf("Fourth i.Value()(&s)=true, want false")
+	}
+	if err := i.Reset(); err != nil {
+		t.Fatalf("i.Reset()=%v, want nil", err)
+	}
+}
+
+func TestIter2_Struct(t *testing.T) {
+	values := []exec.FullValue{{
+		Windows:   window.SingleGlobalWindow,
+		Timestamp: mtime.ZeroTimestamp,
+		Elm:       1,
+		Elm2:      "one",
+	}, {
+		Windows:   window.SingleGlobalWindow,
+		Timestamp: mtime.ZeroTimestamp,
+		Elm:       2,
+		Elm2:      "two",
+	}, {
+		Windows:   window.SingleGlobalWindow,
+		Timestamp: mtime.ZeroTimestamp,
+		Elm:       3,
+		Elm2:      "three",
+	}}
+
+	i := iter2[int, string]{s: &exec.FixedReStream{Buf: values}}
+
+	i.Init()
+	fn := i.Value().(func(key *int, value *string) bool)
+
+	var s string
+	var key int
+	if ok := fn(&key, &s); !ok {
+		t.Fatalf("First i.Value()(&s)=false, want true")
+	}
+	if got, want := key, 1; got != want {
+		t.Fatalf("First iter key = %v, want %v", got, want)
+	}
+	if got, want := s, "one"; got != want {
+		t.Fatalf("First iter value = %v, want %v", got, want)
+	}
+	if ok := fn(&key, &s); !ok {
+		t.Fatalf("Second i.Value()(&s)=false, want true")
+	}
+	if got, want := key, 2; got != want {
+		t.Fatalf("Second iter key = %v, want %v", got, want)
+	}
+	if got, want := s, "two"; got != want {
+		t.Fatalf("Second iter value = %v, want %v", got, want)
+	}
+	if ok := fn(&key, &s); !ok {
+		t.Fatalf("Third i.Value()(&s)=false, want true")
+	}
+	if got, want := key, 3; got != want {
+		t.Fatalf("Third iter key = %v, want %v", got, want)
+	}
+	if got, want := s, "three"; got != want {
+		t.Fatalf("Third iter value = %v, want %v", got, want)
+	}
+	if ok := fn(&key, &s); ok {
+		t.Fatalf("Fourth i.Value()(&s)=true, want false")
+	}
+	if err := i.Reset(); err != nil {
+		t.Fatalf("i.Reset()=%v, want nil", err)
+	}
+}
diff --git a/sdks/go/pkg/beam/register/register_test.go b/sdks/go/pkg/beam/register/register_test.go
index 39962ab3c42..8cab02122e9 100644
--- a/sdks/go/pkg/beam/register/register_test.go
+++ b/sdks/go/pkg/beam/register/register_test.go
@@ -21,13 +21,10 @@ import (
 	"testing"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 )
 
@@ -324,219 +321,6 @@ func TestCombiner_PartialCombiner2(t *testing.T) {
 	}
 }
 
-func TestEmitter1(t *testing.T) {
-	Emitter1[int]()
-	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(int))(nil)).Elem()) {
-		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(int))(nil)).Elem()) = false, want true")
-	}
-}
-
-func TestEmitter2(t *testing.T) {
-	Emitter2[int, string]()
-	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(int, string))(nil)).Elem()) {
-		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(int, string))(nil)).Elem()) = false, want true")
-	}
-}
-
-func TestEmitter2_WithTimestamp(t *testing.T) {
-	Emitter2[typex.EventTime, string]()
-	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, string))(nil)).Elem()) {
-		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, string))(nil)).Elem()) = false, want true")
-	}
-}
-
-func TestEmitter3(t *testing.T) {
-	Emitter3[typex.EventTime, int, string]()
-	if !exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, int, string))(nil)).Elem()) {
-		t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, int, string))(nil)).Elem()) = false, want true")
-	}
-}
-
-func TestEmit1(t *testing.T) {
-	e := &emit1[int]{n: &elementProcessor{}}
-	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
-	fn := e.Value().(func(int))
-	fn(3)
-	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
-	}
-	if got := e.n.(*elementProcessor).inFV.Elm2; got != nil {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want nil", got)
-	}
-	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.ZeroTimestamp; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
-	}
-}
-
-func TestEmit2(t *testing.T) {
-	e := &emit2[int, string]{n: &elementProcessor{}}
-	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
-	fn := e.Value().(func(int, string))
-	fn(3, "hello")
-	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
-	}
-	if got, want := e.n.(*elementProcessor).inFV.Elm2, "hello"; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want %v", got, want)
-	}
-	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.ZeroTimestamp; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
-	}
-}
-
-func TestEmit1WithTimestamp(t *testing.T) {
-	e := &emit1WithTimestamp[int]{n: &elementProcessor{}}
-	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
-	fn := e.Value().(func(typex.EventTime, int))
-	fn(mtime.MaxTimestamp, 3)
-	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
-	}
-	if got := e.n.(*elementProcessor).inFV.Elm2; got != nil {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want nil", got)
-	}
-	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.MaxTimestamp; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
-	}
-}
-
-func TestEmit2WithTimestamp(t *testing.T) {
-	e := &emit2WithTimestamp[int, string]{n: &elementProcessor{}}
-	e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
-	fn := e.Value().(func(typex.EventTime, int, string))
-	fn(mtime.MaxTimestamp, 3, "hello")
-	if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
-	}
-	if got, want := e.n.(*elementProcessor).inFV.Elm2, "hello"; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want %v", got, want)
-	}
-	if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.MaxTimestamp; got != want {
-		t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
-	}
-}
-
-func TestIter1(t *testing.T) {
-	Iter1[int]()
-	if !exec.IsInputRegistered(reflect.TypeOf((*func(*int) bool)(nil)).Elem()) {
-		t.Fatalf("exec.IsInputRegistered(reflect.TypeOf(((*func(*int) bool)(nil)).Elem()) = false, want true")
-	}
-}
-
-func TestIter2(t *testing.T) {
-	Iter2[int, string]()
-	if !exec.IsInputRegistered(reflect.TypeOf((*func(*int, *string) bool)(nil)).Elem()) {
-		t.Fatalf("exec.IsInputRegistered(reflect.TypeOf((*func(*int, *string) bool)(nil)).Elem()) = false, want true")
-	}
-}
-
-func TestIter1_Struct(t *testing.T) {
-	values := []exec.FullValue{exec.FullValue{
-		Windows:   window.SingleGlobalWindow,
-		Timestamp: mtime.ZeroTimestamp,
-		Elm:       "one",
-	}, exec.FullValue{
-		Windows:   window.SingleGlobalWindow,
-		Timestamp: mtime.ZeroTimestamp,
-		Elm:       "two",
-	}, exec.FullValue{
-		Windows:   window.SingleGlobalWindow,
-		Timestamp: mtime.ZeroTimestamp,
-		Elm:       "three",
-	}}
-
-	i := iter1[string]{s: &exec.FixedReStream{Buf: values}}
-
-	i.Init()
-	fn := i.Value().(func(value *string) bool)
-
-	var s string
-	if ok := fn(&s); !ok {
-		t.Fatalf("First i.Value()(&s)=false, want true")
-	}
-	if got, want := s, "one"; got != want {
-		t.Fatalf("First iter value = %v, want %v", got, want)
-	}
-	if ok := fn(&s); !ok {
-		t.Fatalf("Second i.Value()(&s)=false, want true")
-	}
-	if got, want := s, "two"; got != want {
-		t.Fatalf("Second iter value = %v, want %v", got, want)
-	}
-	if ok := fn(&s); !ok {
-		t.Fatalf("Third i.Value()(&s)=false, want true")
-	}
-	if got, want := s, "three"; got != want {
-		t.Fatalf("Third iter value = %v, want %v", got, want)
-	}
-	if ok := fn(&s); ok {
-		t.Fatalf("Fourth i.Value()(&s)=true, want false")
-	}
-	if err := i.Reset(); err != nil {
-		t.Fatalf("i.Reset()=%v, want nil", err)
-	}
-}
-
-func TestIter2_Struct(t *testing.T) {
-	values := []exec.FullValue{exec.FullValue{
-		Windows:   window.SingleGlobalWindow,
-		Timestamp: mtime.ZeroTimestamp,
-		Elm:       1,
-		Elm2:      "one",
-	}, exec.FullValue{
-		Windows:   window.SingleGlobalWindow,
-		Timestamp: mtime.ZeroTimestamp,
-		Elm:       2,
-		Elm2:      "two",
-	}, exec.FullValue{
-		Windows:   window.SingleGlobalWindow,
-		Timestamp: mtime.ZeroTimestamp,
-		Elm:       3,
-		Elm2:      "three",
-	}}
-
-	i := iter2[int, string]{s: &exec.FixedReStream{Buf: values}}
-
-	i.Init()
-	fn := i.Value().(func(key *int, value *string) bool)
-
-	var s string
-	var key int
-	if ok := fn(&key, &s); !ok {
-		t.Fatalf("First i.Value()(&s)=false, want true")
-	}
-	if got, want := key, 1; got != want {
-		t.Fatalf("First iter key = %v, want %v", got, want)
-	}
-	if got, want := s, "one"; got != want {
-		t.Fatalf("First iter value = %v, want %v", got, want)
-	}
-	if ok := fn(&key, &s); !ok {
-		t.Fatalf("Second i.Value()(&s)=false, want true")
-	}
-	if got, want := key, 2; got != want {
-		t.Fatalf("Second iter key = %v, want %v", got, want)
-	}
-	if got, want := s, "two"; got != want {
-		t.Fatalf("Second iter value = %v, want %v", got, want)
-	}
-	if ok := fn(&key, &s); !ok {
-		t.Fatalf("Third i.Value()(&s)=false, want true")
-	}
-	if got, want := key, 3; got != want {
-		t.Fatalf("Third iter key = %v, want %v", got, want)
-	}
-	if got, want := s, "three"; got != want {
-		t.Fatalf("Third iter value = %v, want %v", got, want)
-	}
-	if ok := fn(&key, &s); ok {
-		t.Fatalf("Fourth i.Value()(&s)=true, want false")
-	}
-	if err := i.Reset(); err != nil {
-		t.Fatalf("i.Reset()=%v, want nil", err)
-	}
-}
-
 type CustomFunctionParameter struct {
 	key string
 	val int