You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/18 18:18:17 UTC

[beam] branch master updated: Fix a bug: certain error in LiftedCombine is not propagated out (#7518)

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 18a21e0  Fix a bug: certain error in LiftedCombine is not propagated out (#7518)
18a21e0 is described below

commit 18a21e0eeb302825341c5da4a0030e2ae760481e
Author: Tianyang Hu <ht...@gmail.com>
AuthorDate: Fri Jan 18 10:18:09 2019 -0800

    Fix a bug: certain error in LiftedCombine is not propagated out (#7518)
    
    * Fix a bug: errors returned by n.Out.ProcessElement in LiftedCombine.FinishBundle were not propagated out
---
 sdks/go/pkg/beam/core/runtime/exec/combine.go | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go b/sdks/go/pkg/beam/core/runtime/exec/combine.go
index b9a6eb6..067f852 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go
@@ -319,13 +319,12 @@ func (n *LiftedCombine) FinishBundle(ctx context.Context) error {
 	// Need to run n.Out.ProcessElement for all the cached precombined KVs, and
 	// then finally Finish bundle as normal.
 	for _, a := range n.cache {
-		n.Out.ProcessElement(ctx, a)
+		if err := n.Out.ProcessElement(ctx, a); err != nil {
+			return err
+		}
 	}
 
-	if err := n.Out.FinishBundle(ctx); err != nil {
-		return n.fail(err)
-	}
-	return nil
+	return n.Out.FinishBundle(ctx)
 }
 
 // Down tears down the cache.