You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/10/28 21:40:13 UTC
[beam] 01/02: Ensure iterated and emitted types are registered.
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch beam23889
in repository https://gitbox.apache.org/repos/asf/beam.git
commit e45e366e33a28bd9d7ebc7b94a1dda18e88192c5
Author: lostluck <13...@users.noreply.github.com>
AuthorDate: Fri Oct 28 14:34:11 2022 -0700
Ensure iterated and emitted types are registered.
---
.../pkg/beam/core/runtime/graphx/schema/schema.go | 1 +
sdks/go/pkg/beam/register/emitter.go | 23 ++-
sdks/go/pkg/beam/register/emitter_test.go | 154 +++++++++++++++
sdks/go/pkg/beam/register/iter.go | 19 +-
sdks/go/pkg/beam/register/iter_test.go | 192 ++++++++++++++++++
sdks/go/pkg/beam/register/register_test.go | 216 ---------------------
6 files changed, 380 insertions(+), 225 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index 7ca516b5694..b8c69007fbd 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -92,6 +92,7 @@ func getUUID(ut reflect.Type) string {
// Registered returns whether the given type has been registered with
// the schema package.
func (r *Registry) Registered(ut reflect.Type) bool {
+ r.reconcileRegistrations()
_, ok := r.syntheticToUser[ut]
return ok
}
diff --git a/sdks/go/pkg/beam/register/emitter.go b/sdks/go/pkg/beam/register/emitter.go
index 6c88d28d9fc..3b9cb9910d2 100644
--- a/sdks/go/pkg/beam/register/emitter.go
+++ b/sdks/go/pkg/beam/register/emitter.go
@@ -129,7 +129,9 @@ func Emitter1[T1 any]() {
registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter {
return &emit1[T1]{n: n}
}
- exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+ eT := reflect.TypeOf(e).Elem()
+ registerType(eT.In(0))
+ exec.RegisterEmitter(eT, registerFunc)
}
// Emitter2 registers parameters from your DoFn with a
@@ -147,18 +149,25 @@ func Emitter2[T1, T2 any]() {
return &emit1WithTimestamp[T2]{n: n}
}
}
- exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+ eT := reflect.TypeOf(e).Elem()
+ registerType(eT.In(0))
+ registerType(eT.In(1))
+ exec.RegisterEmitter(eT, registerFunc)
}
// Emitter3 registers parameters from your DoFn with a
-// signature func(T1, T2, T3) and optimizes their execution.
+// signature func(beam.EventTime, T2, T3) and optimizes their execution.
// This must be done by passing in type parameters of all inputs as constraints,
// aka: register.Emitter3[beam.EventTime, T1, T2](), where T1 is the type of
// your key and T2 is the type of your value.
-func Emitter3[T1 typex.EventTime, T2, T3 any]() {
- e := (*func(T1, T2, T3))(nil)
+func Emitter3[ET typex.EventTime, T1, T2 any]() {
+ e := (*func(ET, T1, T2))(nil)
registerFunc := func(n exec.ElementProcessor) exec.ReusableEmitter {
- return &emit2WithTimestamp[T2, T3]{n: n}
+ return &emit2WithTimestamp[T1, T2]{n: n}
}
- exec.RegisterEmitter(reflect.TypeOf(e).Elem(), registerFunc)
+ eT := reflect.TypeOf(e).Elem()
+ // No need to register event time.
+ registerType(eT.In(1))
+ registerType(eT.In(2))
+ exec.RegisterEmitter(eT, registerFunc)
}
diff --git a/sdks/go/pkg/beam/register/emitter_test.go b/sdks/go/pkg/beam/register/emitter_test.go
new file mode 100644
index 00000000000..1855f830c3f
--- /dev/null
+++ b/sdks/go/pkg/beam/register/emitter_test.go
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package register
+
+import (
+ "context"
+ "reflect"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+type myTestTypeEmitter1 struct {
+ Int int
+}
+
+func TestEmitter1(t *testing.T) {
+ Emitter1[int]()
+ if !exec.IsEmitterRegistered(reflect.TypeOf((*func(int))(nil)).Elem()) {
+ t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(int))(nil)).Elem()) = false, want true")
+ }
+
+ Emitter1[myTestTypeEmitter1]()
+ rt := reflect.TypeOf((*myTestTypeEmitter1)(nil)).Elem()
+ checkRegisterations(t, rt)
+}
+
+type myTestTypeEmitter2A struct {
+ Int int
+}
+
+type myTestTypeEmitter2B struct {
+ String string
+}
+
+func TestEmitter2(t *testing.T) {
+ Emitter2[int, string]()
+ if !exec.IsEmitterRegistered(reflect.TypeOf((*func(int, string))(nil)).Elem()) {
+ t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(int, string))(nil)).Elem()) = false, want true")
+ }
+
+ Emitter2[myTestTypeEmitter2A, myTestTypeEmitter2B]()
+ tA := reflect.TypeOf((*myTestTypeEmitter2A)(nil)).Elem()
+ checkRegisterations(t, tA)
+ tB := reflect.TypeOf((*myTestTypeEmitter2B)(nil)).Elem()
+ checkRegisterations(t, tB)
+}
+
+func TestEmitter2_WithTimestamp(t *testing.T) {
+ Emitter2[typex.EventTime, string]()
+ if !exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, string))(nil)).Elem()) {
+ t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, string))(nil)).Elem()) = false, want true")
+ }
+}
+
+type myTestTypeEmitter3A struct {
+ Int int
+}
+
+type myTestTypeEmitter3B struct {
+ String string
+}
+
+func TestEmitter3(t *testing.T) {
+ Emitter3[typex.EventTime, int, string]()
+ if !exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, int, string))(nil)).Elem()) {
+ t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, int, string))(nil)).Elem()) = false, want true")
+ }
+
+ Emitter3[typex.EventTime, myTestTypeEmitter3A, myTestTypeEmitter3B]()
+ tA := reflect.TypeOf((*myTestTypeEmitter3A)(nil)).Elem()
+ checkRegisterations(t, tA)
+ tB := reflect.TypeOf((*myTestTypeEmitter3B)(nil)).Elem()
+ checkRegisterations(t, tB)
+}
+
+func TestEmit1(t *testing.T) {
+ e := &emit1[int]{n: &elementProcessor{}}
+ e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
+ fn := e.Value().(func(int))
+ fn(3)
+ if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
+ t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
+ }
+ if got := e.n.(*elementProcessor).inFV.Elm2; got != nil {
+ t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want nil", got)
+ }
+ if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.ZeroTimestamp; got != want {
+ t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
+ }
+}
+
+func TestEmit2(t *testing.T) {
+ e := &emit2[int, string]{n: &elementProcessor{}}
+ e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
+ fn := e.Value().(func(int, string))
+ fn(3, "hello")
+ if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
+ t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
+ }
+ if got, want := e.n.(*elementProcessor).inFV.Elm2, "hello"; got != want {
+ t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want %v", got, want)
+ }
+ if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.ZeroTimestamp; got != want {
+ t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
+ }
+}
+
+func TestEmit1WithTimestamp(t *testing.T) {
+ e := &emit1WithTimestamp[int]{n: &elementProcessor{}}
+ e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
+ fn := e.Value().(func(typex.EventTime, int))
+ fn(mtime.MaxTimestamp, 3)
+ if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
+ t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
+ }
+ if got := e.n.(*elementProcessor).inFV.Elm2; got != nil {
+ t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want nil", got)
+ }
+ if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.MaxTimestamp; got != want {
+ t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
+ }
+}
+
+func TestEmit2WithTimestamp(t *testing.T) {
+ e := &emit2WithTimestamp[int, string]{n: &elementProcessor{}}
+ e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
+ fn := e.Value().(func(typex.EventTime, int, string))
+ fn(mtime.MaxTimestamp, 3, "hello")
+ if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
+ t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
+ }
+ if got, want := e.n.(*elementProcessor).inFV.Elm2, "hello"; got != want {
+ t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want %v", got, want)
+ }
+ if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.MaxTimestamp; got != want {
+ t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
+ }
+}
diff --git a/sdks/go/pkg/beam/register/iter.go b/sdks/go/pkg/beam/register/iter.go
index 71d3f3df723..16bce66c143 100644
--- a/sdks/go/pkg/beam/register/iter.go
+++ b/sdks/go/pkg/beam/register/iter.go
@@ -20,7 +20,9 @@ import (
"io"
"reflect"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
)
type iter1[T any] struct {
@@ -104,6 +106,14 @@ func (v *iter2[T1, T2]) invoke(key *T1, value *T2) bool {
return true
}
+func registerType(t reflect.Type) {
+ if _, ok := runtime.TypeKey(t); !ok {
+ return
+ }
+ runtime.RegisterType(t)
+ schema.RegisterType(t)
+}
+
// Iter1 registers parameters from your DoFn with a
// signature func(*T) bool and optimizes their execution.
// This must be done by passing in type parameters of all inputs as constraints,
@@ -113,7 +123,9 @@ func Iter1[T any]() {
registerFunc := func(s exec.ReStream) exec.ReusableInput {
return &iter1[T]{s: s}
}
- exec.RegisterInput(reflect.TypeOf(i).Elem(), registerFunc)
+ itT := reflect.TypeOf(i).Elem()
+ registerType(itT.In(0).Elem())
+ exec.RegisterInput(itT, registerFunc)
}
// Iter1 registers parameters from your DoFn with a
@@ -125,5 +137,8 @@ func Iter2[T1, T2 any]() {
registerFunc := func(s exec.ReStream) exec.ReusableInput {
return &iter2[T1, T2]{s: s}
}
- exec.RegisterInput(reflect.TypeOf(i).Elem(), registerFunc)
+ itT := reflect.TypeOf(i).Elem()
+ registerType(itT.In(0).Elem())
+ registerType(itT.In(1).Elem())
+ exec.RegisterInput(itT, registerFunc)
}
diff --git a/sdks/go/pkg/beam/register/iter_test.go b/sdks/go/pkg/beam/register/iter_test.go
new file mode 100644
index 00000000000..7cbbf388d7e
--- /dev/null
+++ b/sdks/go/pkg/beam/register/iter_test.go
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package register
+
+import (
+ "reflect"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
+)
+
+type myTestTypeIter1 struct {
+ Int int
+}
+
+func checkRegisterations(t *testing.T, rt reflect.Type) {
+ t.Helper()
+ key, ok := runtime.TypeKey(rt)
+ if !ok {
+ t.Fatalf("runtime.TypeKey(%v): no typekey for type", rt)
+ }
+ if _, ok := runtime.LookupType(key); !ok {
+ t.Errorf("want type %v to be available with key %q", rt, key)
+ }
+ if !schema.Registered(rt) {
+ t.Errorf("want type %v to be registered with schemas", rt)
+ }
+}
+
+func TestIter1(t *testing.T) {
+ Iter1[int]()
+ if !exec.IsInputRegistered(reflect.TypeOf((*func(*int) bool)(nil)).Elem()) {
+ t.Fatalf("exec.IsInputRegistered(reflect.TypeOf(((*func(*int) bool)(nil)).Elem()) = false, want true")
+ }
+
+ Iter1[myTestTypeIter1]()
+ if !exec.IsInputRegistered(reflect.TypeOf((*func(*int) bool)(nil)).Elem()) {
+ t.Fatalf("exec.IsInputRegistered(reflect.TypeOf(((*func(*int) bool)(nil)).Elem()) = false, want true")
+ }
+
+ ttrt := reflect.TypeOf((*myTestTypeIter1)(nil)).Elem()
+ checkRegisterations(t, ttrt)
+}
+
+type myTestTypeIter2A struct {
+ Int int
+}
+
+type myTestTypeIter2B struct {
+ Int int
+}
+
+func TestIter2(t *testing.T) {
+ Iter2[int, string]()
+ if !exec.IsInputRegistered(reflect.TypeOf((*func(*int, *string) bool)(nil)).Elem()) {
+ t.Fatalf("exec.IsInputRegistered(reflect.TypeOf((*func(*int, *string) bool)(nil)).Elem()) = false, want true")
+ }
+
+ Iter2[myTestTypeIter2A, myTestTypeIter2B]()
+ if !exec.IsInputRegistered(reflect.TypeOf((*func(*int) bool)(nil)).Elem()) {
+ t.Fatalf("exec.IsInputRegistered(reflect.TypeOf(((*func(*int) bool)(nil)).Elem()) = false, want true")
+ }
+
+ ttArt := reflect.TypeOf((*myTestTypeIter2A)(nil)).Elem()
+ checkRegisterations(t, ttArt)
+ ttBrt := reflect.TypeOf((*myTestTypeIter2B)(nil)).Elem()
+ checkRegisterations(t, ttBrt)
+}
+
+func TestIter1_Struct(t *testing.T) {
+ values := []exec.FullValue{{
+ Windows: window.SingleGlobalWindow,
+ Timestamp: mtime.ZeroTimestamp,
+ Elm: "one",
+ }, {
+ Windows: window.SingleGlobalWindow,
+ Timestamp: mtime.ZeroTimestamp,
+ Elm: "two",
+ }, {
+ Windows: window.SingleGlobalWindow,
+ Timestamp: mtime.ZeroTimestamp,
+ Elm: "three",
+ }}
+
+ i := iter1[string]{s: &exec.FixedReStream{Buf: values}}
+
+ i.Init()
+ fn := i.Value().(func(value *string) bool)
+
+ var s string
+ if ok := fn(&s); !ok {
+ t.Fatalf("First i.Value()(&s)=false, want true")
+ }
+ if got, want := s, "one"; got != want {
+ t.Fatalf("First iter value = %v, want %v", got, want)
+ }
+ if ok := fn(&s); !ok {
+ t.Fatalf("Second i.Value()(&s)=false, want true")
+ }
+ if got, want := s, "two"; got != want {
+ t.Fatalf("Second iter value = %v, want %v", got, want)
+ }
+ if ok := fn(&s); !ok {
+ t.Fatalf("Third i.Value()(&s)=false, want true")
+ }
+ if got, want := s, "three"; got != want {
+ t.Fatalf("Third iter value = %v, want %v", got, want)
+ }
+ if ok := fn(&s); ok {
+ t.Fatalf("Fourth i.Value()(&s)=true, want false")
+ }
+ if err := i.Reset(); err != nil {
+ t.Fatalf("i.Reset()=%v, want nil", err)
+ }
+}
+
+func TestIter2_Struct(t *testing.T) {
+ values := []exec.FullValue{{
+ Windows: window.SingleGlobalWindow,
+ Timestamp: mtime.ZeroTimestamp,
+ Elm: 1,
+ Elm2: "one",
+ }, {
+ Windows: window.SingleGlobalWindow,
+ Timestamp: mtime.ZeroTimestamp,
+ Elm: 2,
+ Elm2: "two",
+ }, {
+ Windows: window.SingleGlobalWindow,
+ Timestamp: mtime.ZeroTimestamp,
+ Elm: 3,
+ Elm2: "three",
+ }}
+
+ i := iter2[int, string]{s: &exec.FixedReStream{Buf: values}}
+
+ i.Init()
+ fn := i.Value().(func(key *int, value *string) bool)
+
+ var s string
+ var key int
+ if ok := fn(&key, &s); !ok {
+ t.Fatalf("First i.Value()(&s)=false, want true")
+ }
+ if got, want := key, 1; got != want {
+ t.Fatalf("First iter key = %v, want %v", got, want)
+ }
+ if got, want := s, "one"; got != want {
+ t.Fatalf("First iter value = %v, want %v", got, want)
+ }
+ if ok := fn(&key, &s); !ok {
+ t.Fatalf("Second i.Value()(&s)=false, want true")
+ }
+ if got, want := key, 2; got != want {
+ t.Fatalf("Second iter key = %v, want %v", got, want)
+ }
+ if got, want := s, "two"; got != want {
+ t.Fatalf("Second iter value = %v, want %v", got, want)
+ }
+ if ok := fn(&key, &s); !ok {
+ t.Fatalf("Third i.Value()(&s)=false, want true")
+ }
+ if got, want := key, 3; got != want {
+ t.Fatalf("Third iter key = %v, want %v", got, want)
+ }
+ if got, want := s, "three"; got != want {
+ t.Fatalf("Third iter value = %v, want %v", got, want)
+ }
+ if ok := fn(&key, &s); ok {
+ t.Fatalf("Fourth i.Value()(&s)=true, want false")
+ }
+ if err := i.Reset(); err != nil {
+ t.Fatalf("i.Reset()=%v, want nil", err)
+ }
+}
diff --git a/sdks/go/pkg/beam/register/register_test.go b/sdks/go/pkg/beam/register/register_test.go
index 39962ab3c42..8cab02122e9 100644
--- a/sdks/go/pkg/beam/register/register_test.go
+++ b/sdks/go/pkg/beam/register/register_test.go
@@ -21,13 +21,10 @@ import (
"testing"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
)
@@ -324,219 +321,6 @@ func TestCombiner_PartialCombiner2(t *testing.T) {
}
}
-func TestEmitter1(t *testing.T) {
- Emitter1[int]()
- if !exec.IsEmitterRegistered(reflect.TypeOf((*func(int))(nil)).Elem()) {
- t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(int))(nil)).Elem()) = false, want true")
- }
-}
-
-func TestEmitter2(t *testing.T) {
- Emitter2[int, string]()
- if !exec.IsEmitterRegistered(reflect.TypeOf((*func(int, string))(nil)).Elem()) {
- t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(int, string))(nil)).Elem()) = false, want true")
- }
-}
-
-func TestEmitter2_WithTimestamp(t *testing.T) {
- Emitter2[typex.EventTime, string]()
- if !exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, string))(nil)).Elem()) {
- t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, string))(nil)).Elem()) = false, want true")
- }
-}
-
-func TestEmitter3(t *testing.T) {
- Emitter3[typex.EventTime, int, string]()
- if !exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, int, string))(nil)).Elem()) {
- t.Fatalf("exec.IsEmitterRegistered(reflect.TypeOf((*func(typex.EventTime, int, string))(nil)).Elem()) = false, want true")
- }
-}
-
-func TestEmit1(t *testing.T) {
- e := &emit1[int]{n: &elementProcessor{}}
- e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
- fn := e.Value().(func(int))
- fn(3)
- if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
- t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
- }
- if got := e.n.(*elementProcessor).inFV.Elm2; got != nil {
- t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want nil", got)
- }
- if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.ZeroTimestamp; got != want {
- t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
- }
-}
-
-func TestEmit2(t *testing.T) {
- e := &emit2[int, string]{n: &elementProcessor{}}
- e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
- fn := e.Value().(func(int, string))
- fn(3, "hello")
- if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
- t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
- }
- if got, want := e.n.(*elementProcessor).inFV.Elm2, "hello"; got != want {
- t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want %v", got, want)
- }
- if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.ZeroTimestamp; got != want {
- t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
- }
-}
-
-func TestEmit1WithTimestamp(t *testing.T) {
- e := &emit1WithTimestamp[int]{n: &elementProcessor{}}
- e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
- fn := e.Value().(func(typex.EventTime, int))
- fn(mtime.MaxTimestamp, 3)
- if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
- t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
- }
- if got := e.n.(*elementProcessor).inFV.Elm2; got != nil {
- t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want nil", got)
- }
- if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.MaxTimestamp; got != want {
- t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
- }
-}
-
-func TestEmit2WithTimestamp(t *testing.T) {
- e := &emit2WithTimestamp[int, string]{n: &elementProcessor{}}
- e.Init(context.Background(), []typex.Window{}, mtime.ZeroTimestamp)
- fn := e.Value().(func(typex.EventTime, int, string))
- fn(mtime.MaxTimestamp, 3, "hello")
- if got, want := e.n.(*elementProcessor).inFV.Elm, 3; got != want {
- t.Errorf("e.Value.(func(int))(3).n.inFV.Elm=%v, want %v", got, want)
- }
- if got, want := e.n.(*elementProcessor).inFV.Elm2, "hello"; got != want {
- t.Errorf("e.Value.(func(int))(3).n.inFV.Elm2=%v, want %v", got, want)
- }
- if got, want := e.n.(*elementProcessor).inFV.Timestamp, mtime.MaxTimestamp; got != want {
- t.Errorf("e.Value.(func(int))(3).n.inFV.Timestamp=%v, want %v", got, want)
- }
-}
-
-func TestIter1(t *testing.T) {
- Iter1[int]()
- if !exec.IsInputRegistered(reflect.TypeOf((*func(*int) bool)(nil)).Elem()) {
- t.Fatalf("exec.IsInputRegistered(reflect.TypeOf(((*func(*int) bool)(nil)).Elem()) = false, want true")
- }
-}
-
-func TestIter2(t *testing.T) {
- Iter2[int, string]()
- if !exec.IsInputRegistered(reflect.TypeOf((*func(*int, *string) bool)(nil)).Elem()) {
- t.Fatalf("exec.IsInputRegistered(reflect.TypeOf((*func(*int, *string) bool)(nil)).Elem()) = false, want true")
- }
-}
-
-func TestIter1_Struct(t *testing.T) {
- values := []exec.FullValue{exec.FullValue{
- Windows: window.SingleGlobalWindow,
- Timestamp: mtime.ZeroTimestamp,
- Elm: "one",
- }, exec.FullValue{
- Windows: window.SingleGlobalWindow,
- Timestamp: mtime.ZeroTimestamp,
- Elm: "two",
- }, exec.FullValue{
- Windows: window.SingleGlobalWindow,
- Timestamp: mtime.ZeroTimestamp,
- Elm: "three",
- }}
-
- i := iter1[string]{s: &exec.FixedReStream{Buf: values}}
-
- i.Init()
- fn := i.Value().(func(value *string) bool)
-
- var s string
- if ok := fn(&s); !ok {
- t.Fatalf("First i.Value()(&s)=false, want true")
- }
- if got, want := s, "one"; got != want {
- t.Fatalf("First iter value = %v, want %v", got, want)
- }
- if ok := fn(&s); !ok {
- t.Fatalf("Second i.Value()(&s)=false, want true")
- }
- if got, want := s, "two"; got != want {
- t.Fatalf("Second iter value = %v, want %v", got, want)
- }
- if ok := fn(&s); !ok {
- t.Fatalf("Third i.Value()(&s)=false, want true")
- }
- if got, want := s, "three"; got != want {
- t.Fatalf("Third iter value = %v, want %v", got, want)
- }
- if ok := fn(&s); ok {
- t.Fatalf("Fourth i.Value()(&s)=true, want false")
- }
- if err := i.Reset(); err != nil {
- t.Fatalf("i.Reset()=%v, want nil", err)
- }
-}
-
-func TestIter2_Struct(t *testing.T) {
- values := []exec.FullValue{exec.FullValue{
- Windows: window.SingleGlobalWindow,
- Timestamp: mtime.ZeroTimestamp,
- Elm: 1,
- Elm2: "one",
- }, exec.FullValue{
- Windows: window.SingleGlobalWindow,
- Timestamp: mtime.ZeroTimestamp,
- Elm: 2,
- Elm2: "two",
- }, exec.FullValue{
- Windows: window.SingleGlobalWindow,
- Timestamp: mtime.ZeroTimestamp,
- Elm: 3,
- Elm2: "three",
- }}
-
- i := iter2[int, string]{s: &exec.FixedReStream{Buf: values}}
-
- i.Init()
- fn := i.Value().(func(key *int, value *string) bool)
-
- var s string
- var key int
- if ok := fn(&key, &s); !ok {
- t.Fatalf("First i.Value()(&s)=false, want true")
- }
- if got, want := key, 1; got != want {
- t.Fatalf("First iter key = %v, want %v", got, want)
- }
- if got, want := s, "one"; got != want {
- t.Fatalf("First iter value = %v, want %v", got, want)
- }
- if ok := fn(&key, &s); !ok {
- t.Fatalf("Second i.Value()(&s)=false, want true")
- }
- if got, want := key, 2; got != want {
- t.Fatalf("Second iter key = %v, want %v", got, want)
- }
- if got, want := s, "two"; got != want {
- t.Fatalf("Second iter value = %v, want %v", got, want)
- }
- if ok := fn(&key, &s); !ok {
- t.Fatalf("Third i.Value()(&s)=false, want true")
- }
- if got, want := key, 3; got != want {
- t.Fatalf("Third iter key = %v, want %v", got, want)
- }
- if got, want := s, "three"; got != want {
- t.Fatalf("Third iter value = %v, want %v", got, want)
- }
- if ok := fn(&key, &s); ok {
- t.Fatalf("Fourth i.Value()(&s)=true, want false")
- }
- if err := i.Reset(); err != nil {
- t.Fatalf("i.Reset()=%v, want nil", err)
- }
-}
-
type CustomFunctionParameter struct {
key string
val int