You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Tyson Hamilton (Jira)" <ji...@apache.org> on 2020/09/11 20:21:00 UTC

[jira] [Updated] (BEAM-10342) Processing Time Timer is not being invoked

     [ https://issues.apache.org/jira/browse/BEAM-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tyson Hamilton updated BEAM-10342:
----------------------------------
    Status: Resolved  (was: Open)

> Processing Time Timer is not being invoked
> ------------------------------------------
>
>                 Key: BEAM-10342
>                 URL: https://issues.apache.org/jira/browse/BEAM-10342
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: sailaxmankumar
>            Priority: P2
>
> When using a processing-time timer to batch items for an external API
> We are trying to use the existing {{GroupIntoBatches}}, but we also need it to emit a batch every n milliseconds. So we are extending this transform with an additional {{Timer}} in {{TimeDomain.PROCESSING_TIME}}.
> {code:java}
> @TimerId(STALE_TIMER_ID)
>  private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
> {code}
> When we test this timer on the Dataflow runner, it does not output any new batches.
> Updated GroupIntoBatches code:
> {code:java}
> package my.package;
> import static com.google.common.base.Preconditions.checkArgument;
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.state.BagState;
> import org.apache.beam.sdk.state.CombiningState;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.transforms.Combine;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.PTransform;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
> import com.google.common.annotations.VisibleForTesting;
> import com.google.common.collect.Iterables;
> import lombok.RequiredArgsConstructor;
> import lombok.extern.slf4j.Slf4j;
> /**
>  * Variant of Beam's {@code GroupIntoBatches} that groups {@code KV<K, InputT>} elements into
>  * batches and emits a batch when any of the following happens:
>  *
>  * <ul>
>  *   <li>the number of buffered elements reaches {@code batchSize};
>  *   <li>a processing-time ("stale") timer, armed {@code staleTime} milliseconds after the first
>  *       element of a batch was buffered, fires;
>  *   <li>an event-time timer set to the window's max timestamp plus the allowed lateness fires.
>  * </ul>
>  *
>  * <p>The (batchSize, staleTime) constructor called by {@link #of(long, long)} is generated by
>  * Lombok's {@code @RequiredArgsConstructor}.
>  */
> @Slf4j
> @RequiredArgsConstructor
> public class GroupIntoBatches<K, InputT>
>     extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> {
>     /** Number of buffered elements at which a batch is flushed. */
>     private final long batchSize;
>     /** Processing-time delay, in milliseconds, after which a partially filled batch is flushed. */
>     private final long staleTime;
>     /**
>      * Creates the transform.
>      *
>      * <p>NOTE(review): neither argument is validated; a {@code batchSize} of 0 or less makes
>      * every element flush its own batch.
>      *
>      * @param batchSize number of elements at which a batch is emitted
>      * @param staleTime milliseconds of processing time after a batch's first element at which
>      *     the batch is emitted even if it is not full
>      * @return a new transform with the given settings
>      */
>     public static <K, InputT> GroupIntoBatches<K, InputT> of(long batchSize, long staleTime) {
>         return new GroupIntoBatches<>(batchSize, staleTime);
>     }
>     /**
>      * Applies {@link GroupIntoBatchesDoFn} to {@code input}.
>      *
>      * <p>The input's coder must be a {@link KvCoder}; its key and value coders are used to build
>      * the DoFn's state specs. The allowed lateness comes from the input's windowing strategy.
>      *
>      * @throws IllegalArgumentException if the input coder is not a {@code KvCoder}
>      */
>     @Override
>     public PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K, InputT>> input) {
>         Duration allowedLateness = input.getWindowingStrategy().getAllowedLateness();
>         checkArgument(
>             input.getCoder() instanceof KvCoder,
>             "coder specified in the input PCollection is not a KvCoder");
>         // Raw KvCoder: the two casts below are unchecked.
>         KvCoder inputCoder = (KvCoder) input.getCoder();
>         Coder<K> keyCoder = (Coder<K>) inputCoder.getCoderArguments().get(0);
>         Coder<InputT> valueCoder = (Coder<InputT>) inputCoder.getCoderArguments().get(1);
>         return input.apply(
>             ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, staleTime, allowedLateness, keyCoder, valueCoder)));
>     }
>     /**
>      * Stateful DoFn that buffers values and flushes them as {@code KV<key, batch>} outputs.
>      *
>      * <p>State: the buffered values ({@code BATCH_ID}), a running count of them
>      * ({@code NUM_ELEMENTS_IN_BATCH_ID}), and the key of the last element seen ({@code KEY_ID}).
>      * Timers: an event-time end-of-window timer and a processing-time stale timer.
>      */
>     @VisibleForTesting
>     static class GroupIntoBatchesDoFn<K, InputT>
>         extends DoFn<KV<K, InputT>, KV<K, Iterable<InputT>>> {
>         private static final String STALE_TIMER_ID = "staleTimer";
>         private static final String END_OF_WINDOW_ID = "endOFWindow";
>         private static final String BATCH_ID = "batch";
>         private static final String NUM_ELEMENTS_IN_BATCH_ID = "numElementsInBatch";
>         private static final String KEY_ID = "key";
>         private final long batchSize;
>         private final long staleTime;
>         private final Duration allowedLateness;
>         /** Event-time timer set to the window's max timestamp plus the allowed lateness. */
>         @TimerId(END_OF_WINDOW_ID)
>         private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>         /** Processing-time timer that flushes a partial batch {@code staleTime} ms after it starts. */
>         @TimerId(STALE_TIMER_ID)
>         private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>         /** Buffered values of the current batch. */
>         @StateId(BATCH_ID)
>         private final StateSpec<BagState<InputT>> batchSpec;
>         /** Number of values currently buffered in {@code batchSpec}. */
>         @StateId(NUM_ELEMENTS_IN_BATCH_ID)
>         private final StateSpec<CombiningState<Long, long[], Long>> numElementsInBatchSpec;
>         /** Key of the last element processed; used as the key of emitted batches. */
>         @StateId(KEY_ID)
>         private final StateSpec<ValueState<K>> keySpec;
>         /** Element-count interval at which {@code readLater()} is requested on the batch bag. */
>         private final long prefetchFrequency;
>         /**
>          * Builds the state specs from the input coders.
>          *
>          * @param batchSize number of elements at which a batch is flushed
>          * @param staleTime processing-time delay in milliseconds before a partial batch is flushed
>          * @param allowedLateness added to the window's max timestamp for the end-of-window timer
>          * @param inputKeyCoder coder for the key state
>          * @param inputValueCoder coder for the buffered values
>          */
>         GroupIntoBatchesDoFn(
>             long batchSize,
>             long staleTime,
>             Duration allowedLateness,
>             Coder<K> inputKeyCoder,
>             Coder<InputT> inputValueCoder) {
>             this.batchSize = batchSize;
>             this.staleTime = staleTime;
>             this.allowedLateness = allowedLateness;
>             this.batchSpec = StateSpecs.bag(inputValueCoder);
>             // Sums longs with identity 0, so the state holds the element count of the batch.
>             this.numElementsInBatchSpec =
>                 StateSpecs.combining(
>                     new Combine.BinaryCombineLongFn() {
>                         @Override
>                         public long identity() {
>                             return 0L;
>                         }
>                         @Override
>                         public long apply(long left, long right) {
>                             return left + right;
>                         }
>                     });
>             this.keySpec = StateSpecs.value(inputKeyCoder);
>             // Prefetch every batchSize / 5 elements (20% of a batch). When batchSize / 5 <= 1
>             // (batchSize < 10), Long.MAX_VALUE is used, which effectively disables prefetching.
>             this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : (batchSize / 5);
>         }
>         /**
>          * Buffers one element and flushes the batch once it holds {@code batchSize} elements.
>          *
>          * <p>On every element the end-of-window timer is (re)set to the window's max timestamp
>          * plus the allowed lateness. The stale processing-time timer is armed only when the count
>          * reaches 1, i.e. for the first element of a batch.
>          */
>         @ProcessElement
>         public void processElement(
>             @TimerId(END_OF_WINDOW_ID) Timer timer,
>             @TimerId(STALE_TIMER_ID) Timer staleTimer,
>             @StateId(BATCH_ID) BagState<InputT> batch,
>             @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> numElementsInBatch,
>             @StateId(KEY_ID) ValueState<K> key,
>             @Element KV<K, InputT> element,
>             BoundedWindow window,
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
>             Instant windowExpires = window.maxTimestamp().plus(allowedLateness);
>             log.debug(
>                 "*** SET TIMER *** to point in time {} for window {}",
>                 windowExpires.toString(),
>                 window.toString());
>             timer.set(windowExpires);
>             key.write(element.getKey());
>             batch.add(element.getValue());
>             log.debug("*** BATCH *** Add element for window {} ", window.toString());
>             // blind add is supported with combiningState
>             numElementsInBatch.add(1L);
>             Long num = numElementsInBatch.read();
>             // Arm the processing-time stale timer when a new batch starts, so a partial batch is
>             // flushed staleTime ms after its first element (not on a fixed every-n-ms schedule).
>             if (num == 1) {
>                 staleTimer.offset(Duration.millis(staleTime)).setRelative();
>             }
>             if (num % prefetchFrequency == 0) {
>                 // Periodically ask the runner to prefetch the bag ahead of the read in flushBatch.
>                 batch.readLater();
>             }
>             if (num >= batchSize) {
>                 log.debug("*** END OF BATCH *** for window {}", window.toString());
>                 flushBatch(receiver, key, batch, numElementsInBatch);
>             }
>         }
>         /**
>          * Stale-timer callback: flushes whatever is buffered.
>          *
>          * <p>The timer is not cleared by a size-triggered flush, so it may fire after the batch
>          * was already emptied; {@link #flushBatch} then emits nothing.
>          */
>         @OnTimer(STALE_TIMER_ID)
>         public void onStaleCallback(
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver,
>             @Timestamp Instant timestamp,
>             @StateId(KEY_ID) ValueState<K> key,
>             @StateId(BATCH_ID) BagState<InputT> batch,
>             @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> numElementsInBatch,
>             BoundedWindow window) {
>             log.debug(
>                 "*** Stale Timer *** for timer timestamp {} in windows {}",
>                 timestamp,
>                 window.toString());
>             flushBatch(receiver, key, batch, numElementsInBatch);
>         }
>         /** End-of-window timer callback: flushes the remaining buffered elements. */
>         @OnTimer(END_OF_WINDOW_ID)
>         public void onTimerCallback(
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver,
>             @Timestamp Instant timestamp,
>             @StateId(KEY_ID) ValueState<K> key,
>             @StateId(BATCH_ID) BagState<InputT> batch,
>             @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> numElementsInBatch,
>             BoundedWindow window) {
>             log.debug(
>                 "*** END OF WINDOW *** for timer timestamp {} in windows {}",
>                 timestamp,
>                 window.toString());
>             flushBatch(receiver, key, batch, numElementsInBatch);
>         }
>         /**
>          * Emits the buffered values as one {@code KV<key, values>} if any are buffered, then
>          * clears the bag and the element count. The key state is left untouched.
>          */
>         private void flushBatch(
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver,
>             ValueState<K> key,
>             BagState<InputT> batch,
>             CombiningState<Long, long[], Long> numElementsInBatch) {
>             Iterable<InputT> values = batch.read();
>             // when the timer fires, batch state might be empty
>             if (!Iterables.isEmpty(values)) {
>                 receiver.output(KV.of(key.read(), values));
>             }
>             batch.clear();
>             log.debug("*** BATCH *** clear");
>             numElementsInBatch.clear();
>         }
>     }
> }
> {code}
> We compared the batches output by the following two stages:
> {code:java}
> .apply("Batch transactions New", GroupIntoBatches.of(200,250);
> .apply("Batch transactions Old", org.apache.beam.sdk.transforms.GroupIntoBatches.ofSize(200);{code}
>  
> On the Flink runner we see the PROCESSING_TIME timer firing and creating a batch every 250 ms, but on the Dataflow runner it is never triggered.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)