You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Tyson Hamilton (Jira)" <ji...@apache.org> on 2020/09/11 20:21:00 UTC
[jira] [Updated] (BEAM-10342) Processing Time Timer is not being
invoked
[ https://issues.apache.org/jira/browse/BEAM-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tyson Hamilton updated BEAM-10342:
----------------------------------
Status: Resolved (was: Open)
> Processing Time Timer is not being invoked
> ------------------------------------------
>
> Key: BEAM-10342
> URL: https://issues.apache.org/jira/browse/BEAM-10342
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Reporter: sailaxmankumar
> Priority: P2
>
> This happens when using state and timer time domains to batch items for an external API.
> We are trying to use the existing {{GroupIntoBatches}}, but we need some additional functionality to create a batch every n milliseconds. So we are trying to add this functionality to the transform by adding an additional {{Timer}} of {{TimeDomain.PROCESSING_TIME}}.
> {code:java}
> @TimerId(STALE_TIMER_ID)
> private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
> {code}
> When we test this timer on the Dataflow runner, it does not output any new batches.
> Updated GroupIntoBatches code:
> {code:java}
> package my.package;
> import static com.google.common.base.Preconditions.checkArgument;
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.state.BagState;
> import org.apache.beam.sdk.state.CombiningState;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.transforms.Combine;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.PTransform;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
> import com.google.common.annotations.VisibleForTesting;
> import com.google.common.collect.Iterables;
> import lombok.RequiredArgsConstructor;
> import lombok.extern.slf4j.Slf4j;
> @Slf4j
> @RequiredArgsConstructor
> public class GroupIntoBatches<K, InputT>
> extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> {
> private final long batchSize;
> private final long staleTime;
> /**
>  * Creates a {@code GroupIntoBatches} transform with the given limits.
>  *
>  * @param batchSize number of buffered elements at which a batch is emitted; must be positive
>  * @param staleTime processing-time offset, in milliseconds, used for the stale timer that is
>  *     armed when the first element of a batch is buffered; must not be negative
>  * @return a new transform configured with {@code batchSize} and {@code staleTime}
>  * @throws IllegalArgumentException if {@code batchSize} is not positive or {@code staleTime} is
>  *     negative
>  */
> public static <K, InputT> GroupIntoBatches<K, InputT> of(long batchSize, long staleTime) {
>   // Fail fast at pipeline construction instead of silently producing degenerate batches.
>   checkArgument(batchSize > 0, "batchSize must be positive, but was: %s", batchSize);
>   checkArgument(staleTime >= 0, "staleTime must not be negative, but was: %s", staleTime);
>   return new GroupIntoBatches<>(batchSize, staleTime);
> }
> /**
>  * Applies the batching {@code DoFn} to the keyed input.
>  *
>  * <p>Reads the allowed lateness from the input's windowing strategy, verifies that the input
>  * coder is a {@link KvCoder}, and passes its key and value coders to
>  * {@code GroupIntoBatchesDoFn}.
>  *
>  * @param input the keyed input collection
>  * @return the collection of per-key batches
>  * @throws IllegalArgumentException if the input coder is not a {@link KvCoder}
>  */
> @Override
> public PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K, InputT>> input) {
>   Duration allowedLateness = input.getWindowingStrategy().getAllowedLateness();
>   checkArgument(
>       input.getCoder() instanceof KvCoder,
>       "coder specified in the input PCollection is not a KvCoder");
>   // Use a parameterized KvCoder instead of a raw type: the component coders are then obtained
>   // through typed accessors, with no unchecked casts on the elements of getCoderArguments().
>   KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) input.getCoder();
>   Coder<K> keyCoder = inputCoder.getKeyCoder();
>   Coder<InputT> valueCoder = inputCoder.getValueCoder();
>   return input.apply(
>       ParDo.of(
>           new GroupIntoBatchesDoFn<>(
>               batchSize, staleTime, allowedLateness, keyCoder, valueCoder)));
> }
> @VisibleForTesting
> static class GroupIntoBatchesDoFn<K, InputT>
> extends DoFn<KV<K, InputT>, KV<K, Iterable<InputT>>> {
> private static final String STALE_TIMER_ID = "staleTimer";
> private static final String END_OF_WINDOW_ID = "endOFWindow";
> private static final String BATCH_ID = "batch";
> private static final String NUM_ELEMENTS_IN_BATCH_ID = "numElementsInBatch";
> private static final String KEY_ID = "key";
> private final long batchSize;
> private final long staleTime;
> private final Duration allowedLateness;
> @TimerId(END_OF_WINDOW_ID)
> private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
> @TimerId(STALE_TIMER_ID)
> private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
> @StateId(BATCH_ID)
> private final StateSpec<BagState<InputT>> batchSpec;
> @StateId(NUM_ELEMENTS_IN_BATCH_ID)
> private final StateSpec<CombiningState<Long, long[], Long>> numElementsInBatchSpec;
> @StateId(KEY_ID)
> private final StateSpec<ValueState<K>> keySpec;
> private final long prefetchFrequency;
> /**
>  * Stores the batching parameters and builds the state specs used by this DoFn.
>  *
>  * @param batchSize element count at which a batch is flushed
>  * @param staleTime offset, in milliseconds, applied to the processing-time stale timer
>  * @param allowedLateness added to the window's max timestamp when setting the event-time timer
>  * @param inputKeyCoder coder for the key held in value state
>  * @param inputValueCoder coder for the values held in bag state
>  */
> GroupIntoBatchesDoFn(
>     long batchSize,
>     long staleTime,
>     Duration allowedLateness,
>     Coder<K> inputKeyCoder,
>     Coder<InputT> inputValueCoder) {
>   this.batchSize = batchSize;
>   this.staleTime = staleTime;
>   this.allowedLateness = allowedLateness;
>
>   this.keySpec = StateSpecs.value(inputKeyCoder);
>   this.batchSpec = StateSpecs.bag(inputValueCoder);
>
>   // Long-sum combiner backing the per-batch element counter.
>   Combine.BinaryCombineLongFn sumLongs =
>       new Combine.BinaryCombineLongFn() {
>         @Override
>         public long identity() {
>           return 0L;
>         }
>
>         @Override
>         public long apply(long left, long right) {
>           return left + right;
>         }
>       };
>   this.numElementsInBatchSpec = StateSpecs.combining(sumLongs);
>
>   // Prefetch every 20% of batchSize elements; never prefetch when batchSize is too small.
>   long fifthOfBatch = batchSize / 5;
>   if (fifthOfBatch <= 1) {
>     this.prefetchFrequency = Long.MAX_VALUE;
>   } else {
>     this.prefetchFrequency = fifthOfBatch;
>   }
> }
> /**
>  * Buffers one element and flushes the batch once it reaches {@code batchSize}.
>  *
>  * <p>For every element this sets the end-of-window timer to the window's max timestamp plus
>  * the allowed lateness, records the key, adds the value to the bag and increments the counter.
>  * When the counter reads 1, the stale timer is set relative to now with an offset of
>  * {@code staleTime} milliseconds.
>  */
> @ProcessElement
> public void processElement(
>     @TimerId(END_OF_WINDOW_ID) Timer timer,
>     @TimerId(STALE_TIMER_ID) Timer staleTimer,
>     @StateId(BATCH_ID) BagState<InputT> batch,
>     @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> numElementsInBatch,
>     @StateId(KEY_ID) ValueState<K> key,
>     @Element KV<K, InputT> element,
>     BoundedWindow window,
>     OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
>   final Instant expiry = window.maxTimestamp().plus(allowedLateness);
>   log.debug(
>       "*** SET TIMER *** to point in time {} for window {}",
>       expiry.toString(),
>       window.toString());
>   timer.set(expiry);
>
>   key.write(element.getKey());
>   batch.add(element.getValue());
>   log.debug("*** BATCH *** Add element for window {} ", window.toString());
>
>   // The combining state supports a blind add; the count is read back right after.
>   numElementsInBatch.add(1L);
>   final long count = numElementsInBatch.read();
>
>   // First element of a batch: arm the processing-time timer so the batch is emitted
>   // after staleTime millis.
>   final boolean firstOfBatch = count == 1;
>   if (firstOfBatch) {
>     staleTimer.offset(Duration.millis(staleTime)).setRelative();
>   }
>
>   // Prefetch the bag contents periodically (readLater() modifies the batch state).
>   final boolean prefetchDue = count % prefetchFrequency == 0;
>   if (prefetchDue) {
>     batch.readLater();
>   }
>
>   final boolean batchFull = count >= batchSize;
>   if (batchFull) {
>     log.debug("*** END OF BATCH *** for window {}", window.toString());
>     flushBatch(receiver, key, batch, numElementsInBatch);
>   }
> }
> /**
>  * Callback for the stale timer: logs the firing and flushes whatever is currently buffered.
>  */
> @OnTimer(STALE_TIMER_ID)
> public void onStaleCallback(
>     OutputReceiver<KV<K, Iterable<InputT>>> receiver,
>     @Timestamp Instant timestamp,
>     @StateId(KEY_ID) ValueState<K> key,
>     @StateId(BATCH_ID) BagState<InputT> batch,
>     @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> numElementsInBatch,
>     BoundedWindow window) {
>   final String windowDescription = window.toString();
>   log.debug(
>       "*** Stale Timer *** for timer timestamp {} in windows {}", timestamp, windowDescription);
>   flushBatch(receiver, key, batch, numElementsInBatch);
> }
> /**
>  * Callback for the end-of-window timer: logs the firing and flushes whatever is currently
>  * buffered.
>  */
> @OnTimer(END_OF_WINDOW_ID)
> public void onTimerCallback(
>     OutputReceiver<KV<K, Iterable<InputT>>> receiver,
>     @Timestamp Instant timestamp,
>     @StateId(KEY_ID) ValueState<K> key,
>     @StateId(BATCH_ID) BagState<InputT> batch,
>     @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> numElementsInBatch,
>     BoundedWindow window) {
>   final String windowDescription = window.toString();
>   log.debug(
>       "*** END OF WINDOW *** for timer timestamp {} in windows {}",
>       timestamp,
>       windowDescription);
>   flushBatch(receiver, key, batch, numElementsInBatch);
> }
> /**
>  * Emits the buffered values (if any) under the stored key, then clears the bag and the
>  * element counter.
>  *
>  * @param receiver output receiver for the emitted batch
>  * @param key state holding the key to emit the batch under
>  * @param batch bag of buffered values
>  * @param numElementsInBatch counter of buffered values
>  */
> private void flushBatch(
>     OutputReceiver<KV<K, Iterable<InputT>>> receiver,
>     ValueState<K> key,
>     BagState<InputT> batch,
>     CombiningState<Long, long[], Long> numElementsInBatch) {
>   Iterable<InputT> buffered = batch.read();
>   // A timer may fire when nothing is buffered; in that case there is nothing to emit.
>   boolean hasElements = !Iterables.isEmpty(buffered);
>   if (hasElements) {
>     KV<K, Iterable<InputT>> grouped = KV.of(key.read(), buffered);
>     receiver.output(grouped);
>   }
>   // State is reset whether or not anything was emitted.
>   batch.clear();
>   log.debug("*** BATCH *** clear");
>   numElementsInBatch.clear();
> }
> }
> }
> {code}
> We compared the batches output by adding two stages:
> {code:java}
> // Custom transform: batch size 200, stale time 250 ms.
> .apply("Batch transactions New", GroupIntoBatches.of(200, 250));
> // Stock Beam transform: batch size 200 only.
> .apply("Batch transactions Old", org.apache.beam.sdk.transforms.GroupIntoBatches.ofSize(200));{code}
>
> With the FlinkRunner we see the PROCESSING_TIME timer being triggered and a batch created every 250 millis, but with the Dataflow runner the timer is not triggered.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)