You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "yelianevich (via GitHub)" <gi...@apache.org> on 2024/02/06 14:29:24 UTC

Re: [I] [Bug]: Side Input Singleton View throw error when impulse period is short: PCollection with more than one element accessed as a singleton view [beam]

yelianevich commented on issue #26465:
URL: https://github.com/apache/beam/issues/26465#issuecomment-1929855207

   @ballooncross Looking at your code I can suggest an alternative approach by using a custom global combiner that will eliminate subsequent `Latest.globally()` and `View.asSingleton()` and will do everything in one step.
   
   ```
   PCollectionView<Map<String, String>> map = pipeline.apply("Impulse", 
                                                GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
                   .apply(Window.<Map<String, String>>into(new GlobalWindows())
                           .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                           .discardingFiredPanes())
                   .apply(Combine.globally(new MaxImpulseFn<>(functionToRead, coderOfFunctionOutput))
                        .withoutDefaults()
                        .asSingletonView());
   ```
   `MaxImpulseFn` is a custom combiner (even though quite simple) that finds the max impulse and reads the external data in `extractOutput`
   ```
   private static class MaxImpulseFn<T> extends CombineWithContext.CombineFnWithContext<Long, Long, T> {
   
          @Override
           public T extractOutput(Long accumulator, Context context) {
               return functionToRead.apply(accumulator, context.getPipelineOptions());
           }
   
           @Override
           public Coder<T> getDefaultOutputCoder(CoderRegistry registry, Coder<Long> inputCoder) {
               return coderOfFunctionOutput;
           }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org