Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/03 15:58:36 UTC

[GitHub] [beam] camphillips22 opened a new issue, #23963: [Feature Request]: Enable emitting to windowed PCollection from FinishBundle()

camphillips22 opened a new issue, #23963:
URL: https://github.com/apache/beam/issues/23963

   ### What would you like to happen?
   
   I originally asked this question on [StackOverflow](https://stackoverflow.com/questions/74028678/using-finishbundle-in-apache-beam-go-sdk), and there does appear to be a gap between the Python/Java functionality and Go. According to this [documentation](https://beam.apache.org/documentation/transforms/python/elementwise/pardo/), if you emit data into a PCollection from `finish_bundle()`, you must use a `WindowedValue`.
   
   This concept doesn't exist in the Go SDK, and any data emitted from `FinishBundle()` is [emitted into the `SingleGlobalWindow`](https://github.com/apache/beam/blob/85b6b643bff1157fa68787ab9bd6b0afcd6c8bc6/sdks/go/pkg/beam/core/runtime/exec/pardo.go#L234). I see a [TODO comment](https://github.com/apache/beam/blob/85b6b643bff1157fa68787ab9bd6b0afcd6c8bc6/sdks/go/pkg/beam/core/runtime/exec/pardo.go#L125) in the `StartBundle()` method that refers to a resolved [Jira issue BEAM-3303](https://issues.apache.org/jira/browse/BEAM-3303), but I don't see any existing GitHub issues that address this gap.
   
   The below isn't a functioning example, but is essentially what I would like to be able to do in a streaming pipeline.
   ```golang
   type BatchRpcFn struct {
     client        RpcClient   // RPC client, created in Setup
     bufferRequest *RpcRequest // accumulates IDs until the next flush
   }
   
   func (f *BatchRpcFn) Setup(ctx context.Context) {
     // setup client
   }
   
   func (f *BatchRpcFn) ProcessElement(ctx context.Context, id string, emit func(string, bool)) error {
     f.bufferRequest.Ids = append(f.bufferRequest.Ids, id)
     if len(f.bufferRequest.Ids) > bufferLimit {
       return f.performRequestAndEmit(ctx, emit)
     }
     return nil
   }
   
   func (f *BatchRpcFn) FinishBundle(ctx context.Context, emit func(string, bool)) error {
     return f.performRequestAndEmit(ctx, emit)
   }
   ```
   
   ### Issue Priority
   
   Priority: 2
   
   ### Issue Component
   
   Component: sdk-go


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lostluck commented on issue #23963: [Feature Request][Go SDK]: Enable emitting to windowed PCollection from FinishBundle()

Posted by GitBox <gi...@apache.org>.
lostluck commented on issue #23963:
URL: https://github.com/apache/beam/issues/23963#issuecomment-1312299776

   This is an unfortunate oversight.
   
   The current SDK behavior is incorrect: it is a consequence of the SDK being heavily tested under Global Window batch pipelines, and of this behavior being added before robust windowing existed. Basically, what's going wrong is that we don't know a good event time for the subsequently emitted batch. In general, we can't* know it from a framework perspective, since it's specific to the user's data.
   
   The workaround would be to use emitters of the style `func(beam.EventTime, V)` or `func(beam.EventTime, K, V)` in both `ProcessElement` and `FinishBundle`, while also requesting the existing element's `beam.EventTime`. Combined with storing a reasonable event time for the batch, this works around the windowing issue. The windowing would also need to be applied downstream of this DoFn so that the emitted elements are windowed correctly.
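
A minimal sketch of the workaround described above, written so that it runs without the Beam SDK. Everything in it is an assumption for illustration: `EventTime` is a local stand-in for `beam.EventTime`, `batchFn`, `output`, and `runBundle` are hypothetical names, `runBundle` plays the role of the runner, and "latest event time seen in the batch" is only one possible choice of a reasonable event time.

```go
package main

import "fmt"

// EventTime is a local stand-in for beam.EventTime (an assumption made so
// this sketch compiles without the Beam SDK).
type EventTime int64

// batchFn is a hypothetical DoFn-like type that buffers IDs and remembers
// an event time for the pending batch.
type batchFn struct {
	limit   int
	ids     []string
	batchTS EventTime
}

// ProcessElement requests the element's event time and uses an emitter that
// takes an explicit event time.
func (f *batchFn) ProcessElement(et EventTime, id string, emit func(EventTime, string)) {
	// Assumption: use the latest event time seen in the batch.
	if len(f.ids) == 0 || et > f.batchTS {
		f.batchTS = et
	}
	f.ids = append(f.ids, id)
	if len(f.ids) >= f.limit {
		f.flush(emit)
	}
}

// FinishBundle flushes whatever is left, using the stored event time.
func (f *batchFn) FinishBundle(emit func(EventTime, string)) {
	f.flush(emit)
}

// flush emits every buffered ID with the batch's event time and resets the buffer.
func (f *batchFn) flush(emit func(EventTime, string)) {
	for _, id := range f.ids {
		emit(f.batchTS, id)
	}
	f.ids = f.ids[:0]
}

// output records one emitted element together with its event time.
type output struct {
	TS EventTime
	ID string
}

// runBundle simulates a runner driving one bundle through the DoFn.
func runBundle(f *batchFn, times []EventTime, ids []string) []output {
	var out []output
	emit := func(et EventTime, id string) { out = append(out, output{TS: et, ID: id}) }
	for i, id := range ids {
		f.ProcessElement(times[i], id, emit)
	}
	f.FinishBundle(emit)
	return out
}

func main() {
	f := &batchFn{limit: 2}
	for _, o := range runBundle(f, []EventTime{100, 250, 300}, []string{"a", "b", "c"}) {
		fmt.Println(o.TS, o.ID)
	}
}
```

In a real pipeline the emitters would take `beam.EventTime`, and the windowing would be applied downstream of the DoFn, so the elements emitted from `FinishBundle` are windowed by the event time they were emitted with.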
   
   ----
   
   So the SDK would need to prevent this bad behavior going forward (failing on it in strict modes, and providing a descriptive warning with instructions for fixing it). The SDK analysis portion would need to allow a slight mismatch in emitters, so that the `StartBundle`/`FinishBundle` emitters are required to take an explicit event time but do not have to match the `ProcessElement` emitter exactly. This forces setting explicit event times, allowing for the correct windowing behavior.
   
   The event time is the big sticking point: if the elements don't have an event time, any *subsequent* windowing will have issues as well. So to permit existing broken code to continue to function, the SDK would need to analyze whether there's a downstream issue.
   
   A robust GroupIntoBatches (GIB) transform would avoid this, since the transform would do the grouping and handle the event time management for you. This is already on my "sooner" rather than "later" list (I can't be more precise than that).
   
   ----
   
   *Technically, CombineFns have the framework make a specific choice for their equivalent behavior here (what is a lifted CombineFn but a DoFn with fancy batching, after all?), but we simply use the most default strategy for Beam.




[GitHub] [beam] lostluck commented on issue #23963: [Feature Request][Go SDK]: Enable emitting to windowed PCollection from FinishBundle()

Posted by GitBox <gi...@apache.org>.
lostluck commented on issue #23963:
URL: https://github.com/apache/beam/issues/23963#issuecomment-1312300297

   Intentionally labeling this with both bug and new feature for now, because the SDK can't do this and what it has is entirely incorrect, but it's not simply a 'fix'.

