You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/04/21 20:52:16 UTC
[beam] branch master updated: [BEAM-9789] Fix lock error. Add test. (#11468)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6e6ff61 [BEAM-9789] Fix lock error. Add test. (#11468)
6e6ff61 is described below
commit 6e6ff6157904498ceeca4931648168d793721a6e
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue Apr 21 13:52:03 2020 -0700
[BEAM-9789] Fix lock error. Add test. (#11468)
---
sdks/go/pkg/beam/core/runtime/harness/harness.go | 13 +-
.../pkg/beam/core/runtime/harness/harness_test.go | 165 +++++++++++++++++++++
2 files changed, 173 insertions(+), 5 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 9d35a0e..26cd02e 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -57,7 +57,9 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
client := fnpb.NewBeamFnControlClient(conn)
lookupDesc := func(id bundleDescriptorID) (*fnpb.ProcessBundleDescriptor, error) {
- return client.GetProcessBundleDescriptor(ctx, &fnpb.GetProcessBundleDescriptorRequest{ProcessBundleDescriptorId: string(id)})
+ pbd, err := client.GetProcessBundleDescriptor(ctx, &fnpb.GetProcessBundleDescriptorRequest{ProcessBundleDescriptorId: string(id)})
+ log.Debugf(ctx, "GPBD RESP [%v]: %v, err %v", id, pbd, err)
+ return pbd, err
}
stub, err := client.Control(ctx)
@@ -170,18 +172,19 @@ func (c *control) getOrCreatePlan(bdID bundleDescriptorID) (*exec.Plan, error) {
} else {
desc, ok := c.descriptors[bdID]
if !ok {
- c.mu.Unlock()
+ c.mu.Unlock() // Unlock to make the lookup.
newDesc, err := c.lookupDesc(bdID)
- c.mu.Lock()
if err != nil {
- return nil, fmt.Errorf("execution plan for %v not found: %v", bdID, err)
+ return nil, errors.Wrapf(err, "execution plan for %v not found", bdID)
}
+ c.mu.Lock()
c.descriptors[bdID] = newDesc
desc = newDesc
}
newPlan, err := exec.UnmarshalPlan(desc)
if err != nil {
- return nil, fmt.Errorf("Invalid bundle desc: %v", err)
+ c.mu.Unlock()
+ return nil, errors.Wrapf(err, "invalid bundle desc %v: %v", bdID, desc)
}
plan = newPlan
}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness_test.go b/sdks/go/pkg/beam/core/runtime/harness/harness_test.go
new file mode 100644
index 0000000..3afde8e
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness_test.go
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package harness
+
+import (
+ "fmt"
+ "strings"
+ "testing"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+ fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+ pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+ "github.com/golang/protobuf/proto"
+)
+
+// validDescriptor builds a minimal, well-formed ProcessBundleDescriptor with
+// id "test": a runner source transform whose output feeds a runner sink
+// transform through the single PCollection "p1". It fails the calling test
+// immediately if the shared port payload cannot be marshaled.
+func validDescriptor(t *testing.T) *fnpb.ProcessBundleDescriptor {
+	t.Helper()
+
+	// The source and the sink both carry the same serialized RemoteGrpcPort.
+	payload, err := proto.Marshal(&fnpb.RemoteGrpcPort{
+		CoderId: "c1",
+		ApiServiceDescriptor: &pipepb.ApiServiceDescriptor{
+			Url: "hostname:port",
+		},
+	})
+	if err != nil {
+		t.Fatalf("bad port: %v", err)
+	}
+
+	transforms := map[string]*pipepb.PTransform{
+		"source": {
+			Spec:    &pipepb.FunctionSpec{Urn: "beam:runner:source:v1", Payload: payload},
+			Outputs: map[string]string{"o1": "p1"},
+		},
+		"sink": {
+			Spec:   &pipepb.FunctionSpec{Urn: "beam:runner:sink:v1", Payload: payload},
+			Inputs: map[string]string{"i1": "p1"},
+		},
+	}
+
+	pcollections := map[string]*pipepb.PCollection{
+		"p1": {CoderId: "c1"},
+	}
+
+	// "c1" is a windowed-value coder whose components are the varint coder
+	// "c2" and the global-window coder "c3".
+	coders := map[string]*pipepb.Coder{
+		"c1": {
+			Spec:              &pipepb.FunctionSpec{Urn: "beam:coder:windowed_value:v1"},
+			ComponentCoderIds: []string{"c2", "c3"},
+		},
+		"c2": {Spec: &pipepb.FunctionSpec{Urn: "beam:coder:varint:v1"}},
+		"c3": {Spec: &pipepb.FunctionSpec{Urn: "beam:coder:global_window:v1"}},
+	}
+
+	return &fnpb.ProcessBundleDescriptor{
+		Id:           "test",
+		Transforms:   transforms,
+		Pcollections: pcollections,
+		Coders:       coders,
+	}
+}
+
+// invalidDescriptor returns an empty ProcessBundleDescriptor with no fields
+// set. The "badDescriptorPlan" test case uses it as a lookup result for which
+// plan construction is expected to fail. The t parameter is currently unused.
+func invalidDescriptor(t *testing.T) *fnpb.ProcessBundleDescriptor {
+	return new(fnpb.ProcessBundleDescriptor)
+}
+
+// TestControl_getOrCreatePlan drives control.getOrCreatePlan through a table
+// of starting states: nothing cached (descriptor fetched via lookupDesc), a
+// cached descriptor, a cached plan, a failing lookup, and a looked-up
+// descriptor that cannot be turned into a plan.
+//
+// For every case it checks that:
+//   - an error is returned exactly when one is expected,
+//   - a returned error contains the expected message, and
+//   - a non-nil error is never accompanied by a non-nil plan.
+func TestControl_getOrCreatePlan(t *testing.T) {
+	testBDID := bundleDescriptorID("test")
+	testPlan, err := exec.UnmarshalPlan(validDescriptor(t))
+	if err != nil {
+		// Include the cause so a broken fixture is diagnosable.
+		t.Fatalf("bad testPlan: %v", err)
+	}
+	tests := []struct {
+		name               string
+		lookupErr, planErr error                          // planErr, when set, is the expected error; otherwise lookupErr is.
+		lookupDesc         *fnpb.ProcessBundleDescriptor // Returned by the stubbed lookupDesc.
+		descriptors        map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor
+		plans              map[bundleDescriptorID][]*exec.Plan
+	}{
+		{
+			name:       "OK",
+			lookupDesc: validDescriptor(t),
+		}, {
+			name: "cachedDescriptor",
+			descriptors: map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor{
+				testBDID: validDescriptor(t),
+			},
+		}, {
+			name: "cachedPlan",
+			plans: map[bundleDescriptorID][]*exec.Plan{
+				testBDID: {testPlan},
+			},
+		}, {
+			name:      "badLookup",
+			lookupErr: fmt.Errorf("lookupError"),
+		}, {
+			name:       "badDescriptorPlan",
+			lookupDesc: invalidDescriptor(t),
+			planErr:    fmt.Errorf("invalid bundle desc"),
+		},
+	}
+
+	for _, test := range tests {
+		test := test // Pin the loop variable for the closure (needed before Go 1.22).
+		t.Run(test.name, func(t *testing.T) {
+			ctrl := &control{
+				lookupDesc: func(bdID bundleDescriptorID) (*fnpb.ProcessBundleDescriptor, error) {
+					return test.lookupDesc, test.lookupErr
+				},
+				descriptors: make(map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor),
+				plans:       make(map[bundleDescriptorID][]*exec.Plan),
+				active:      make(map[instructionID]*exec.Plan),
+				failed:      make(map[instructionID]error),
+			}
+			if test.descriptors != nil {
+				ctrl.descriptors = test.descriptors
+			}
+			if test.plans != nil {
+				ctrl.plans = test.plans
+			}
+
+			// Work out the expected error without mutating the shared table
+			// entry: an explicit planErr wins, otherwise the lookup error is
+			// expected to surface.
+			wantErr := test.planErr
+			if wantErr == nil {
+				wantErr = test.lookupErr
+			}
+
+			plan, err := ctrl.getOrCreatePlan(testBDID)
+			if err == nil {
+				// Previously a missing error went unnoticed.
+				if wantErr != nil {
+					t.Errorf("getOrCreatePlan succeeded: want error containing %q", wantErr.Error())
+				}
+				return
+			}
+			if plan != nil {
+				t.Error("getOrCreatePlan returned a non-nil error and non-nil plan. Non-nil errors must have nil plans.")
+			}
+			if wantErr == nil {
+				// Previously this path dereferenced a nil error and panicked.
+				t.Fatalf("getOrCreatePlan errored unexpectedly: %v", err)
+			}
+			if got, want := err.Error(), wantErr.Error(); !strings.Contains(got, want) {
+				t.Errorf("getOrCreatePlan errored: got %q, want to contain %q", got, want)
+			}
+		})
+	}
+}