You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "cbix (via GitHub)" <gi...@apache.org> on 2023/01/25 00:47:36 UTC

[GitHub] [beam] cbix opened a new issue, #25154: [Bug]: runtime panic when using stateful DoFn

cbix opened a new issue, #25154:
URL: https://github.com/apache/beam/issues/25154

   ### What happened?
   
   With the Apache Beam Go SDK 2.44.0, having a stateful DoFn in your pipeline causes a panic at runtime with the Go direct runner:
   ```
   panic: Failed to execute job: panic: runtime error: invalid memory address or nil pointer dereference
   ```
   <details><summary><b>Full trace</b></summary><pre>
   panic: Failed to execute job: panic: runtime error: invalid memory address or nil pointer dereference
   Full error:
   while executing Process for Plan[plan]:
   5: Impulse[0]
   1: ParDo[main.logFn] Out:[] Sig: func(context.Context, string)
   2: ParDo[main.valueStateFn] Out:[1] Sig: func(state.Provider, string, int) string
   3: ParDo[main.main.func1] Out:[2] Sig: func(string, func(string, int))
   4: ParDo[beam.createFn] Out:[3] Sig: func([]uint8, func(typex.T)) error
   	caused by:
   panic: runtime error: invalid memory address or nil pointer dereference goroutine 1 [running]:
   runtime/debug.Stack()
   	/usr/lib/go/src/runtime/debug/stack.go:24 +0x65
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic.func1()
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/util.go:58 +0xa5
   panic({0xece0c0, 0x18b11f0})
   	/usr/lib/go/src/runtime/panic.go:884 +0x212
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).Invoke(0xc000554140, {0x11afcf0?, 0xc000125900}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, ...)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/fn.go:196 +0x519
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).invokeProcessFn(0xc0001a0700, {0x11afcf0, 0xc000125900}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, ...)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:359 +0x25e
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processSingleWindow(0xc0001a0700, 0xc000162480)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:178 +0x9b
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processMainInput(0x10?, 0x11afcf0?)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:156 +0x65
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc0001a0700, {0x5?, 0xc00056bf90?}, 0xc0000f2dd8, {0x0, 0x0, 0x0})
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:141 +0x125
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec/optimized.(*emitNative).invokeStringInt(0xc0000f2d80, {0xc0004908e0?, 0x4?}, 0xc0003de488?)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/optimized/emitters.go:2681 +0x1bc
   main.main.func1({0xc0004908e0?, 0x0?}, 0x0?)
   	/home/florian/enlyze/beam/state.go:45 +0x26
   reflect.Value.call({0xe6e3a0?, 0x1092b08?, 0x450cf2?}, {0x102eaf6, 0x4}, {0xc0001618f0, 0x2, 0x50ec91?})
   	/usr/lib/go/src/reflect/value.go:584 +0x8c5
   reflect.Value.Call({0xe6e3a0?, 0x1092b08?, 0x10000004efe25?}, {0xc0001618f0?, 0x0?, 0x7fb4423aa5b8?})
   	/usr/lib/go/src/reflect/value.go:368 +0xbc
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx.(*reflectFunc).Call(0xc0006048b8, {0xc00061cc80?, 0x98?, 0x18d1aa0?})
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/util/reflectx/call.go:87 +0x59
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).initCall.func46({0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, 0x1}, 0x0?)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/fn_arity.go:307 +0x94
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).Invoke(0xc000554280, {0x11afcf0?, 0xc0001258c0}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, ...)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/fn.go:252 +0xe23
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).invokeProcessFn(0xc0001a0800, {0x11afcf0, 0xc0001258c0}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, ...)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:359 +0x25e
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processSingleWindow(0xc0001a0800, 0xc0001623f0)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:178 +0x9b
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processMainInput(0x18d1aa0?, 0x11afcf0?)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:156 +0x65
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc0001a0800, {0xc0001618c0?, 0xe4a860?}, 0xc0000f2e98, {0x0, 0x0, 0x0})
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:141 +0x125
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec/optimized.(*emitNative).invokeTypex_T(0xc0000f2e40, {0xe4a860?, 0xc000040c20?})
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/optimized/emitters.go:12121 +0x18a
   github.com/apache/beam/sdks/v2/go/pkg/beam.(*createFn).ProcessElement(0xc0001608d0, {0x0?, 0x8?, 0x40f35f?}, 0xc000040bf0)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/create.go:121 +0x105
   github.com/apache/beam/sdks/v2/go/pkg/beam.wrapMakerCreateFn.func1({0x19016d0?, 0x1000000000000?, 0x7fb4423b4b28?}, 0x0?)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/beam.shims.go:70 +0x31
   github.com/apache/beam/sdks/v2/go/pkg/beam.(*callerSliceOfByteEmitTypex۰TГError).Call2x1(0x203000?, {0xe34060?, 0xc000604c30?}, {0xe47960?, 0xc000040bf0?})
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/beam.shims.go:149 +0x5e
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).initCall.func12({0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, 0x1}, 0x0?)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/fn_arity.go:103 +0x9b
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).Invoke(0xc0005543c0, {0x11afcf0?, 0xc000125880}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, ...)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/fn.go:252 +0xe23
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).invokeProcessFn(0xc0001a0900, {0x11afcf0, 0xc000125880}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, ...)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:359 +0x25e
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processSingleWindow(0xc0001a0900, 0xc000162360)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:178 +0x9b
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processMainInput(0x18?, 0x11afcf0?)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:156 +0x65
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc0001a0900, {0x0?, 0x0?}, 0xc0000e09a0, {0x0, 0x0, 0x0})
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:141 +0x125
   github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct.(*Impulse).Process(0xc000161800, {0x11afcf0, 0xc0001256c0})
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/runners/direct/impulse.go:52 +0xee
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic({0x11afcf0?, 0xc0001256c0?}, 0xc0003dfb10?)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/util.go:62 +0x6c
   github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc0000e0930, {0x11afcf0, 0xc0001256c0}, {0x0, 0x0}, {{0x0?, 0x0?}, {0x0?, 0x0?}})
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/plan.go:129 +0x394
   github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct.Execute({0x11afb68?, 0xc000046128?}, 0xc000600218)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/runners/direct/direct.go:71 +0x89f
   github.com/apache/beam/sdks/v2/go/pkg/beam.Run({0x11afb68, 0xc000046128}, {0x10308b4, 0x6}, 0x1?)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/runner.go:50 +0xc2
   github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx.Run({0x11afb68?, 0xc000046128?}, 0xe5e920?)
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/x/beamx/run.go:57 +0x4d
   main.main()
   	/home/florian/enlyze/beam/state.go:51 +0x2ec
   
   
   goroutine 1 [running]:
   github.com/apache/beam/sdks/v2/go/pkg/beam/log.Fatalf({0x11afb68, 0xc000046128}, {0x1042358?, 0x1092b00?}, {0xc0003dff00?, 0x0?, 0x0?})
   	/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/log/log.go:153 +0xa5
   main.main()
   	/home/florian/enlyze/beam/state.go:52 +0x33e
   exit status 2
   </pre></details>
   
   This leaves me wondering whether state is supported at all in the Go direct runner.
   
   The code to reproduce this is based on the [`primitives/state` integration test `ValueStateParDo`](https://github.com/apache/beam/blob/a96afe2c57c45a869a622086eaa4f81305f06e72/sdks/go/test/integration/primitives/state.go#L49-L93):
   ```go
   package main
   
   import (
   	"context"
   	"flag"
   	"fmt"
   
   	"github.com/apache/beam/sdks/v2/go/pkg/beam"
   	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
   	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
   	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
   )
   
   func logFn(ctx context.Context, l string) { // terminal DoFn (applied via ParDo0 in main): logs each element with Beam's log.Infoln
   	log.Infoln(ctx, l)
   }
   
   type valueStateFn struct { // stateful DoFn; appears as ParDo[main.valueStateFn] in the trace's plan
   	State1 state.Value[int] // user-state cell holding an int; main passes &valueStateFn{}, so it is left as the zero value — NOTE(review): confirm whether it needs explicit initialisation
   }
   
   func (f *valueStateFn) ProcessElement(s state.Provider, w string, c int) string { // in main, input is the (w, 1) pair emitted upstream; c is unused
   	i, ok, err := f.State1.Read(s) // ok == false is handled below as "no value stored yet"
   	if err != nil {
   		panic(err)
   	}
   	if !ok {
   		i = 1 // nothing stored yet: emit 1
   	}
   	err = f.State1.Write(s, i+1) // store i+1 so the next Read returns it (presumably scoped per key w — TODO confirm)
   	if err != nil {
   		panic(err)
   	}
   	return fmt.Sprintf("%s: %v", w, i) // e.g. "apple: 1" when nothing was stored yet
   }
   
   func main() { // builds and runs the reproducer: Create -> pair each word with 1 -> stateful ParDo -> log
   	flag.Parse()
   	beam.Init()
   
   	p, s := beam.NewPipelineWithRoot()
   
   	in := beam.Create(s, "apple", "pear", "peach", "apple", "apple", "pear")
   	keyed := beam.ParDo(s, func(w string, emit func(string, int)) { // appears as main.main.func1 in the trace
   		emit(w, 1)
   	}, in)
   	counts := beam.ParDo(s, &valueStateFn{}, keyed) // the trace's panicking Invoke frame is for this ParDo (downstream of main.main.func1)
   	beam.ParDo0(s, logFn, counts)
   
   	ctx := context.Background()
   	if err := beamx.Run(ctx, p); err != nil {
   		log.Fatalf(ctx, "Failed to execute job: %v", err)
   	}
   }
   ```
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [X] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] riteshghorse closed issue #25154: [Bug]: Go direct runner panics when using stateful DoFn

Posted by "riteshghorse (via GitHub)" <gi...@apache.org>.
riteshghorse closed issue #25154: [Bug]: Go direct runner panics when using stateful DoFn
URL: https://github.com/apache/beam/issues/25154


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lostluck commented on issue #25154: [Bug]: Go direct runner panics when using stateful DoFn

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on issue #25154:
URL: https://github.com/apache/beam/issues/25154#issuecomment-1411052239

   +1, this is working as intended (WAI). The [upcoming replacement runner](https://github.com/apache/beam/issues/24789) will be a better place to implement this. The Go Direct Runner doesn't implement the RPC service protocols, which means maintaining it largely duplicates effort.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] riteshghorse commented on issue #25154: [Bug]: Go direct runner panics when using stateful DoFn

Posted by "riteshghorse (via GitHub)" <gi...@apache.org>.
riteshghorse commented on issue #25154:
URL: https://github.com/apache/beam/issues/25154#issuecomment-1405137290

   You're right. User state is not supported in the Direct Runner. The integration tests on the direct runner are skipped [here](https://github.com/apache/beam/blob/76f3a5919fe6bc31f9117051aa6c4aeb4e39e017/sdks/go/test/integration/integration.go#L93).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org