You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/18 18:18:17 UTC
[beam] branch master updated: Fix a bug: certain error in
LiftedCombine is not propagated out (#7518)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 18a21e0 Fix a bug: certain error in LiftedCombine is not propagated out (#7518)
18a21e0 is described below
commit 18a21e0eeb302825341c5da4a0030e2ae760481e
Author: Tianyang Hu <ht...@gmail.com>
AuthorDate: Fri Jan 18 10:18:09 2019 -0800
Fix a bug: certain error in LiftedCombine is not propagated out (#7518)
* Fix a bug: certain error in LiftedCombine.FinishBundle is not propagated out
---
sdks/go/pkg/beam/core/runtime/exec/combine.go | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go b/sdks/go/pkg/beam/core/runtime/exec/combine.go
index b9a6eb6..067f852 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go
@@ -319,13 +319,12 @@ func (n *LiftedCombine) FinishBundle(ctx context.Context) error {
// Need to run n.Out.ProcessElement for all the cached precombined KVs, and
// then finally Finish bundle as normal.
for _, a := range n.cache {
- n.Out.ProcessElement(ctx, a)
+ if err := n.Out.ProcessElement(ctx, a); err != nil {
+ return err
+ }
}
- if err := n.Out.FinishBundle(ctx); err != nil {
- return n.fail(err)
- }
- return nil
+ return n.Out.FinishBundle(ctx)
}
// Down tears down the cache.