You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/10/11 20:11:28 UTC

[beam] 01/01: [Go SDK] Don't construct plans in lock section.

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch unblockLock23335
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 01d662e58a826cb39762eeff3faa58e08f10bd1e
Author: lostluck <13...@users.noreply.github.com>
AuthorDate: Tue Oct 11 13:09:27 2022 -0700

    [Go SDK] Don't construct plans in lock section.
---
 sdks/go/pkg/beam/core/runtime/harness/harness.go | 40 ++++++++++++------------
 1 file changed, 20 insertions(+), 20 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index cddb1e07fa0..edc16578a7f 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -317,31 +317,31 @@ func (c *control) metStoreToString(statusInfo *strings.Builder) {
 func (c *control) getOrCreatePlan(bdID bundleDescriptorID) (*exec.Plan, error) {
 	c.mu.Lock()
 	plans, ok := c.plans[bdID]
-	var plan *exec.Plan
+	// If we have a spare plan for this bdID, we're done.
+	// Remove it from the cache, and return it.
 	if ok && len(plans) > 0 {
-		plan = plans[len(plans)-1]
+		plan := plans[len(plans)-1]
 		c.plans[bdID] = plans[:len(plans)-1]
-	} else {
-		desc, ok := c.descriptors[bdID]
-		if !ok {
-			c.mu.Unlock() // Unlock to make the lookup.
-			newDesc, err := c.lookupDesc(bdID)
-			if err != nil {
-				return nil, errors.WithContextf(err, "execution plan for %v not found", bdID)
-			}
-			c.mu.Lock()
-			c.descriptors[bdID] = newDesc
-			desc = newDesc
-		}
-		newPlan, err := exec.UnmarshalPlan(desc)
+		c.mu.Unlock()
+		return plan, nil
+	}
+	desc, ok := c.descriptors[bdID]
+	c.mu.Unlock() // Unlock to make the descriptor lookup or build the plan.
+	if !ok {
+		newDesc, err := c.lookupDesc(bdID)
 		if err != nil {
-			c.mu.Unlock()
-			return nil, errors.WithContextf(err, "invalid bundle desc: %v\n%v\n", bdID, desc.String())
+			return nil, errors.WithContextf(err, "execution plan for %v not found", bdID)
 		}
-		plan = newPlan
+		c.mu.Lock()
+		c.descriptors[bdID] = newDesc
+		c.mu.Unlock()
+		desc = newDesc
+	}
+	newPlan, err := exec.UnmarshalPlan(desc)
+	if err != nil {
+		return nil, errors.WithContextf(err, "invalid bundle desc: %v\n%v\n", bdID, desc.String())
 	}
-	c.mu.Unlock()
-	return plan, nil
+	return newPlan, nil
 }
 
 func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRequest) *fnpb.InstructionResponse {