You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/08/22 21:57:56 UTC

[beam] branch master updated: Add state integration test (#22815)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 70ebae31996 Add state integration test (#22815)
70ebae31996 is described below

commit 70ebae319967c93fad08bca06b0ead3e830ae95f
Author: Danny McCormick <da...@google.com>
AuthorDate: Mon Aug 22 17:57:48 2022 -0400

    Add state integration test (#22815)
---
 sdks/go/test/integration/integration.go           |  6 ++
 sdks/go/test/integration/primitives/state.go      | 77 +++++++++++++++++++++++
 sdks/go/test/integration/primitives/state_test.go | 28 +++++++++
 3 files changed, 111 insertions(+)

diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go
index 1c35247ac32..f80c4c91e47 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -88,6 +88,8 @@ var directFilters = []string{
 	"TestFhirIO.*",
 	// OOMs currently only lead to heap dumps on Dataflow runner
 	"TestOomParDo",
+	// The direct runner does not support user state.
+	"TestValueState",
 }
 
 var portableFilters = []string{
@@ -108,6 +110,8 @@ var portableFilters = []string{
 	"TestFhirIO.*",
 	// OOMs currently only lead to heap dumps on Dataflow runner
 	"TestOomParDo",
+	// The portable runner does not support user state.
+	"TestValueState",
 }
 
 var flinkFilters = []string{
@@ -149,6 +153,8 @@ var samzaFilters = []string{
 	"TestFhirIO.*",
 	// OOMs currently only lead to heap dumps on Dataflow runner
 	"TestOomParDo",
+	// The samza runner does not support user state.
+	"TestValueState",
 }
 
 var sparkFilters = []string{
diff --git a/sdks/go/test/integration/primitives/state.go b/sdks/go/test/integration/primitives/state.go
new file mode 100644
index 00000000000..ed23f1784c1
--- /dev/null
+++ b/sdks/go/test/integration/primitives/state.go
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package primitives
+
+import (
+	"fmt"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+)
+
+func init() { // registers the DoFn type and the func(string, int) emitter used by ValueStateParDo
+	register.DoFn3x1[state.Provider, string, int, string](&valueStateFn{})
+	register.Emitter2[string, int]()
+}
+
+// valueStateFn is a DoFn that reads and updates two value states per element.
+type valueStateFn struct {
+	State1 state.Value[int]    // integer state; ProcessElement writes back the read value plus one
+	State2 state.Value[string] // string state; ProcessElement writes back the read value with "I" appended
+}
+
+func (f *valueStateFn) ProcessElement(s state.Provider, w string, c int) string { // c is unused
+	i, ok, err := f.State1.Read(s)
+	if err != nil {
+		panic(err)
+	}
+	if !ok {
+		i = 1 // ok == false: fall back to an initial count of 1
+	}
+	err = f.State1.Write(s, i+1) // capture Write's error so the check below tests it, not the stale Read error
+	if err != nil {
+		panic(err)
+	}
+
+	j, ok, err := f.State2.Read(s)
+	if err != nil {
+		panic(err)
+	}
+	if !ok {
+		j = "I" // ok == false: fall back to an initial value of "I"
+	}
+	err = f.State2.Write(s, j+"I") // capture Write's error, as for State1 above
+	if err != nil {
+		panic(err)
+	}
+	return fmt.Sprintf("%s: %v, %s", w, i, j) // i and j are the values from before this call's writes
+}
+
+// ValueStateParDo builds a pipeline that checks a DoFn using per-key value state.
+func ValueStateParDo() *beam.Pipeline {
+	pipeline, scope := beam.NewPipelineWithRoot()
+
+	words := beam.Create(scope, "apple", "pear", "peach", "apple", "apple", "pear")
+	pairs := beam.ParDo(scope, func(w string, emit func(string, int)) {
+		emit(w, 1) // pair each word with a constant value of 1
+	}, words)
+	got := beam.ParDo(scope, &valueStateFn{State1: state.MakeValueState[int]("key1"), State2: state.MakeValueState[string]("key2")}, pairs)
+	passert.Equals(scope, got, "apple: 1, I", "pear: 1, I", "peach: 1, I", "apple: 2, II", "apple: 3, III", "pear: 2, II")
+
+	return pipeline
+}
diff --git a/sdks/go/test/integration/primitives/state_test.go b/sdks/go/test/integration/primitives/state_test.go
new file mode 100644
index 00000000000..ece0bc361df
--- /dev/null
+++ b/sdks/go/test/integration/primitives/state_test.go
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package primitives
+
+import (
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+	"github.com/apache/beam/sdks/v2/go/test/integration"
+)
+
+func TestValueState(t *testing.T) {
+	integration.CheckFilters(t)                // NOTE(review): presumably skips the test on runners whose filter list names it — confirm
+	ptest.RunAndValidate(t, ValueStateParDo()) // hands the pipeline built by ValueStateParDo (state.go) to ptest
+}