You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/10/11 20:11:28 UTC
[beam] 01/01: [Go SDK] Don't construct plans in lock section.
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch unblockLock23335
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 01d662e58a826cb39762eeff3faa58e08f10bd1e
Author: lostluck <13...@users.noreply.github.com>
AuthorDate: Tue Oct 11 13:09:27 2022 -0700
[Go SDK] Don't construct plans in lock section.
---
sdks/go/pkg/beam/core/runtime/harness/harness.go | 40 ++++++++++++------------
1 file changed, 20 insertions(+), 20 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index cddb1e07fa0..edc16578a7f 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -317,31 +317,31 @@ func (c *control) metStoreToString(statusInfo *strings.Builder) {
func (c *control) getOrCreatePlan(bdID bundleDescriptorID) (*exec.Plan, error) {
c.mu.Lock()
plans, ok := c.plans[bdID]
- var plan *exec.Plan
+ // If we have a spare plan for this bdID, we're done.
+ // Remove it from the cache, and return it.
if ok && len(plans) > 0 {
- plan = plans[len(plans)-1]
+ plan := plans[len(plans)-1]
c.plans[bdID] = plans[:len(plans)-1]
- } else {
- desc, ok := c.descriptors[bdID]
- if !ok {
- c.mu.Unlock() // Unlock to make the lookup.
- newDesc, err := c.lookupDesc(bdID)
- if err != nil {
- return nil, errors.WithContextf(err, "execution plan for %v not found", bdID)
- }
- c.mu.Lock()
- c.descriptors[bdID] = newDesc
- desc = newDesc
- }
- newPlan, err := exec.UnmarshalPlan(desc)
+ c.mu.Unlock()
+ return plan, nil
+ }
+ desc, ok := c.descriptors[bdID]
+ c.mu.Unlock() // Unlock to make the lookup or build the plan.
+ if !ok {
+ newDesc, err := c.lookupDesc(bdID)
if err != nil {
- c.mu.Unlock()
- return nil, errors.WithContextf(err, "invalid bundle desc: %v\n%v\n", bdID, desc.String())
+ return nil, errors.WithContextf(err, "execution plan for %v not found", bdID)
}
- plan = newPlan
+ c.mu.Lock()
+ c.descriptors[bdID] = newDesc
+ c.mu.Unlock()
+ desc = newDesc
+ }
+ newPlan, err := exec.UnmarshalPlan(desc)
+ if err != nil {
+ return nil, errors.WithContextf(err, "invalid bundle desc: %v\n%v\n", bdID, desc.String())
}
- c.mu.Unlock()
- return plan, nil
+ return newPlan, nil
}
func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRequest) *fnpb.InstructionResponse {