You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "lostluck (via GitHub)" <gi...@apache.org> on 2023/04/03 16:58:45 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #26042: [Go SDK] Spanner Batched Reads

lostluck commented on code in PR #26042:
URL: https://github.com/apache/beam/pull/26042#discussion_r1156128691


##########
sdks/go/pkg/beam/io/spannerio/common.go:
##########
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resources to
+// Google Spanner datastores.
+package spannerio
+
+import (
+	"cloud.google.com/go/spanner"
+	"context"
+	"fmt"
+)

Review Comment:
   Minor nit: Most of the project also runs [`goimports`](https://pkg.go.dev/golang.org/x/tools/cmd/goimports), which separates the standard library imports from other imports into two blocks. Commenting since this import grouping is inconsistent with most of Beam Go.
   
   Definitely a minor personal choice, since I don't think we have any tooling that mandates this. (Though the GitHub Action that checks this fails on an earlier step for this PR, https://github.com/apache/beam/actions/runs/4589437759/jobs/8104324444?pr=26042, so it still might.)



##########
sdks/go/pkg/beam/io/spannerio/read_batch_integration_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+	"cloud.google.com/go/spanner"
+	database "cloud.google.com/go/spanner/admin/database/apiv1"
+	"context"
+	"flag"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
+	"reflect"
+	"testing"
+)
+
+var (
+	integrationTests = flag.Bool("int", false, "")
+)
+
+func TestExampleQueryBatch(t *testing.T) {
+	if !*integrationTests {
+		t.Skip("Not running in integration test mode.")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	db := "projects/test-project/instances/test-instance/databases/test-database"
+
+	srv, srvCleanup := newServer(t)
+	defer srvCleanup()
+
+	client, admin, cleanup, err := createFakeClient(srv.Addr, db)
+	if err != nil {
+		t.Fatalf("Unable to create fake client: %v", err)
+	}
+	defer cleanup()
+
+	populateSpanner(context.Background(), admin, db, client)
+
+	rows := QueryBatch(s, db, "SELECT * FROM TEST", reflect.TypeOf(TestDto{}))
+
+	ptest.RunAndValidate(t, p)
+	passert.Count(s, rows, "Should have 4 rows", 4)
+}
+
+func populateSpanner(ctx context.Context, admin *database.DatabaseAdminClient, db string, client *spanner.Client) error {
+	iter := client.Single().Query(ctx, spanner.Statement{SQL: "SELECT 1 FROM Test"})
+	defer iter.Stop()
+
+	if _, err := iter.Next(); err == nil {
+		return nil
+	}
+
+	op, err := admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
+		Database: db,
+		Statements: []string{`CREATE TABLE Test (
+					One STRING(20),
+					Two INT64,
+				) PRIMARY KEY (Two)`},
+	})
+
+	if err != nil {
+		return err
+	}
+
+	if err := op.Wait(context.Background()); err != nil {
+		return err
+	}
+
+	testRows := []TestDto{
+		{
+			One: "one",
+			Two: 1,
+		},
+		{
+			One: "one",
+			Two: 2,
+		},
+		{
+			One: "one",
+			Two: 3,
+		},
+		{
+			One: "one",
+			Two: 4,
+		},
+	}
+
+	var mutations []*spanner.Mutation
+	for _, m := range testRows {
+		mutation, err := spanner.InsertStruct("Test", m)
+		if err != nil {
+			return err
+		}
+
+		mutations = append(mutations, mutation)
+	}
+
+	_, err = client.Apply(context.Background(), mutations)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func TestMain(t *testing.M) {
+	flag.Parse()
+	t.Run()
+}

Review Comment:
   This should be calling [`ptest.Main`](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest#Main)



##########
sdks/go/pkg/beam/io/spannerio/generate_partitions.go:
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resources to
+// Google Spanner datastores.

Review Comment:
   It's not recommended to have a packagedoc comment on every file. All the files need the license header, but it's best to only have the doc comment in a single file. Having it in common.go would be appropriate for this package.
   
   See https://google.github.io/styleguide/go/decisions#package-comments
   
   ```suggestion
   ```



##########
sdks/go/pkg/beam/io/spannerio/generate_partitions_test.go:
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resources to
+// Google Spanner datastores.
+
+package spannerio
+
+import (
+	"cloud.google.com/go/spanner"
+	"context"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+	"testing"
+)
+
+func TestGeneratePartitions(t *testing.T) {
+	database := "projects/fake-proj/instances/fake-instance/databases/fake-db-4-rows"
+
+	query := "SELECT * from Test"
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	fn := newGeneratePartitionsFn(database, query)
+	fn.generator = &partitionGeneratorStub{}
+
+	partitions := fn.generatePartitions(s)

Review Comment:
   I'll note that this manner of pipeline testing doesn't work well for portable Beam. At best, this works for the Direct runner and no other runners (like Flink, Prism, Dataflow, etc.).



##########
sdks/go/pkg/beam/io/spannerio/generate_partitions_test.go:
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resources to
+// Google Spanner datastores.
+
+package spannerio

Review Comment:
   ```suggestion
   package spannerio
   ```



##########
sdks/go/pkg/beam/io/spannerio/read_batch_integration_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+	"cloud.google.com/go/spanner"
+	database "cloud.google.com/go/spanner/admin/database/apiv1"
+	"context"
+	"flag"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
+	"reflect"
+	"testing"
+)
+
+var (
+	integrationTests = flag.Bool("int", false, "")
+)
+
+func TestExampleQueryBatch(t *testing.T) {
+	if !*integrationTests {
+		t.Skip("Not running in integration test mode.")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	db := "projects/test-project/instances/test-instance/databases/test-database"
+
+	srv, srvCleanup := newServer(t)
+	defer srvCleanup()
+
+	client, admin, cleanup, err := createFakeClient(srv.Addr, db)
+	if err != nil {
+		t.Fatalf("Unable to create fake client: %v", err)
+	}
+	defer cleanup()
+
+	populateSpanner(context.Background(), admin, db, client)
+
+	rows := QueryBatch(s, db, "SELECT * FROM TEST", reflect.TypeOf(TestDto{}))
+
+	ptest.RunAndValidate(t, p)
+	passert.Count(s, rows, "Should have 4 rows", 4)
+}
+
+func populateSpanner(ctx context.Context, admin *database.DatabaseAdminClient, db string, client *spanner.Client) error {
+	iter := client.Single().Query(ctx, spanner.Statement{SQL: "SELECT 1 FROM Test"})
+	defer iter.Stop()
+
+	if _, err := iter.Next(); err == nil {
+		return nil
+	}

Review Comment:
   This section doesn't appear to be doing anything WRT "populating Spanner". If it's necessary, please explain why in a comment, and augment the returned error with this sort of detail as well. Otherwise, please remove.



##########
sdks/go/pkg/beam/io/spannerio/read_batch_integration_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+	"cloud.google.com/go/spanner"
+	database "cloud.google.com/go/spanner/admin/database/apiv1"
+	"context"
+	"flag"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
+	"reflect"
+	"testing"
+)
+
+var (
+	integrationTests = flag.Bool("int", false, "")
+)
+
+func TestExampleQueryBatch(t *testing.T) {
+	if !*integrationTests {
+		t.Skip("Not running in integration test mode.")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	db := "projects/test-project/instances/test-instance/databases/test-database"
+
+	srv, srvCleanup := newServer(t)
+	defer srvCleanup()
+
+	client, admin, cleanup, err := createFakeClient(srv.Addr, db)
+	if err != nil {
+		t.Fatalf("Unable to create fake client: %v", err)
+	}
+	defer cleanup()
+
+	populateSpanner(context.Background(), admin, db, client)
+
+	rows := QueryBatch(s, db, "SELECT * FROM TEST", reflect.TypeOf(TestDto{}))
+
+	ptest.RunAndValidate(t, p)
+	passert.Count(s, rows, "Should have 4 rows", 4)
+}
+
+func populateSpanner(ctx context.Context, admin *database.DatabaseAdminClient, db string, client *spanner.Client) error {
+	iter := client.Single().Query(ctx, spanner.Statement{SQL: "SELECT 1 FROM Test"})
+	defer iter.Stop()
+
+	if _, err := iter.Next(); err == nil {
+		return nil
+	}
+
+	op, err := admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
+		Database: db,
+		Statements: []string{`CREATE TABLE Test (
+					One STRING(20),
+					Two INT64,
+				) PRIMARY KEY (Two)`},
+	})
+
+	if err != nil {
+		return err
+	}
+
+	if err := op.Wait(context.Background()); err != nil {

Review Comment:
   A context is being passed in; use that instead of creating a new one, or add a comment explaining why the passed-in context is being ignored.



##########
sdks/go/pkg/beam/io/spannerio/read_batch_integration_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+	"cloud.google.com/go/spanner"
+	database "cloud.google.com/go/spanner/admin/database/apiv1"
+	"context"
+	"flag"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
+	"reflect"
+	"testing"
+)
+
+var (
+	integrationTests = flag.Bool("int", false, "")
+)
+
+func TestExampleQueryBatch(t *testing.T) {
+	if !*integrationTests {
+		t.Skip("Not running in integration test mode.")
+	}

Review Comment:
   Please put integration tests in an appropriate directory in https://github.com/apache/beam/tree/master/sdks/go/test/integration/io and use the existing infrastructure for integration tests. This will allow the test to run on several runners.
   
   Although, I'll note that as written, this test won't work as it's using a fake client/server, like the other tests, but notably, the [spannertest fake doesn't support partitions at this time](https://pkg.go.dev/cloud.google.com/go/spanner/spannertest#section-readme)
   
   It's important that any integration tests against real Spanner take care to clean up after themselves so we don't waste Beam project resources.



##########
sdks/go/pkg/beam/io/spannerio/read_batch_integration_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+	"cloud.google.com/go/spanner"
+	database "cloud.google.com/go/spanner/admin/database/apiv1"
+	"context"
+	"flag"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
+	"reflect"
+	"testing"
+)
+
+var (
+	integrationTests = flag.Bool("int", false, "")
+)
+
+func TestExampleQueryBatch(t *testing.T) {
+	if !*integrationTests {
+		t.Skip("Not running in integration test mode.")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	db := "projects/test-project/instances/test-instance/databases/test-database"
+
+	srv, srvCleanup := newServer(t)
+	defer srvCleanup()
+
+	client, admin, cleanup, err := createFakeClient(srv.Addr, db)
+	if err != nil {
+		t.Fatalf("Unable to create fake client: %v", err)
+	}
+	defer cleanup()
+
+	populateSpanner(context.Background(), admin, db, client)
+
+	rows := QueryBatch(s, db, "SELECT * FROM TEST", reflect.TypeOf(TestDto{}))
+
+	ptest.RunAndValidate(t, p)
+	passert.Count(s, rows, "Should have 4 rows", 4)

Review Comment:
   Having the count *after* the run and validate does nothing. The `passert`s are executed as part of a pipeline, and can't be added afterwards.



##########
sdks/go/pkg/beam/io/spannerio/write.go:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resources to
+// Google Spanner datastores.
+package spannerio

Review Comment:
   ```suggestion
   
   package spannerio
   ```



##########
sdks/go/pkg/beam/io/spannerio/read.go:
##########
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resources to
+// Google Spanner datastores.
+package spannerio

Review Comment:
   ```suggestion
   package spannerio
   ```



##########
sdks/go/pkg/beam/io/spannerio/read_batch_integration_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+	"cloud.google.com/go/spanner"
+	database "cloud.google.com/go/spanner/admin/database/apiv1"
+	"context"
+	"flag"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
+	"reflect"
+	"testing"
+)
+
+var (
+	integrationTests = flag.Bool("int", false, "")
+)
+
+func TestExampleQueryBatch(t *testing.T) {
+	if !*integrationTests {
+		t.Skip("Not running in integration test mode.")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	db := "projects/test-project/instances/test-instance/databases/test-database"
+
+	srv, srvCleanup := newServer(t)
+	defer srvCleanup()
+
+	client, admin, cleanup, err := createFakeClient(srv.Addr, db)
+	if err != nil {
+		t.Fatalf("Unable to create fake client: %v", err)
+	}
+	defer cleanup()
+
+	populateSpanner(context.Background(), admin, db, client)
+
+	rows := QueryBatch(s, db, "SELECT * FROM TEST", reflect.TypeOf(TestDto{}))
+
+	ptest.RunAndValidate(t, p)
+	passert.Count(s, rows, "Should have 4 rows", 4)
+}
+
+func populateSpanner(ctx context.Context, admin *database.DatabaseAdminClient, db string, client *spanner.Client) error {
+	iter := client.Single().Query(ctx, spanner.Statement{SQL: "SELECT 1 FROM Test"})
+	defer iter.Stop()
+
+	if _, err := iter.Next(); err == nil {
+		return nil
+	}
+
+	op, err := admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
+		Database: db,
+		Statements: []string{`CREATE TABLE Test (
+					One STRING(20),
+					Two INT64,
+				) PRIMARY KEY (Two)`},
+	})
+
+	if err != nil {
+		return err
+	}
+
+	if err := op.Wait(context.Background()); err != nil {
+		return err
+	}
+
+	testRows := []TestDto{
+		{
+			One: "one",
+			Two: 1,
+		},
+		{
+			One: "one",
+			Two: 2,
+		},
+		{
+			One: "one",
+			Two: 3,
+		},
+		{
+			One: "one",
+			Two: 4,
+		},
+	}
+
+	var mutations []*spanner.Mutation
+	for _, m := range testRows {
+		mutation, err := spanner.InsertStruct("Test", m)
+		if err != nil {
+			return err
+		}
+
+		mutations = append(mutations, mutation)
+	}
+
+	_, err = client.Apply(context.Background(), mutations)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func TestMain(t *testing.M) {
+	flag.Parse()
+	t.Run()
+}

Review Comment:
   This should be calling [`ptest.Main`](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest#Main) instead, which ensures test pipelines can be executed properly on arbitrary runners. 
   
   ```suggestion
   func TestMain(m *testing.M) {
   	ptest.Main(m)
   }
   ```



##########
sdks/go/pkg/beam/io/spannerio/generate_partitions.go:
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resources to
+// Google Spanner datastores.
+package spannerio
+
+import (
+	"cloud.google.com/go/spanner"
+	"context"
+	"fmt"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn3x1[context.Context, []byte, func(read *PartitionedRead), error]((*generatePartitionsFn)(nil))
+	register.Emitter1[*PartitionedRead]()
+}
+
+type generatePartitionsFn struct {
+	spannerFn
+	Query     string            `json:"query"`   // Table is the table identifier.
+	Options   queryBatchOptions `json:"options"` // Options specifies additional query execution options.
+	generator partitionGenerator
+}
+
+func (f *generatePartitionsFn) Setup(ctx context.Context) error {
+	err := f.spannerFn.Setup(ctx)
+
+	if f.generator == nil {
+		f.generator = newPartitionGenerator(f.client)
+	}
+
+	return err
+}
+
+func (f *generatePartitionsFn) Teardown() {
+	f.spannerFn.Teardown()
+}
+
+func partitionOptions(options queryBatchOptions) spanner.PartitionOptions {
+	partitionOptions := spanner.PartitionOptions{}
+
+	if options.MaxPartitions != 0 {
+		partitionOptions.MaxPartitions = options.MaxPartitions
+	}
+
+	return partitionOptions
+}
+
+// GeneratePartitions generates read partitions to support batched reading from Spanner.
+func GeneratePartitions(s beam.Scope, db string, query string, options ...func(*queryBatchOptions) error) beam.PCollection {
+	s.Scope("spanner.GeneratePartitions")
+
+	fn := newGeneratePartitionsFn(db, query, options...)
+	return fn.generatePartitions(s)
+}
+
+func newGeneratePartitionsFn(
+	db string,
+	query string,
+	options ...func(*queryBatchOptions) error,
+) *generatePartitionsFn {
+	if db == "" {
+		panic("no database provided")
+	}
+
+	opts := queryBatchOptions{}
+	for _, opt := range options {
+		if err := opt(&opts); err != nil {
+			panic(err)
+		}
+	}
+
+	return &generatePartitionsFn{
+		spannerFn: newSpannerFn(db),
+		Query:     query,
+		Options:   opts,
+	}
+}
+
+func (f *generatePartitionsFn) generatePartitions(s beam.Scope) beam.PCollection {
+	imp := beam.Impulse(s)
+	return beam.ParDo(s, f, imp)
+}
+
+func (f *generatePartitionsFn) ProcessElement(ctx context.Context, _ []byte, emit func(*PartitionedRead)) error {
+	txnId, partitions := f.generator.generate(ctx, f.Options.TimestampBound, f.Query, partitionOptions(f.Options))
+
+	for _, p := range partitions {
+		emit(NewPartitionedRead(txnId, p))
+	}
+
+	return nil
+}
+
+type partitionGenerator interface {
+	generate(
+		ctx context.Context,
+		tb spanner.TimestampBound,
+		query string,
+		opts spanner.PartitionOptions,
+	) (spanner.BatchReadOnlyTransactionID, []*spanner.Partition)
+}
+
+type partitionGeneratorImpl struct {
+	client *spanner.Client
+}
+
+func newPartitionGenerator(client *spanner.Client) partitionGenerator {
+	return &partitionGeneratorImpl{client}
+}
+
+func (g *partitionGeneratorImpl) generate(
+	ctx context.Context,
+	tb spanner.TimestampBound,
+	query string,
+	opts spanner.PartitionOptions,
+) (spanner.BatchReadOnlyTransactionID, []*spanner.Partition) {
+	txn, err := g.client.BatchReadOnlyTransaction(ctx, tb)
+	if err != nil {
+		panic("unable to create batch read only transaction: " + err.Error())
+	}
+	defer txn.Close()
+
+	if txn == nil {
+		panic("unable to create a spanner transaction")
+	}

Review Comment:
   I'd remove this check, as it's simply untestable code.
   
   If the construction method `BatchReadOnlyTransaction` doesn't return an error and doesn't document that it can return a nil, it's convention to assume that it's not nil, so it's not necessary to check here.
   
   Further, if PartitionQuery were called with a nil transaction, some internal detail would then fail with a panic, which would lead us back here anyway. So while this check shortcuts that investigation somewhat, a nil transaction also shouldn't be happening in the first place.



##########
sdks/go/pkg/beam/io/spannerio/test_utils.go:
##########
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+	"cloud.google.com/go/spanner"
+	db "cloud.google.com/go/spanner/admin/database/apiv1"
+	"cloud.google.com/go/spanner/spannertest"
+	"context"
+	"google.golang.org/api/option"
+	"google.golang.org/grpc"
+	"testing"
+)
+
+type TestDto struct {
+	One string `spanner:"One"`
+	Two int64  `spanner:"Two"`
+}
+
+func newServer(t *testing.T) (*spannertest.Server, func()) {
+	srv, err := spannertest.NewServer("localhost:0")
+	if err != nil {
+		t.Fatalf("Starting in-memory fake spanner: %v", err)
+	}
+
+	return srv, func() {
+		srv.Close()
+	}
+}
+
+func createFakeClient(address string, database string) (*spanner.Client, *db.DatabaseAdminClient, func(), error) {

Review Comment:
   We can simplify things for the testing since Spanner has an env variable override we can use.
   
   See `SPANNER_EMULATOR_HOST` in the spannertest docs.
   https://pkg.go.dev/cloud.google.com/go/spanner/spannertest#section-readme
   
   This lets us use the normal client paths, and may be able to work better on portable runners at least for unit test purposes.
   
   Alternatively, we add another exported field to the spannerFn to put in the server address, and when that's present, we provide the appropriate connection option for the fake override. While this does move some test specific code into the production code, it makes testing much easier.



##########
sdks/go/pkg/beam/io/spannerio/read_batch.go:
##########
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resources to
+// Google Spanner datastores.
+package spannerio

Review Comment:
   ```suggestion
   
   package spannerio
   ```



##########
sdks/go/pkg/beam/io/spannerio/read_batch_integration_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+	"cloud.google.com/go/spanner"
+	database "cloud.google.com/go/spanner/admin/database/apiv1"
+	"context"
+	"flag"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
+	"reflect"
+	"testing"
+)
+
+// integrationTests is the value of the -int flag (default false). Tests in
+// this file check it and skip themselves unless it is set.
+var (
+	integrationTests = flag.Bool("int", false, "")
+)
+
+// TestExampleQueryBatch builds a pipeline that reads from the TEST table with
+// QueryBatch and asserts that 4 rows are produced. It starts an in-memory fake
+// Spanner server, creates clients for it and populates it before running the
+// pipeline. The test is skipped unless the -int flag is set.
+func TestExampleQueryBatch(t *testing.T) {
+	if !*integrationTests {
+		t.Skip("Not running in integration test mode.")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	db := "projects/test-project/instances/test-instance/databases/test-database"
+
+	srv, srvCleanup := newServer(t)
+	defer srvCleanup()
+
+	client, admin, cleanup, err := createFakeClient(srv.Addr, db)
+	if err != nil {
+		t.Fatalf("Unable to create fake client: %v", err)
+	}
+	defer cleanup()
+
+	// A failure while populating the fake makes the rest of the test
+	// meaningless, so stop here instead of silently dropping the error.
+	if err := populateSpanner(context.Background(), admin, db, client); err != nil {
+		t.Fatalf("Unable to populate fake spanner: %v", err)
+	}
+
+	rows := QueryBatch(s, db, "SELECT * FROM TEST", reflect.TypeOf(TestDto{}))
+
+	// passert adds verification steps to the pipeline graph, so it has to be
+	// applied before the pipeline is run; otherwise the check never executes.
+	passert.Count(s, rows, "Should have 4 rows", 4)
+	ptest.RunAndValidate(t, p)
+}
+
+func populateSpanner(ctx context.Context, admin *database.DatabaseAdminClient, db string, client *spanner.Client) error {

Review Comment:
   Since this is part of test setup, it should take a `*testing.T` and call `t.Helper()`. All errors should then be reported with `t.Error`, `t.Fatal`, etc., assuming they block execution of the test.
   
   As it stands, all the errors are ignored, which would make debugging difficult.
   
   See https://google.github.io/styleguide/go/decisions#test-helpers



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org