You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/04/21 20:52:16 UTC

[beam] branch master updated: [BEAM-9789] Fix lock error. Add test. (#11468)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e6ff61  [BEAM-9789] Fix lock error. Add test. (#11468)
6e6ff61 is described below

commit 6e6ff6157904498ceeca4931648168d793721a6e
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue Apr 21 13:52:03 2020 -0700

    [BEAM-9789] Fix lock error. Add test. (#11468)
---
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |  13 +-
 .../pkg/beam/core/runtime/harness/harness_test.go  | 165 +++++++++++++++++++++
 2 files changed, 173 insertions(+), 5 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 9d35a0e..26cd02e 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -57,7 +57,9 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
 	client := fnpb.NewBeamFnControlClient(conn)
 
 	lookupDesc := func(id bundleDescriptorID) (*fnpb.ProcessBundleDescriptor, error) {
-		return client.GetProcessBundleDescriptor(ctx, &fnpb.GetProcessBundleDescriptorRequest{ProcessBundleDescriptorId: string(id)})
+		pbd, err := client.GetProcessBundleDescriptor(ctx, &fnpb.GetProcessBundleDescriptorRequest{ProcessBundleDescriptorId: string(id)})
+		log.Debugf(ctx, "GPBD RESP [%v]: %v, err %v", id, pbd, err)
+		return pbd, err
 	}
 
 	stub, err := client.Control(ctx)
@@ -170,18 +172,19 @@ func (c *control) getOrCreatePlan(bdID bundleDescriptorID) (*exec.Plan, error) {
 	} else {
 		desc, ok := c.descriptors[bdID]
 		if !ok {
-			c.mu.Unlock()
+			c.mu.Unlock() // Unlock to make the lookup.
 			newDesc, err := c.lookupDesc(bdID)
-			c.mu.Lock()
 			if err != nil {
-				return nil, fmt.Errorf("execution plan for %v not found: %v", bdID, err)
+				return nil, errors.Wrapf(err, "execution plan for %v not found", bdID)
 			}
+			c.mu.Lock()
 			c.descriptors[bdID] = newDesc
 			desc = newDesc
 		}
 		newPlan, err := exec.UnmarshalPlan(desc)
 		if err != nil {
-			return nil, fmt.Errorf("Invalid bundle desc: %v", err)
+			c.mu.Unlock()
+			return nil, errors.Wrapf(err, "invalid bundle desc %v: %v", bdID, desc)
 		}
 		plan = newPlan
 	}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness_test.go b/sdks/go/pkg/beam/core/runtime/harness/harness_test.go
new file mode 100644
index 0000000..3afde8e
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness_test.go
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package harness
+
+import (
+	"fmt"
+	"strings"
+	"testing"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+	fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"github.com/golang/protobuf/proto"
+)
+
+// validDescriptor describes a valid pipeline with a source and a sink, but doesn't do anything else.
+func validDescriptor(t *testing.T) *fnpb.ProcessBundleDescriptor {
+	t.Helper()
+	port := &fnpb.RemoteGrpcPort{
+		CoderId: "c1",
+		ApiServiceDescriptor: &pipepb.ApiServiceDescriptor{
+			Url: "hostname:port",
+		},
+	}
+	portBytes, err := proto.Marshal(port)
+	if err != nil {
+		t.Fatalf("bad port: %v", err)
+	}
+	return &fnpb.ProcessBundleDescriptor{
+		Id: "test",
+		Transforms: map[string]*pipepb.PTransform{
+			"source": &pipepb.PTransform{
+				Spec: &pipepb.FunctionSpec{
+					Urn:     "beam:runner:source:v1",
+					Payload: portBytes,
+				},
+				Outputs: map[string]string{
+					"o1": "p1",
+				},
+			},
+			"sink": &pipepb.PTransform{
+				Spec: &pipepb.FunctionSpec{
+					Urn:     "beam:runner:sink:v1",
+					Payload: portBytes,
+				},
+				Inputs: map[string]string{
+					"i1": "p1",
+				},
+			},
+		},
+		Pcollections: map[string]*pipepb.PCollection{
+			"p1": &pipepb.PCollection{
+				CoderId: "c1",
+			},
+		},
+		Coders: map[string]*pipepb.Coder{
+			"c1": &pipepb.Coder{
+				Spec: &pipepb.FunctionSpec{
+					Urn: "beam:coder:windowed_value:v1",
+				},
+				ComponentCoderIds: []string{"c2", "c3"},
+			},
+			"c2": &pipepb.Coder{
+				Spec: &pipepb.FunctionSpec{
+					Urn: "beam:coder:varint:v1",
+				},
+			},
+			"c3": &pipepb.Coder{
+				Spec: &pipepb.FunctionSpec{
+					Urn: "beam:coder:global_window:v1",
+				},
+			},
+		},
+	}
+
+}
+
+func invalidDescriptor(t *testing.T) *fnpb.ProcessBundleDescriptor {
+	return &fnpb.ProcessBundleDescriptor{}
+}
+
+func TestControl_getOrCreatePlan(t *testing.T) {
+	testBDID := bundleDescriptorID("test")
+	testPlan, err := exec.UnmarshalPlan(validDescriptor(t))
+	if err != nil {
+		t.Fatal("bad testPlan")
+	}
+	tests := []struct {
+		name               string
+		lookupErr, planErr error
+		lookupDesc         *fnpb.ProcessBundleDescriptor
+		descriptors        map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor
+		plans              map[bundleDescriptorID][]*exec.Plan
+	}{
+		{
+			name:       "OK",
+			lookupDesc: validDescriptor(t),
+		}, {
+			name: "cachedDescriptor",
+			descriptors: map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor{
+				testBDID: validDescriptor(t),
+			},
+		}, {
+			name: "cachedPlan",
+			plans: map[bundleDescriptorID][]*exec.Plan{
+				testBDID: []*exec.Plan{testPlan},
+			},
+		}, {
+			name:      "badLookup",
+			lookupErr: fmt.Errorf("lookupError"),
+		}, {
+			name:       "badDescriptorPlan",
+			lookupDesc: invalidDescriptor(t),
+			planErr:    fmt.Errorf("invalid bundle desc"),
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctrl := &control{
+				lookupDesc: func(bdID bundleDescriptorID) (*fnpb.ProcessBundleDescriptor, error) {
+					return test.lookupDesc, test.lookupErr
+				},
+				descriptors: make(map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor),
+				plans:       make(map[bundleDescriptorID][]*exec.Plan),
+				active:      make(map[instructionID]*exec.Plan),
+				failed:      make(map[instructionID]error),
+			}
+			if test.descriptors != nil {
+				ctrl.descriptors = test.descriptors
+			}
+			if test.plans != nil {
+				ctrl.plans = test.plans
+			}
+			if test.planErr == nil {
+				test.planErr = test.lookupErr
+			}
+
+			plan, err := ctrl.getOrCreatePlan(testBDID)
+			if err != nil {
+				if plan != nil {
+					t.Error("getOrCreatePlan returned a non-nil error and non-nil plan. Non-nil errors must have nil plans.")
+				}
+				if got, want := err.Error(), test.planErr.Error(); !strings.Contains(got, want) {
+					t.Errorf("getOrCreatePlan errored: got %q, want to contain %q", got, want)
+				}
+			}
+
+		})
+	}
+
+}