Posted to issues@beam.apache.org by "sailaxmankumar (Jira)" <ji...@apache.org> on 2020/08/19 11:50:00 UTC

[jira] [Comment Edited] (BEAM-10342) Processing Time Timer is not being invoked

    [ https://issues.apache.org/jira/browse/BEAM-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180486#comment-17180486 ] 

sailaxmankumar edited comment on BEAM-10342 at 8/19/20, 11:49 AM:
------------------------------------------------------------------

You must be right. I tried simulating with more data in the stream and the timer does seem to fire, but I am unable to figure out whether I can rely on it to create batches within a window; I should probably add a trigger to the window instead. However, there is one question that got me confused. If I add a log in the timer callback like
{code:java}
@OnTimer(STALE_TIMER_ID)
public void onStaleCallback(
    OutputReceiver<KV<String,String>> receiver,
    @Timestamp Instant timestamp,
    BoundedWindow window) {
    LOG.info(
        "*** Stale Timer *** for timer timestamp {} in windows {}",
        timestamp,
        window.toString());
}{code}
I couldn't figure out what the {{@Timestamp Instant timestamp}} represents, as it seems to be earlier than the window time, whereas the timer is set with

{{staleTimer.offset(Duration.millis(100)).setRelative();}} Is this because the timer is still triggered at the end of the bundle, irrespective of the processing time?
{code:java}
2020-08-19 21:24:25.949 AEST*** Stale Timer *** for timer timestamp 2020-08-19T11:24:06.000Z in windows [2020-08-19T11:24:25.000Z..2020-08-19T11:24:26.000Z)
2020-08-19 21:24:26.377 AEST*** Stale Timer *** for timer timestamp 2020-08-19T11:24:06.000Z in windows [2020-08-19T11:24:25.000Z..2020-08-19T11:24:26.000Z)
2020-08-19 21:24:26.378 AEST*** Stale Timer *** for timer timestamp 2020-08-19T11:24:06.000Z in windows [2020-08-19T11:24:25.000Z..2020-08-19T11:24:26.000Z)
2020-08-19 21:24:26.378 AEST*** Stale Timer *** for timer timestamp 2020-08-19T11:24:06.000Z in windows [2020-08-19T11:24:25.000Z..2020-08-19T11:24:26.000Z)
2020-08-19 21:24:26.383 AEST*** Stale Timer *** for timer timestamp 2020-08-19T11:24:06.000Z in windows [2020-08-19T11:24:25.000Z..2020-08-19T11:24:26.000Z)
2020-08-19 21:24:26.933 AEST*** Stale Timer *** for timer timestamp 2020-08-19T11:24:06.000Z in windows [2020-08-19T11:24:26.000Z..2020-08-19T11:24:27.000Z)
2020-08-19 21:24:26.936 AEST*** Stale Timer *** for timer timestamp 2020-08-19T11:24:06.000Z in windows [2020-08-19T11:24:26.000Z..2020-08-19T11:24:27.000Z)
2020-08-19 21:24:26.941 AEST*** Stale Timer *** for timer timestamp 2020-08-19T11:24:06.000Z in windows [2020-08-19T11:24:26.000Z..2020-08-19T11:24:27.000Z)
{code}
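
One way to see where that timestamp comes from is to log the wall clock next to it, both when the timer is set and when it fires. A minimal sketch of such a diagnostic, assuming the same STALE_TIMER_ID processing-time timer and LOG logger as in the snippet above (the messages are illustrative only):
{code:java}
// Assumes the same STALE_TIMER_ID processing-time TimerSpec declared elsewhere in the DoFn.
@ProcessElement
public void processElement(
    @TimerId(STALE_TIMER_ID) Timer staleTimer,
    @Timestamp Instant elementTimestamp,
    BoundedWindow window) {
    // Log the wall clock next to the element's event-time timestamp when setting the timer.
    LOG.info(
        "Setting stale timer: wall clock {}, element timestamp {}, window {}",
        Instant.now(), elementTimestamp, window);
    staleTimer.offset(Duration.millis(100)).setRelative();
}

@OnTimer(STALE_TIMER_ID)
public void onStaleCallback(@Timestamp Instant timestamp, BoundedWindow window) {
    // @Timestamp here is the timer's event-time output timestamp, not the wall-clock
    // firing instant; comparing it with Instant.now() makes the gap in the logs visible.
    LOG.info(
        "Stale timer fired: wall clock {}, @Timestamp {}, window {}",
        Instant.now(), timestamp, window);
}
{code}
Comparing Instant.now() with the @Timestamp argument should make it clear that the latter is not the wall-clock firing time.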



> Processing Time Timer is not being invoked
> ------------------------------------------
>
>                 Key: BEAM-10342
>                 URL: https://issues.apache.org/jira/browse/BEAM-10342
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: sailaxmankumar
>            Priority: P2
>
> When using state and timers to batch items for an external API:
> We are trying to use the existing {{GroupIntoBatches}}, but we need some additional functionality to create a batch every n milliseconds. So we are extending this transform by adding an additional {{Timer}} in {{TimeDomain.PROCESSING_TIME}}.
> {code:java}
> @TimerId(STALE_TIMER_ID)
>  private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
> {code}
> When we test this {{Timer}} on the Dataflow runner, it does not output any new batches.
> Updated GroupIntoBatches code:
> {code:java}
> package my.package;
> import static com.google.common.base.Preconditions.checkArgument;
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.state.BagState;
> import org.apache.beam.sdk.state.CombiningState;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.transforms.Combine;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.PTransform;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
> import com.google.common.annotations.VisibleForTesting;
> import com.google.common.collect.Iterables;
> import lombok.RequiredArgsConstructor;
> import lombok.extern.slf4j.Slf4j;
> @Slf4j
> @RequiredArgsConstructor
> public class GroupIntoBatches<K, InputT>
>     extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> {
>     private final long batchSize;
>     private final long staleTime;
>     public static <K, InputT> GroupIntoBatches<K, InputT> of(long batchSize, long staleTime) {
>         return new GroupIntoBatches<>(batchSize, staleTime);
>     }
>     @Override
>     public PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K, InputT>> input) {
>         Duration allowedLateness = input.getWindowingStrategy().getAllowedLateness();
>         checkArgument(
>             input.getCoder() instanceof KvCoder,
>             "coder specified in the input PCollection is not a KvCoder");
>         KvCoder inputCoder = (KvCoder) input.getCoder();
>         Coder<K> keyCoder = (Coder<K>) inputCoder.getCoderArguments().get(0);
>         Coder<InputT> valueCoder = (Coder<InputT>) inputCoder.getCoderArguments().get(1);
>         return input.apply(
>             ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, staleTime, allowedLateness, keyCoder, valueCoder)));
>     }
>     @VisibleForTesting
>     static class GroupIntoBatchesDoFn<K, InputT>
>         extends DoFn<KV<K, InputT>, KV<K, Iterable<InputT>>> {
>         private static final String STALE_TIMER_ID = "staleTimer";
>         private static final String END_OF_WINDOW_ID = "endOFWindow";
>         private static final String BATCH_ID = "batch";
>         private static final String NUM_ELEMENTS_IN_BATCH_ID = "numElementsInBatch";
>         private static final String KEY_ID = "key";
>         private final long batchSize;
>         private final long staleTime;
>         private final Duration allowedLateness;
>         @TimerId(END_OF_WINDOW_ID)
>         private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>         @TimerId(STALE_TIMER_ID)
>         private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>         @StateId(BATCH_ID)
>         private final StateSpec<BagState<InputT>> batchSpec;
>         @StateId(NUM_ELEMENTS_IN_BATCH_ID)
>         private final StateSpec<CombiningState<Long, long[], Long>> numElementsInBatchSpec;
>         @StateId(KEY_ID)
>         private final StateSpec<ValueState<K>> keySpec;
>         private final long prefetchFrequency;
>         GroupIntoBatchesDoFn(
>             long batchSize,
>             long staleTime,
>             Duration allowedLateness,
>             Coder<K> inputKeyCoder,
>             Coder<InputT> inputValueCoder) {
>             this.batchSize = batchSize;
>             this.staleTime = staleTime;
>             this.allowedLateness = allowedLateness;
>             this.batchSpec = StateSpecs.bag(inputValueCoder);
>             this.numElementsInBatchSpec =
>                 StateSpecs.combining(
>                     new Combine.BinaryCombineLongFn() {
>                         @Override
>                         public long identity() {
>                             return 0L;
>                         }
>                         @Override
>                         public long apply(long left, long right) {
>                             return left + right;
>                         }
>                     });
>             this.keySpec = StateSpecs.value(inputKeyCoder);
>             // prefetch every 20% of batchSize elements. Do not prefetch if batchSize is too little
>             this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : (batchSize / 5);
>         }
>         @ProcessElement
>         public void processElement(
>             @TimerId(END_OF_WINDOW_ID) Timer timer,
>             @TimerId(STALE_TIMER_ID) Timer staleTimer,
>             @StateId(BATCH_ID) BagState<InputT> batch,
>             @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> numElementsInBatch,
>             @StateId(KEY_ID) ValueState<K> key,
>             @Element KV<K, InputT> element,
>             BoundedWindow window,
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
>             Instant windowExpires = window.maxTimestamp().plus(allowedLateness);
>             log.debug(
>                 "*** SET TIMER *** to point in time {} for window {}",
>                 windowExpires.toString(),
>                 window.toString());
>             timer.set(windowExpires);
>             key.write(element.getKey());
>             batch.add(element.getValue());
>             log.debug("*** BATCH *** Add element for window {} ", window.toString());
>             // blind add is supported with combiningState
>             numElementsInBatch.add(1L);
>             Long num = numElementsInBatch.read();
>             // set a stale time on Processing Timer to emit batch every n millis
>             if (num == 1) {
>                 staleTimer.offset(Duration.millis(staleTime)).setRelative();
>             }
>             if (num % prefetchFrequency == 0) {
>                 // prefetch data and modify batch state (readLater() modifies this)
>                 batch.readLater();
>             }
>             if (num >= batchSize) {
>                 log.debug("*** END OF BATCH *** for window {}", window.toString());
>                 flushBatch(receiver, key, batch, numElementsInBatch);
>             }
>         }
>         @OnTimer(STALE_TIMER_ID)
>         public void onStaleCallback(
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver,
>             @Timestamp Instant timestamp,
>             @StateId(KEY_ID) ValueState<K> key,
>             @StateId(BATCH_ID) BagState<InputT> batch,
>             @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> numElementsInBatch,
>             BoundedWindow window) {
>             log.debug(
>                 "*** Stale Timer *** for timer timestamp {} in windows {}",
>                 timestamp,
>                 window.toString());
>             flushBatch(receiver, key, batch, numElementsInBatch);
>         }
>         @OnTimer(END_OF_WINDOW_ID)
>         public void onTimerCallback(
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver,
>             @Timestamp Instant timestamp,
>             @StateId(KEY_ID) ValueState<K> key,
>             @StateId(BATCH_ID) BagState<InputT> batch,
>             @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> numElementsInBatch,
>             BoundedWindow window) {
>             log.debug(
>                 "*** END OF WINDOW *** for timer timestamp {} in windows {}",
>                 timestamp,
>                 window.toString());
>             flushBatch(receiver, key, batch, numElementsInBatch);
>         }
>         private void flushBatch(
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver,
>             ValueState<K> key,
>             BagState<InputT> batch,
>             CombiningState<Long, long[], Long> numElementsInBatch) {
>             Iterable<InputT> values = batch.read();
>             // when the timer fires, batch state might be empty
>             if (!Iterables.isEmpty(values)) {
>                 receiver.output(KV.of(key.read(), values));
>             }
>             batch.clear();
>             log.debug("*** BATCH *** clear");
>             numElementsInBatch.clear();
>         }
>     }
> }
> {code}
> We compared the batches output by adding two stages:
> {code:java}
> .apply("Batch transactions New", GroupIntoBatches.of(200,250);
> .apply("Batch transactions Old", org.apache.beam.sdk.transforms.GroupIntoBatches.ofSize(200);{code}
>  
> With the Flink runner we see the {{PROCESSING_TIME}} timer being triggered and creating a batch every 250 ms, but with the Dataflow runner it is not being triggered.
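> For triage, a DirectRunner test along the following lines can drive processing time forward explicitly with {{TestStream}}; on runners that honor processing-time timers, the "*** Stale Timer ***" debug line should show up. This is only a minimal sketch, assuming JUnit plus the Beam testing utilities ({{TestPipeline}}, {{TestStream}}, {{PAssert}}) and the custom {{GroupIntoBatches}} above; the names and sizes are illustrative. Note that {{advanceWatermarkToInfinity()}} eventually fires the end-of-window timer too, so the output alone does not distinguish which timer flushed the batch.
> {code:java}
> @Rule public final transient TestPipeline pipeline = TestPipeline.create();
>
> @Test
> public void staleTimerShouldFlushBatch() {
>     TestStream<KV<String, String>> stream =
>         TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
>             .addElements(KV.of("key", "a"), KV.of("key", "b"))
>             // Move processing time well past the 250 ms stale-timer offset.
>             .advanceProcessingTime(Duration.standardSeconds(1))
>             .advanceWatermarkToInfinity();
>
>     PCollection<KV<String, Iterable<String>>> batches =
>         pipeline
>             .apply(stream)
>             .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>             // Batch size is larger than the input, so only a timer can flush the batch.
>             .apply("Batch transactions New", GroupIntoBatches.of(200, 250));
>
>     PAssert.that(batches)
>         .satisfies(output -> {
>             // All elements must come back out in some batch.
>             long total = 0;
>             for (KV<String, Iterable<String>> batch : output) {
>                 total += Iterables.size(batch.getValue());
>             }
>             assertEquals(2, total);
>             return null;
>         });
>
>     pipeline.run().waitUntilFinish();
> }
> {code}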
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)