Posted to issues@beam.apache.org by "Ismaël Mejía (Jira)" <ji...@apache.org> on 2020/05/21 07:39:00 UTC

[jira] [Assigned] (BEAM-10053) Timers exception on "Job Drain" while using stateful beam processing in global window

     [ https://issues.apache.org/jira/browse/BEAM-10053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismaël Mejía reassigned BEAM-10053:
-----------------------------------

    Assignee:     (was: Aizhamal Nurmamat kyzy)

> Timers exception on "Job Drain" while using stateful beam processing in global window
> -------------------------------------------------------------------------------------
>
>                 Key: BEAM-10053
>                 URL: https://issues.apache.org/jira/browse/BEAM-10053
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-community, runner-dataflow, sdk-java-core
>    Affects Versions: 2.19.0
>            Reporter: MOHIL
>            Priority: P2
>
> Hello,
>  
> I have a use case with two PCollections (RecordA and RecordB) coming from a real-time streaming source such as Kafka.
>  
> Both records are correlated by a common key, call it KEY.
>  
> The purpose is to enrich RecordA with RecordB's data, for which I am using CoGroupByKey. Since RecordA and RecordB for a common key can arrive within 1-2 minutes of each other in event time, I maintain a sliding window for both records and then apply CoGroupByKey to the two PCollections.
>  
> Any sliding window that contains both RecordA and RecordB for a common key KEY emits enriched output. Since multiple overlapping sliding windows can emit the same output, I finally remove duplicates by feeding these outputs into a global window, where I maintain state to check whether an output has already been processed. Because it is a global window, I also set a timer on the state (for GC) so that it expires 10 minutes after the state is written (see the code snippet below).
>  
> This works perfectly fine with respect to the expected results. However, I am unable to stop the job gracefully, i.e., drain it. I see the following exception:
>  
> java.lang.IllegalStateException: org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received state cleanup timer for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before the appropriate cleanup time 294248-01-24T04:00:54.776Z
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> (The same IllegalStateException and stack trace repeat twice more, for SimpleParDoFn@59902a10 and SimpleParDoFn@4316932b, against the same window and cleanup time.)
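>  
> For context on the huge timestamp in the message: the global window's maximum timestamp is effectively the end of time (BoundedWindow.TIMESTAMP_MAX_VALUE lands around the year 294247), so the state cleanup timer for a global window is scheduled to fire essentially never. A minimal sketch showing where that bound comes from:
>  
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
>  
> public class GlobalWindowBounds {
>   public static void main(String[] args) {
>     // Both print timestamps around the year 294247: the global window
>     // "ends" at the end of time, so its GC timer can only fire then.
>     System.out.println(GlobalWindow.INSTANCE.maxTimestamp());
>     System.out.println(BoundedWindow.TIMESTAMP_MAX_VALUE);
>   }
> }
>  
> On Drain, Dataflow advances the watermark to infinity and fires pending timers, which appears to be what trips the checkState above.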
>  
>  
> My code snippet:
>  
> PCollection<KV<MyKey, RecordA>> windowedRecordA =
>     incompleteRecordALogs.apply("Applying_Sliding_Window_RecordA",
>         Window.<KV<MyKey, RecordA>>into(
>                 SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>             .withAllowedLateness(Duration.standardSeconds(90))
>             .discardingFiredPanes());
>  
> PCollection<KV<MyKey, RecordB>> windowedRecordB =
>     recordBLogs.apply("Applying_Sliding_Window_RecordB",
>         Window.<KV<MyKey, RecordB>>into(
>                 SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>             .withAllowedLateness(Duration.standardSeconds(90))
>             .discardingFiredPanes());
>  
> PCollection<KV<MyKey, CoGbkResult>> coGbkRecords =
>     KeyedPCollectionTuple.of(TagRecordA, windowedRecordA)
>         .and(TagRecordB, windowedRecordB)
>         .apply("CoGroupByKey", CoGroupByKey.create());
>  
> PCollection<RecordA> enrichedRecordA =
>     coGbkRecords.apply("EnrichRecordAWithRecordB", new EnrichIncompleteRecordA());
>  
> class EnrichIncompleteRecordA extends PTransform<PCollection<KV<MyKey, CoGbkResult>>, PCollection<RecordA>> {
>   @Override
>   public PCollection<RecordA> expand(PCollection<KV<MyKey, CoGbkResult>> input) {
>     logger.info("Enriching Incomplete RecordA with RecordB");
>     return input
>         .apply("Add_RecordBInfo_To_RecordA", ParDo.of(new AddRecordBData()))
>         .apply("Applying_Windowing", Window.<KV<MyKey, RecordA>>into(new GlobalWindows())
>             .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>             .discardingFiredPanes())
>         .apply("Emit_Unique_RecordA", ParDo.of(new EmitUniqueRecordA()));
>   }
>  
>   private class AddRecordBData extends DoFn<KV<MyKey, CoGbkResult>, KV<MyKey, RecordA>> {
>     @Setup
>     public void setup() {}
>  
>     @StartBundle
>     public void startBundle() {}
>  
>     @ProcessElement
>     public void processElement(@Element KV<MyKey, CoGbkResult> input, OutputReceiver<KV<MyKey, RecordA>> out) {
>       Iterable<RecordA> allRecordALogs = input.getValue().getAll(TagRecordA);
>       Iterable<RecordB> allRecordBLogs = input.getValue().getAll(TagRecordB);
>       // There should be at most 1 RecordB per MyKey.
>       if (allRecordALogs.iterator().hasNext() && allRecordBLogs.iterator().hasNext()) {
>         RecordB recordB = Iterables.getFirst(allRecordBLogs, null);
>         for (RecordA recordA : allRecordALogs) {
>           if (null != recordB) {
>             logger.info("Enriching incomplete recordA [{}] with recordB: [{}]", recordA, recordB);
>             // <code to populate recordA object with recordB data>
>             out.output(KV.of(input.getKey(), recordA));
>           } else {
>             logger.error("No recordB available for recordA log [{}]", recordA);
>           }
>         }
>       } else {
>         logger.info("Either recordA or recordB not present for myKey: {}", input.getKey());
>       }
>     }
>  
>     @FinishBundle
>     public void finishBundle() {}
>  
>     @Teardown
>     public void teardown() {}
>   }
>  
>   private class EmitUniqueRecordA extends DoFn<KV<MyKey, RecordA>, RecordA> {
>     @Setup
>     public void setup() {}
>  
>     @StartBundle
>     public void startBundle() {}
>  
>     // State holds the set of RecordA already emitted for this key
>     // (declared as Set<RecordA> to match how it is read and written below).
>     @StateId("processedRecordA")
>     private final StateSpec<ValueState<Set<RecordA>>> processedRecordASpec =
>         StateSpecs.value(SetCoder.of(AvroCoder.of(RecordA.class)));
>  
>     @TimerId("stateExpiry")
>     private final TimerSpec stateExpirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>  
>     @ProcessElement
>     public void processElement(@Element KV<MyKey, RecordA> input, OutputReceiver<RecordA> out,
>         @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState,
>         @TimerId("stateExpiry") Timer stateExpiryTimer) {
>       // << code to check if recordA has already been processed by checking state >>
>       if (recordA needs to be emitted) {
>         processedRecordAState.write(processedRecordASet);
>         stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();
>         logger.info("Emitting unique recordA {} for myKey {}", input.getValue(), input.getKey());
>         out.output(input.getValue());
>       }
>     }
>  
>     @OnTimer("stateExpiry")
>     public void onExpiry(
>         OnTimerContext context,
>         @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState) {
>       logger.info("Expiring State after timer expiry");
>       processedRecordAState.clear();
>     }
>  
>     @FinishBundle
>     public void finishBundle() {}
>  
>     @Teardown
>     public void teardown() {}
>   }
> }
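>  
> A possible workaround I am considering (just a sketch, not tested against this Drain failure): switch the GC timer to event time and cap it at the window's maximum timestamp, so the drain-time watermark advance fires it in order rather than tripping the cleanup check. Roughly, inside EmitUniqueRecordA:
>  
> // Sketch only: an event-time GC timer capped at the end of the window.
> // "stateExpiry" mirrors the timer above; the cap is the new part.
> @TimerId("stateExpiry")
> private final TimerSpec stateExpirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>  
> @ProcessElement
> public void processElement(ProcessContext c, BoundedWindow window,
>     @TimerId("stateExpiry") Timer stateExpiryTimer) {
>   Instant gcTime = c.timestamp().plus(Duration.standardMinutes(10));
>   // Never schedule past the end of the window; for the global window,
>   // maxTimestamp() is effectively the end of time.
>   if (gcTime.isAfter(window.maxTimestamp())) {
>     gcTime = window.maxTimestamp();
>   }
>   stateExpiryTimer.set(gcTime);
> }
>  
> This trades the "10 minutes of processing time after the write" expiry for an event-time expiry, so state may live longer if the watermark lags.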



--
This message was sent by Atlassian Jira
(v8.3.4#803005)