You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/04/11 16:11:36 UTC

[beam] branch master updated: [BEAM-13898] Add tests to the pubsubx package. (#17324)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ac3d6f84761 [BEAM-13898] Add tests to the pubsubx package. (#17324)
ac3d6f84761 is described below

commit ac3d6f847618ad157397770bcbb3efefc78dd921
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Mon Apr 11 09:11:29 2022 -0700

    [BEAM-13898] Add tests to the pubsubx package. (#17324)
---
 sdks/go/pkg/beam/util/pubsubx/pubsub.go      |  11 +-
 sdks/go/pkg/beam/util/pubsubx/pubsub_test.go | 187 +++++++++++++++++++++++++++
 2 files changed, 197 insertions(+), 1 deletion(-)

diff --git a/sdks/go/pkg/beam/util/pubsubx/pubsub.go b/sdks/go/pkg/beam/util/pubsubx/pubsub.go
index b7769a9a581..b64726c4590 100644
--- a/sdks/go/pkg/beam/util/pubsubx/pubsub.go
+++ b/sdks/go/pkg/beam/util/pubsubx/pubsub.go
@@ -75,6 +75,11 @@ func CleanupTopic(ctx context.Context, project, topic string) {
 	if err != nil {
 		log.Errorf(ctx, "Failed to delete topic %v: %v", topic, err)
 	}
+	defer client.Close()
+	cleanupTopic(ctx, client, topic)
+}
+
+func cleanupTopic(ctx context.Context, client *pubsub.Client, topic string) {
 	if err := client.Topic(topic).Delete(ctx); err != nil {
 		log.Errorf(ctx, "Failed to delete topic %v: %v", topic, err)
 	}
@@ -87,6 +92,11 @@ func Publish(ctx context.Context, project, topic string, messages ...string) (*p
 	if err != nil {
 		return nil, err
 	}
+	defer client.Close()
+	return publish(ctx, client, topic, messages...)
+}
+
+func publish(ctx context.Context, client *pubsub.Client, topic string, messages ...string) (*pubsub.Subscription, error) {
 	t, err := EnsureTopic(ctx, client, topic)
 	if err != nil {
 		return nil, err
@@ -99,7 +109,6 @@ func Publish(ctx context.Context, project, topic string, messages ...string) (*p
 	for _, msg := range messages {
 		m := &pubsub.Message{
 			Data: ([]byte)(msg),
-			// Attributes: ??
 		}
 		id, err := t.Publish(ctx, m).Get(ctx)
 		if err != nil {
diff --git a/sdks/go/pkg/beam/util/pubsubx/pubsub_test.go b/sdks/go/pkg/beam/util/pubsubx/pubsub_test.go
new file mode 100644
index 00000000000..a27860fc780
--- /dev/null
+++ b/sdks/go/pkg/beam/util/pubsubx/pubsub_test.go
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pubsubx
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"testing"
+
+	"cloud.google.com/go/pubsub"
+	"cloud.google.com/go/pubsub/pstest"
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/api/option"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+)
+
+func TestMakeQualifiedTopicName(t *testing.T) {
+	project, topic := "foo", "bar"
+	got := MakeQualifiedTopicName(project, topic)
+	want := "projects/foo/topics/bar"
+	if got != want {
+		t.Errorf("MakeQualifiedTopicName(%q, %q) = %v, want %v", project, topic, got, want)
+	}
+}
+
+func TestMakeQualifiedSubscriptionName(t *testing.T) {
+	project, topic := "foo", "bar"
+	got := MakeQualifiedSubscriptionName(project, topic)
+	want := "projects/foo/subscriptions/bar"
+	if got != want {
+		t.Errorf("MakeQualifiedSubscriptionName(%q, %q) = %v, want %v", project, topic, got, want)
+	}
+}
+
+func createFakeServerClient(ctx context.Context, project string) (*pubsub.Client, func(), error) {
+	// Go with a ~1 MB buffer for the in memory connection.
+	srv := pstest.NewServer()
+	closer := func() {
+		srv.Close()
+	}
+
+	// Connect to the server using it.
+	conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
+	if err != nil {
+		closer()
+		return nil, nil, fmt.Errorf("unable to connect to bufconn: %w", err)
+	}
+
+	// Use the connection when creating a pubsub client.
+	client, err := pubsub.NewClient(ctx, project, option.WithGRPCConn(conn))
+	if err != nil {
+		closer()
+		return nil, nil, fmt.Errorf("unable to create pubsub client with bufconn: %w", err)
+	}
+	return client, closer, nil
+}
+
+func initTestServer(t *testing.T) (context.Context, *pubsub.Client) {
+	t.Helper()
+	ctx := context.Background()
+	client, closer, err := createFakeServerClient(ctx, "project")
+	if err != nil {
+		t.Fatal(err)
+	}
+	t.Cleanup(closer)
+	return ctx, client
+}
+
+func TestEnsureTopic(t *testing.T) {
+	ctx, client := initTestServer(t)
+
+	topicName := "test_topic"
+
+	if _, err := EnsureTopic(ctx, client, topicName); err != nil {
+		t.Fatalf("EnsureTopic(%q) failed: unable to create topic: %v", topicName, err)
+	}
+	topic := client.Topic(topicName)
+	if ok, err := topic.Exists(ctx); err != nil {
+		t.Fatalf("topic.Exist() failed: %v", err)
+	} else if !ok {
+		t.Fatalf("topic.Exist() = false, want true: EnsureTopic(%q) didn't create a topic", topicName)
+	}
+
+	// Now cover the exists already path
+	if _, err := EnsureTopic(ctx, client, topicName); err != nil {
+		t.Fatalf("EnsureTopic(%q) failed: unable to find existing topic: %v", topicName, err)
+	}
+
+	if ok, err := topic.Exists(ctx); err != nil {
+		t.Fatalf("topic.Exist() failed: %v", err)
+	} else if !ok {
+		t.Fatalf("topic.Exist() = false, want true: EnsureTopic(%q) didn't return existing topic", topicName)
+	}
+}
+
+func TestCleanupTopic(t *testing.T) {
+	ctx, client := initTestServer(t)
+	topicName := "test_topic"
+	topic, err := client.CreateTopic(ctx, topicName)
+	if err != nil {
+		t.Fatalf("client.CreateTopic(%q) failed %v", topicName, err)
+	}
+
+	cleanupTopic(ctx, client, topicName)
+
+	if ok, err := topic.Exists(ctx); err != nil {
+		t.Fatalf("topic.Exist() failed: %v", err)
+	} else if ok {
+		t.Fatalf("topic.Exist() = true, want false: CleanupTopic(%q) didn't delete topic", topicName)
+	}
+}
+
+func TestEnsureSubscription(t *testing.T) {
+	ctx, client := initTestServer(t)
+
+	topicName, subName := "test_topic", "test_sub"
+
+	if _, err := EnsureSubscription(ctx, client, topicName, subName); grpc.Code(err) != codes.NotFound {
+		t.Fatalf("EnsureSubscription(%q,%q) failed: expected NotFound error (topic doesn't exist): %v", topicName, subName, err)
+	}
+
+	if _, err := client.CreateTopic(ctx, topicName); err != nil {
+		t.Fatalf("client.CreateTopic(%q) failed %v", topicName, err)
+	}
+
+	if _, err := EnsureSubscription(ctx, client, topicName, subName); err != nil {
+		t.Fatalf("EnsureSubscription(%q,%q) failed: unable error to create subscription: %v", topicName, subName, err)
+	}
+
+	sub := client.Subscription(subName)
+	if ok, err := sub.Exists(ctx); err != nil {
+		t.Fatalf("sub.Exist() failed: %v", err)
+	} else if !ok {
+		t.Fatalf("sub.Exist() = false, want true: EnsureTopic(%q) didn't create a topic", topicName)
+	}
+
+	// // Now cover the exists already path
+	if _, err := EnsureSubscription(ctx, client, topicName, subName); err != nil {
+		t.Fatalf("EnsureSubscription(%q,%q) failed: unable error to find existing subscription: %v", topicName, subName, err)
+	}
+	sub = client.Subscription(subName)
+	if ok, err := sub.Exists(ctx); err != nil {
+		t.Fatalf("sub.Exist() failed: %v", err)
+	} else if !ok {
+		t.Fatalf("sub.Exist() = false, want true: EnsureTopic(%q) didn't create a topic", topicName)
+	}
+}
+
+func TestPublish(t *testing.T) {
+	ctx, client := initTestServer(t)
+
+	topicName := "test_topic"
+
+	want := []string{"apple", "banana", "carrot"}
+	sub, err := publish(ctx, client, topicName, want...)
+	if err != nil {
+		t.Fatalf("publish failed: %v", err)
+	}
+	cctx, cancel := context.WithCancel(ctx)
+	var got []string
+	sub.Receive(cctx, func(_ context.Context, msg *pubsub.Message) {
+		got = append(got, string(msg.Data))
+		msg.Ack()
+		if len(got) >= len(want) {
+			cancel()
+		}
+	})
+	sort.Strings(got)
+	if d := cmp.Diff(want, got); d != "" {
+		t.Fatalf("publish failed: diff\n%v", d)
+	}
+}