Posted to github@beam.apache.org by "cbix (via GitHub)" <gi...@apache.org> on 2023/01/25 00:47:36 UTC
[GitHub] [beam] cbix opened a new issue, #25154: [Bug]: runtime panic when using stateful DoFn
cbix opened a new issue, #25154:
URL: https://github.com/apache/beam/issues/25154
### What happened?
With the Apache Beam Go SDK 2.44.0, having a stateful DoFn in your pipeline causes a panic at runtime with the Go direct runner:
```
panic: Failed to execute job: panic: runtime error: invalid memory address or nil pointer dereference
```
<details><summary><b>Full trace</b></summary><pre>
panic: Failed to execute job: panic: runtime error: invalid memory address or nil pointer dereference
Full error:
while executing Process for Plan[plan]:
5: Impulse[0]
1: ParDo[main.logFn] Out:[] Sig: func(context.Context, string)
2: ParDo[main.valueStateFn] Out:[1] Sig: func(state.Provider, string, int) string
3: ParDo[main.main.func1] Out:[2] Sig: func(string, func(string, int))
4: ParDo[beam.createFn] Out:[3] Sig: func([]uint8, func(typex.T)) error
caused by:
panic: runtime error: invalid memory address or nil pointer dereference goroutine 1 [running]:
runtime/debug.Stack()
/usr/lib/go/src/runtime/debug/stack.go:24 +0x65
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic.func1()
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/util.go:58 +0xa5
panic({0xece0c0, 0x18b11f0})
/usr/lib/go/src/runtime/panic.go:884 +0x212
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).Invoke(0xc000554140, {0x11afcf0?, 0xc000125900}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, ...)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/fn.go:196 +0x519
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).invokeProcessFn(0xc0001a0700, {0x11afcf0, 0xc000125900}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, ...)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:359 +0x25e
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processSingleWindow(0xc0001a0700, 0xc000162480)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:178 +0x9b
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processMainInput(0x10?, 0x11afcf0?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:156 +0x65
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc0001a0700, {0x5?, 0xc00056bf90?}, 0xc0000f2dd8, {0x0, 0x0, 0x0})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:141 +0x125
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec/optimized.(*emitNative).invokeStringInt(0xc0000f2d80, {0xc0004908e0?, 0x4?}, 0xc0003de488?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/optimized/emitters.go:2681 +0x1bc
main.main.func1({0xc0004908e0?, 0x0?}, 0x0?)
/home/florian/enlyze/beam/state.go:45 +0x26
reflect.Value.call({0xe6e3a0?, 0x1092b08?, 0x450cf2?}, {0x102eaf6, 0x4}, {0xc0001618f0, 0x2, 0x50ec91?})
/usr/lib/go/src/reflect/value.go:584 +0x8c5
reflect.Value.Call({0xe6e3a0?, 0x1092b08?, 0x10000004efe25?}, {0xc0001618f0?, 0x0?, 0x7fb4423aa5b8?})
/usr/lib/go/src/reflect/value.go:368 +0xbc
github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx.(*reflectFunc).Call(0xc0006048b8, {0xc00061cc80?, 0x98?, 0x18d1aa0?})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/util/reflectx/call.go:87 +0x59
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).initCall.func46({0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, 0x1}, 0x0?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/fn_arity.go:307 +0x94
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).Invoke(0xc000554280, {0x11afcf0?, 0xc0001258c0}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, ...)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/fn.go:252 +0xe23
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).invokeProcessFn(0xc0001a0800, {0x11afcf0, 0xc0001258c0}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, ...)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:359 +0x25e
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processSingleWindow(0xc0001a0800, 0xc0001623f0)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:178 +0x9b
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processMainInput(0x18d1aa0?, 0x11afcf0?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:156 +0x65
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc0001a0800, {0xc0001618c0?, 0xe4a860?}, 0xc0000f2e98, {0x0, 0x0, 0x0})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:141 +0x125
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec/optimized.(*emitNative).invokeTypex_T(0xc0000f2e40, {0xe4a860?, 0xc000040c20?})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/optimized/emitters.go:12121 +0x18a
github.com/apache/beam/sdks/v2/go/pkg/beam.(*createFn).ProcessElement(0xc0001608d0, {0x0?, 0x8?, 0x40f35f?}, 0xc000040bf0)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/create.go:121 +0x105
github.com/apache/beam/sdks/v2/go/pkg/beam.wrapMakerCreateFn.func1({0x19016d0?, 0x1000000000000?, 0x7fb4423b4b28?}, 0x0?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/beam.shims.go:70 +0x31
github.com/apache/beam/sdks/v2/go/pkg/beam.(*callerSliceOfByteEmitTypex۰TГError).Call2x1(0x203000?, {0xe34060?, 0xc000604c30?}, {0xe47960?, 0xc000040bf0?})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/beam.shims.go:149 +0x5e
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).initCall.func12({0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, 0x1}, 0x0?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/fn_arity.go:103 +0x9b
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).Invoke(0xc0005543c0, {0x11afcf0?, 0xc000125880}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, ...)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/fn.go:252 +0xe23
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).invokeProcessFn(0xc0001a0900, {0x11afcf0, 0xc000125880}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, ...)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:359 +0x25e
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processSingleWindow(0xc0001a0900, 0xc000162360)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:178 +0x9b
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processMainInput(0x18?, 0x11afcf0?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:156 +0x65
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc0001a0900, {0x0?, 0x0?}, 0xc0000e09a0, {0x0, 0x0, 0x0})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/pardo.go:141 +0x125
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct.(*Impulse).Process(0xc000161800, {0x11afcf0, 0xc0001256c0})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/runners/direct/impulse.go:52 +0xee
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic({0x11afcf0?, 0xc0001256c0?}, 0xc0003dfb10?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/util.go:62 +0x6c
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc0000e0930, {0x11afcf0, 0xc0001256c0}, {0x0, 0x0}, {{0x0?, 0x0?}, {0x0?, 0x0?}})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/core/runtime/exec/plan.go:129 +0x394
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct.Execute({0x11afb68?, 0xc000046128?}, 0xc000600218)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/runners/direct/direct.go:71 +0x89f
github.com/apache/beam/sdks/v2/go/pkg/beam.Run({0x11afb68, 0xc000046128}, {0x10308b4, 0x6}, 0x1?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/runner.go:50 +0xc2
github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx.Run({0x11afb68?, 0xc000046128?}, 0xe5e920?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/x/beamx/run.go:57 +0x4d
main.main()
/home/florian/enlyze/beam/state.go:51 +0x2ec
goroutine 1 [running]:
github.com/apache/beam/sdks/v2/go/pkg/beam/log.Fatalf({0x11afb68, 0xc000046128}, {0x1042358?, 0x1092b00?}, {0xc0003dff00?, 0x0?, 0x0?})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.44.0/go/pkg/beam/log/log.go:153 +0xa5
main.main()
/home/florian/enlyze/beam/state.go:52 +0x33e
exit status 2
</pre></details>
This leaves me wondering whether state is supported at all in the Go direct runner.
The code to reproduce this is based on the [`primitives/state` integration test `ValueStateParDo`](https://github.com/apache/beam/blob/a96afe2c57c45a869a622086eaa4f81305f06e72/sdks/go/test/integration/primitives/state.go#L49-L93):
```go
package main

import (
	"context"
	"flag"
	"fmt"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func logFn(ctx context.Context, l string) {
	log.Infoln(ctx, l)
}

type valueStateFn struct {
	State1 state.Value[int]
}

func (f *valueStateFn) ProcessElement(s state.Provider, w string, c int) string {
	i, ok, err := f.State1.Read(s)
	if err != nil {
		panic(err)
	}
	if !ok {
		i = 1
	}
	err = f.State1.Write(s, i+1)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%s: %v", w, i)
}

func main() {
	flag.Parse()
	beam.Init()

	p, s := beam.NewPipelineWithRoot()
	in := beam.Create(s, "apple", "pear", "peach", "apple", "apple", "pear")
	keyed := beam.ParDo(s, func(w string, emit func(string, int)) {
		emit(w, 1)
	}, in)
	counts := beam.ParDo(s, &valueStateFn{}, keyed)
	beam.ParDo0(s, logFn, counts)

	ctx := context.Background()
	if err := beamx.Run(ctx, p); err != nil {
		log.Fatalf(ctx, "Failed to execute job: %v", err)
	}
}
```
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [X] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] riteshghorse closed issue #25154: [Bug]: Go direct runner panics when using stateful DoFn
Posted by "riteshghorse (via GitHub)" <gi...@apache.org>.
riteshghorse closed issue #25154: [Bug]: Go direct runner panics when using stateful DoFn
URL: https://github.com/apache/beam/issues/25154
[GitHub] [beam] lostluck commented on issue #25154: [Bug]: Go direct runner panics when using stateful DoFn
Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on issue #25154:
URL: https://github.com/apache/beam/issues/25154#issuecomment-1411052239
+1, this is working as intended (WAI). The [upcoming replacement runner](#24789) will be a better place to implement this. The Go Direct Runner doesn't implement RPC service protocols, so maintaining it largely duplicates effort.
[GitHub] [beam] riteshghorse commented on issue #25154: [Bug]: Go direct runner panics when using stateful DoFn
Posted by "riteshghorse (via GitHub)" <gi...@apache.org>.
riteshghorse commented on issue #25154:
URL: https://github.com/apache/beam/issues/25154#issuecomment-1405137290
You're right. User state is not supported in the Direct Runner. The integration tests are skipped on the direct runner [here](https://github.com/apache/beam/blob/76f3a5919fe6bc31f9117051aa6c4aeb4e39e017/sdks/go/test/integration/integration.go#L93).
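
As a rough illustration of how a per-runner skip list of this kind can work, here is a minimal standalone sketch. It is not the code from `integration.go`: the `skipPatterns` table, the `shouldSkip` helper, and the test names in it are all hypothetical.

```go
package main

import (
	"fmt"
	"regexp"
)

// skipPatterns is a hypothetical per-runner skip list. The patterns are
// illustrative only and are not copied from the Beam source.
var skipPatterns = map[string][]string{
	"direct": {"TestValueState.*", "TestBagState.*"},
}

// shouldSkip reports whether the test name fully matches any pattern
// registered for the given runner.
func shouldSkip(runner, test string) bool {
	for _, p := range skipPatterns[runner] {
		if regexp.MustCompile("^" + p + "$").MatchString(test) {
			return true
		}
	}
	return false
}

func main() {
	for _, r := range []string{"direct", "flink"} {
		fmt.Printf("%s skips TestValueState: %v\n", r, shouldSkip(r, "TestValueState"))
	}
}
```

In a real test suite the result would typically feed a `t.Skip` call at the top of each test, so an unsupported feature shows up as a skipped test instead of a runtime panic.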