Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/05/15 17:57:02 UTC

[jira] [Updated] (BEAM-10342) Processing Time Timer is not being invoked

     [ https://issues.apache.org/jira/browse/BEAM-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-10342:
-----------------------------------
    Resolution: Fixed
        Status: Resolved  (was: Resolved)

Hello! Due to a bug in our Jira configuration, this issue had status:Resolved but resolution:Unresolved.

I am bulk editing these issues to have resolution:Fixed.

If a different resolution is appropriate, please change it. To do this, click the "Resolve" button (you can do this even for closed issues) and set the Resolution field to the right value.

> Processing Time Timer is not being invoked
> ------------------------------------------
>
>                 Key: BEAM-10342
>                 URL: https://issues.apache.org/jira/browse/BEAM-10342
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: sailaxmankumar
>            Priority: P2
>
> Using state and timers to batch items for an external API
> We are trying to use the existing {{GroupIntoBatches}}, but we also need it to emit a batch every n milliseconds, even when fewer than {{batchSize}} elements have arrived. To do this, we added a second {{Timer}} in {{TimeDomain.PROCESSING_TIME}} to the transform:
> {code:java}
> @TimerId(STALE_TIMER_ID)
> private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
> {code}
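A note on what this timer spec asks for: a {{PROCESSING_TIME}} timer set with {{offset(...).setRelative()}} targets the worker's current wall-clock time plus the offset and does not depend on the watermark, whereas the {{EVENT_TIME}} end-of-window timer waits for the watermark. The plain-Java sketch below models only that difference under stated assumptions. It does not use the Beam API: {{Domain}}, {{TimerModel}} and {{eligible}} are hypothetical names, java.time stands in for Joda-Time, and treating a timer as eligible exactly at its target time is a simplification that runners may handle differently.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical stand-in for the two Beam TimeDomain values used in this issue.
enum Domain { EVENT_TIME, PROCESSING_TIME }

// Hypothetical model of a single timer; not Beam code.
class TimerModel {
    final Domain domain;
    final Instant fireAt;

    TimerModel(Domain domain, Instant fireAt) {
        this.domain = domain;
        this.fireAt = fireAt;
    }

    // Models staleTimer.offset(offset).setRelative(): target = wall clock now + offset.
    static TimerModel processingRelative(Instant wallClockNow, Duration offset) {
        return new TimerModel(Domain.PROCESSING_TIME, wallClockNow.plus(offset));
    }

    // Event-time timers follow the watermark; processing-time timers follow the wall clock.
    // Simplification: eligible once the relevant clock has reached fireAt.
    boolean eligible(Instant watermark, Instant wallClock) {
        Instant clock = (domain == Domain.EVENT_TIME) ? watermark : wallClock;
        return !clock.isBefore(fireAt);
    }
}

class TimerModelDemo {
    public static void main(String[] args) {
        Instant t0 = Instant.ofEpochMilli(1_000);
        TimerModel stale = TimerModel.processingRelative(t0, Duration.ofMillis(250));
        TimerModel endOfWindow = new TimerModel(Domain.EVENT_TIME, Instant.ofEpochMilli(60_000));
        Instant watermark = t0;                 // watermark has not advanced
        Instant wallClock = t0.plusMillis(300); // 300 ms of real time later
        System.out.println(stale.fireAt.toEpochMilli());                // 1250
        System.out.println(stale.eligible(watermark, wallClock));       // true
        System.out.println(endOfWindow.eligible(watermark, wallClock)); // false
    }
}
```

In this model a stalled watermark holds back only the end-of-window timer. The stale timer should become due 250 ms after it is set, whatever the watermark does.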
> When we test this timer on the Dataflow runner, it does not output any new batches.
> Updated {{GroupIntoBatches}} code:
> {code:java}
> package my.batching;
> import static com.google.common.base.Preconditions.checkArgument;
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.state.BagState;
> import org.apache.beam.sdk.state.CombiningState;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.transforms.Combine;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.PTransform;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
> import com.google.common.annotations.VisibleForTesting;
> import com.google.common.collect.Iterables;
> import lombok.RequiredArgsConstructor;
> import lombok.extern.slf4j.Slf4j;
> @Slf4j
> @RequiredArgsConstructor
> public class GroupIntoBatches<K, InputT>
>     extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> {
>     private final long batchSize;
>     private final long staleTime;
>     public static <K, InputT> GroupIntoBatches<K, InputT> of(long batchSize, long staleTime) {
>         return new GroupIntoBatches<>(batchSize, staleTime);
>     }
>     @Override
>     public PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K, InputT>> input) {
>         Duration allowedLateness = input.getWindowingStrategy().getAllowedLateness();
>         checkArgument(
>             input.getCoder() instanceof KvCoder,
>             "coder specified in the input PCollection is not a KvCoder");
>         KvCoder inputCoder = (KvCoder) input.getCoder();
>         Coder<K> keyCoder = (Coder<K>) inputCoder.getCoderArguments().get(0);
>         Coder<InputT> valueCoder = (Coder<InputT>) inputCoder.getCoderArguments().get(1);
>         return input.apply(
>             ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, staleTime, allowedLateness, keyCoder, valueCoder)));
>     }
>     @VisibleForTesting
>     static class GroupIntoBatchesDoFn<K, InputT>
>         extends DoFn<KV<K, InputT>, KV<K, Iterable<InputT>>> {
>         private static final String STALE_TIMER_ID = "staleTimer";
>         private static final String END_OF_WINDOW_ID = "endOFWindow";
>         private static final String BATCH_ID = "batch";
>         private static final String NUM_ELEMENTS_IN_BATCH_ID = "numElementsInBatch";
>         private static final String KEY_ID = "key";
>         private final long batchSize;
>         private final long staleTime;
>         private final Duration allowedLateness;
>         @TimerId(END_OF_WINDOW_ID)
>         private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>         @TimerId(STALE_TIMER_ID)
>         private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>         @StateId(BATCH_ID)
>         private final StateSpec<BagState<InputT>> batchSpec;
>         @StateId(NUM_ELEMENTS_IN_BATCH_ID)
>         private final StateSpec<CombiningState<Long, long[], Long>> numElementsInBatchSpec;
>         @StateId(KEY_ID)
>         private final StateSpec<ValueState<K>> keySpec;
>         private final long prefetchFrequency;
>         GroupIntoBatchesDoFn(
>             long batchSize,
>             long staleTime,
>             Duration allowedLateness,
>             Coder<K> inputKeyCoder,
>             Coder<InputT> inputValueCoder) {
>             this.batchSize = batchSize;
>             this.staleTime = staleTime;
>             this.allowedLateness = allowedLateness;
>             this.batchSpec = StateSpecs.bag(inputValueCoder);
>             this.numElementsInBatchSpec =
>                 StateSpecs.combining(
>                     new Combine.BinaryCombineLongFn() {
>                         @Override
>                         public long identity() {
>                             return 0L;
>                         }
>                         @Override
>                         public long apply(long left, long right) {
>                             return left + right;
>                         }
>                     });
>             this.keySpec = StateSpecs.value(inputKeyCoder);
>             // prefetch every 20% of batchSize elements. Do not prefetch if batchSize is too small
>             this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : (batchSize / 5);
>         }
>         @ProcessElement
>         public void processElement(
>             @TimerId(END_OF_WINDOW_ID) Timer timer,
>             @TimerId(STALE_TIMER_ID) Timer staleTimer,
>             @StateId(BATCH_ID) BagState<InputT> batch,
>             @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> numElementsInBatch,
>             @StateId(KEY_ID) ValueState<K> key,
>             @Element KV<K, InputT> element,
>             BoundedWindow window,
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
>             Instant windowExpires = window.maxTimestamp().plus(allowedLateness);
>             log.debug(
>                 "*** SET TIMER *** to point in time {} for window {}",
>                 windowExpires.toString(),
>                 window.toString());
>             timer.set(windowExpires);
>             key.write(element.getKey());
>             batch.add(element.getValue());
>             log.debug("*** BATCH *** Add element for window {} ", window.toString());
>             // blind add is supported with combiningState
>             numElementsInBatch.add(1L);
>             Long num = numElementsInBatch.read();
>             // set a stale time on Processing Timer to emit batch every n millis
>             if (num == 1) {
>                 staleTimer.offset(Duration.millis(staleTime)).setRelative();
>             }
>             if (num % prefetchFrequency == 0) {
>                 // prefetch data and modify batch state (readLater() modifies this)
>                 batch.readLater();
>             }
>             if (num >= batchSize) {
>                 log.debug("*** END OF BATCH *** for window {}", window.toString());
>                 flushBatch(receiver, key, batch, numElementsInBatch);
>             }
>         }
>         @OnTimer(STALE_TIMER_ID)
>         public void onStaleCallback(
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver,
>             @Timestamp Instant timestamp,
>             @StateId(KEY_ID) ValueState<K> key,
>             @StateId(BATCH_ID) BagState<InputT> batch,
>             @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> numElementsInBatch,
>             BoundedWindow window) {
>             log.debug(
>                 "*** Stale Timer *** for timer timestamp {} in window {}",
>                 timestamp,
>                 window.toString());
>             flushBatch(receiver, key, batch, numElementsInBatch);
>         }
>         @OnTimer(END_OF_WINDOW_ID)
>         public void onTimerCallback(
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver,
>             @Timestamp Instant timestamp,
>             @StateId(KEY_ID) ValueState<K> key,
>             @StateId(BATCH_ID) BagState<InputT> batch,
>             @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> numElementsInBatch,
>             BoundedWindow window) {
>             log.debug(
>                 "*** END OF WINDOW *** for timer timestamp {} in window {}",
>                 timestamp,
>                 window.toString());
>             flushBatch(receiver, key, batch, numElementsInBatch);
>         }
>         private void flushBatch(
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver,
>             ValueState<K> key,
>             BagState<InputT> batch,
>             CombiningState<Long, long[], Long> numElementsInBatch) {
>             Iterable<InputT> values = batch.read();
>             // when the timer fires, batch state might be empty
>             if (!Iterables.isEmpty(values)) {
>                 receiver.output(KV.of(key.read(), values));
>             }
>             batch.clear();
>             log.debug("*** BATCH *** clear");
>             numElementsInBatch.clear();
>         }
>     }
> }
> {code}
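Two details of the DoFn above are easy to misread. First, {{numElementsInBatch}} is a combining state whose {{BinaryCombineLongFn}} has identity 0 and sums its inputs, so {{add(1L)}} is a blind write and {{read()}} returns the running count. Second, {{prefetchFrequency}} is {{batchSize / 5}} (integer division), or {{Long.MAX_VALUE}} (effectively never) when that quotient is 1 or less. The plain-Java sketch below reproduces both calculations so you can check, for a given {{batchSize}}, on which elements {{readLater()}} and the size-based flush happen. {{BatchCounter}} is a hypothetical name, and nothing here calls Beam.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongBinaryOperator;

// Hypothetical plain-Java model of numElementsInBatch + prefetchFrequency; not Beam code.
class BatchCounter {
    private static final long IDENTITY = 0L;                         // BinaryCombineLongFn.identity()
    private static final LongBinaryOperator APPLY = (l, r) -> l + r; // BinaryCombineLongFn.apply()

    private final long batchSize;
    private final long prefetchFrequency;
    private long accumulator = IDENTITY;

    BatchCounter(long batchSize) {
        this.batchSize = batchSize;
        // Same expression as the DoFn: every 20% of batchSize, never if that is <= 1.
        this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : (batchSize / 5);
    }

    long prefetchFrequency() {
        return prefetchFrequency;
    }

    // Models add(1L) followed by read(); returns what this element would trigger.
    List<String> onElement() {
        accumulator = APPLY.applyAsLong(accumulator, 1L);
        List<String> events = new ArrayList<>();
        if (accumulator % prefetchFrequency == 0) {
            events.add("readLater@" + accumulator);
        }
        if (accumulator >= batchSize) {
            events.add("flush@" + accumulator);
            accumulator = IDENTITY; // models numElementsInBatch.clear() in flushBatch
        }
        return events;
    }
}

class BatchCounterDemo {
    public static void main(String[] args) {
        System.out.println(new BatchCounter(200).prefetchFrequency());                   // 40
        System.out.println(new BatchCounter(5).prefetchFrequency() == Long.MAX_VALUE);   // true
        BatchCounter counter = new BatchCounter(10);
        List<String> all = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            all.addAll(counter.onElement());
        }
        // [readLater@2, readLater@4, readLater@6, readLater@8, readLater@10, flush@10]
        System.out.println(all);
    }
}
```

With the {{batchSize}} of 200 used in the comparison, the model prefetches every 40 elements. With a {{batchSize}} of 5 or less, it never prefetches.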
> We compared the batches output by two stages:
> {code:java}
> .apply("Batch transactions New", GroupIntoBatches.of(200, 250));
> .apply("Batch transactions Old", org.apache.beam.sdk.transforms.GroupIntoBatches.ofSize(200));
> {code}
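For reference, this is the output the comparison should show if the processing-time timer fires as intended. The plain-Java sketch below replays a slow stream through simplified models of both stages. The stream is hypothetical: one element every 100 ms, so 200 elements never accumulate within 250 ms. The new transform uses {{staleMillis = 250}} and the old one uses 0 (no stale timer). {{BatchingModel}} and {{simulate}} are hypothetical. The model assumes a timer fires as soon as wall-clock time reaches its deadline, and it leaves out the end-of-window flush on the assumption that the window does not close during the test.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the "New" (size or stale timer) and "Old" (size only) stages;
// not Beam code. All times are processing-time milliseconds.
class BatchingModel {
    static final long NO_TIMER = -1L;

    // Returns "emitTime:batchSize" for every non-empty batch emitted up to endMillis.
    static List<String> simulate(long[] arrivals, long batchSize, long staleMillis, long endMillis) {
        List<String> emitted = new ArrayList<>();
        long count = 0;
        long deadline = NO_TIMER;
        for (long t : arrivals) {
            // A stale timer whose deadline has passed fires before this element is processed.
            if (deadline != NO_TIMER && deadline <= t) {
                if (count > 0) {
                    emitted.add(deadline + ":" + count); // flushBatch skips empty batches
                }
                count = 0;
                deadline = NO_TIMER;
            }
            count++;
            // Only the first element of a batch (re)arms the timer; staleMillis <= 0 means "Old".
            if (count == 1 && staleMillis > 0) {
                deadline = t + staleMillis;
            }
            if (count >= batchSize) {
                emitted.add(t + ":" + count);
                count = 0; // the armed timer is left in place, as in the DoFn
            }
        }
        if (deadline != NO_TIMER && deadline <= endMillis && count > 0) {
            emitted.add(deadline + ":" + count);
        }
        return emitted;
    }
}

class BatchingModelDemo {
    public static void main(String[] args) {
        long[] arrivals = new long[10];
        for (int i = 0; i < arrivals.length; i++) {
            arrivals[i] = i * 100L; // one element every 100 ms
        }
        System.out.println(BatchingModel.simulate(arrivals, 200, 250, 1000)); // [250:3, 550:3, 850:3]
        System.out.println(BatchingModel.simulate(arrivals, 200, 0, 1000));   // []
    }
}
```

Under these assumptions the new stage emits a three-element batch every 300 ms. The spacing is 300 ms rather than 250 ms because each timer is armed only when the next batch's first element arrives. The old stage emits nothing until the window closes, which matches the behavior reported below for the Flink runner.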
>
> On the Flink runner, the PROCESSING_TIME timer fires and a batch is created every 250 ms, but on the Dataflow runner the timer is never triggered.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)