You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/08/17 21:51:15 UTC

[beam] 01/01: Avoid panic on type assert.

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-patch-1
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1fe85fe57d209cdc43df5b15a4c37f969550ae40
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Wed Aug 17 14:51:07 2022 -0700

    Avoid panic on type assert.
---
 sdks/go/pkg/beam/core/runtime/exec/translate.go | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index d9d56ddf65d..e5b9976cb34 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -528,11 +528,13 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 					u = &LiftedCombine{Combine: cn, KeyCoder: ec.Components[0], WindowCoder: wc}
 				case urnPerKeyCombineMerge:
 					ma := &MergeAccumulators{Combine: cn}
-					if eo, ok := ma.Out.(*PCollection).Out.(*ExtractOutput); ok {
-						// Strip PCollections from between MergeAccumulators and ExtractOutputs
-						// as it's a synthetic PCollection.
-						b.units = b.units[:len(b.units)-1]
-						ma.Out = eo
+					if pc, ok := ma.Out.(*PCollection); ok {
+						if eo, ok := pc.Out.(*ExtractOutput); ok {
+							// Strip PCollections from between MergeAccumulators and ExtractOutputs
+							// as it's a synthetic PCollection.
+							b.units = b.units[:len(b.units)-1]
+							ma.Out = eo
+						}
 					}
 					u = ma
 				case urnPerKeyCombineExtract: