Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 15:44:45 UTC

[GitHub] [beam] damccorm opened a new issue, #20203: Timers exception on "Job Drain" while using stateful beam processing in global window

damccorm opened a new issue, #20203:
URL: https://github.com/apache/beam/issues/20203

   Hello,
    
   I have a use case with two PCollections (RecordA and RecordB) coming from a real-time streaming source such as Kafka.
    
   Both record types share a common key; call it KEY.
    
   The goal is to enrich RecordA with RecordB's data, for which I use CoGroupByKey. Since RecordA and RecordB for a common key can arrive within 1-2 minutes of each other in event time, I apply a sliding window to both PCollections and then join them with CoGroupByKey.
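With the sliding windows used in the snippet (90 seconds long, starting every 45 seconds), every element is assigned to 90 / 45 = 2 overlapping windows, so a matching RecordA/RecordB pair can be joined in more than one window. The plain-Java sketch below mirrors that assignment rule, assuming a zero window offset; `SlidingWindowAssignment` and `windowStarts` are hypothetical names, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java sketch of sliding-window assignment (zero offset); not Beam's code.
class SlidingWindowAssignment {
  /** Returns the start (ms) of every window [start, start + sizeMs) that contains timestampMs. */
  static List<Long> windowStarts(long timestampMs, long sizeMs, long periodMs) {
    List<Long> starts = new ArrayList<>();
    // Latest window start at or before the timestamp.
    long lastStart = timestampMs - Math.floorMod(timestampMs, periodMs);
    // Walk back one period at a time while the window still covers the timestamp.
    for (long start = lastStart; start > timestampMs - sizeMs; start -= periodMs) {
      starts.add(start);
    }
    return starts;
  }

  public static void main(String[] args) {
    // 90 s windows every 45 s: an event at t = 100 s lands in [90 s, 180 s) and [45 s, 135 s).
    System.out.println(windowStarts(100_000L, 90_000L, 45_000L)); // [90000, 45000]
  }
}
```

Because each RecordA and RecordB belongs to two windows, any window pair they share yields its own CoGroupByKey result, which is why a downstream deduplication step is needed.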
    
   Each sliding window that contains both a RecordA and a RecordB for a common KEY emits an enriched output. Because multiple overlapping sliding windows can emit the same output, I remove duplicates by feeding these outputs into a global window, where I keep per-key state recording whether an output has already been processed. Since the global window never closes, I also set a timer on the state (for GC) so that it expires 10 minutes after it was last written.
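The dedup-with-expiry scheme described above can be modeled in plain Java to make the timing explicit. This is a hypothetical sketch (`DedupWithExpiry` is not a Beam class): it keeps, per key, the set of values already emitted, re-arms a single deadline on every write in the same way that calling `Timer.offset(...).setRelative()` again replaces a key's pending timer, and drops the key's state once the deadline has passed. Unlike a real processing-time timer, expiry here is applied lazily on the next `offer` call.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical plain-Java model of per-key dedup with a GC deadline; not Beam's state API.
class DedupWithExpiry<K, V> {
  private final Duration ttl;
  private final Map<K, Set<V>> seen = new HashMap<>();      // plays the role of the per-key ValueState
  private final Map<K, Instant> deadline = new HashMap<>(); // plays the role of the per-key expiry timer

  DedupWithExpiry(Duration ttl) {
    this.ttl = ttl;
  }

  /** Returns true if value should be emitted, i.e. it was not seen for key since the last expiry. */
  boolean offer(K key, V value, Instant now) {
    Instant due = deadline.get(key);
    if (due != null && !now.isBefore(due)) {
      // Deadline reached: clear the key's state, as the @OnTimer callback does.
      seen.remove(key);
      deadline.remove(key);
    }
    Set<V> values = seen.computeIfAbsent(key, k -> new HashSet<>());
    if (!values.add(value)) {
      return false; // duplicate, e.g. produced by an overlapping sliding window
    }
    // Re-arm the deadline on every write, replacing any earlier one.
    deadline.put(key, now.plus(ttl));
    return true;
  }

  public static void main(String[] args) {
    DedupWithExpiry<String, String> dedup = new DedupWithExpiry<>(Duration.ofMinutes(10));
    Instant t0 = Instant.parse("2020-05-20T00:00:00Z");
    System.out.println(dedup.offer("KEY", "a", t0));                  // true
    System.out.println(dedup.offer("KEY", "a", t0.plusSeconds(45)));  // false
    System.out.println(dedup.offer("KEY", "a", t0.plusSeconds(601))); // true
  }
}
```

The duplicate at 45 seconds is suppressed; by 601 seconds the 10-minute deadline has passed, the key's state has been cleared, and the same value is emitted again.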
    
   This produces the expected results. However, I am unable to stop the job gracefully, i.e. drain it. I see the following exception:
    
   ```
   java.lang.IllegalStateException: org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received state cleanup timer for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before the appropriate cleanup time 294248-01-24T04:00:54.776Z
   org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
   org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
   org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
   org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
   org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
   org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
   org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
   org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   java.lang.Thread.run(Thread.java:745)
   java.lang.IllegalStateException: org.apache.beam.runners.dataflow.worker.SimpleParDoFn@59902a10 received state cleanup timer for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before the appropriate cleanup time 294248-01-24T04:00:54.776Z
   org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
   org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
   org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
   org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
   org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
   org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
   org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
   org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   java.lang.Thread.run(Thread.java:745)
   java.lang.IllegalStateException: org.apache.beam.runners.dataflow.worker.SimpleParDoFn@4316932b received state cleanup timer for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before the appropriate cleanup time 294248-01-24T04:00:54.776Z
   org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
   org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
   org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
   org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
   org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
   org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
   org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
   org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   java.lang.Thread.run(Thread.java:745)
   ```
    
    
   My code snippet:
    
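   ```java
   import com.google.common.collect.Iterables;
   import java.util.HashSet;
   import java.util.Set;
   import org.apache.beam.sdk.coders.AvroCoder;
   import org.apache.beam.sdk.coders.SetCoder;
   import org.apache.beam.sdk.state.StateSpec;
   import org.apache.beam.sdk.state.StateSpecs;
   import org.apache.beam.sdk.state.TimeDomain;
   import org.apache.beam.sdk.state.Timer;
   import org.apache.beam.sdk.state.TimerSpec;
   import org.apache.beam.sdk.state.TimerSpecs;
   import org.apache.beam.sdk.state.ValueState;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.PTransform;
   import org.apache.beam.sdk.transforms.ParDo;
   import org.apache.beam.sdk.transforms.join.CoGbkResult;
   import org.apache.beam.sdk.transforms.join.CoGroupByKey;
   import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
   import org.apache.beam.sdk.transforms.windowing.AfterPane;
   import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
   import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
   import org.apache.beam.sdk.transforms.windowing.Repeatedly;
   import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
   import org.apache.beam.sdk.transforms.windowing.Window;
   import org.apache.beam.sdk.values.KV;
   import org.apache.beam.sdk.values.PCollection;
   import org.joda.time.Duration;

   // MyKey, RecordA, RecordB, the TupleTags TagRecordA/TagRecordB, incompleteRecordALogs,
   // recordBLogs and logger are defined elsewhere in the pipeline.

   // 90-second sliding windows, every 45 seconds, on both inputs.
   PCollection<KV<MyKey, RecordA>> windowedRecordA =
       incompleteRecordALogs.apply("Applying_Sliding_Window_RecordA",
           Window.<KV<MyKey, RecordA>>into(
                   SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
               .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
               .withAllowedLateness(Duration.standardSeconds(90))
               .discardingFiredPanes());

   PCollection<KV<MyKey, RecordB>> windowedRecordB =
       recordBLogs.apply("Applying_Sliding_Window_RecordB",
           Window.<KV<MyKey, RecordB>>into(
                   SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
               .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
               .withAllowedLateness(Duration.standardSeconds(90))
               .discardingFiredPanes());

   // Join both inputs per key and window.
   PCollection<KV<MyKey, CoGbkResult>> coGbkRecords =
       KeyedPCollectionTuple.of(TagRecordA, windowedRecordA)
           .and(TagRecordB, windowedRecordB)
           .apply("CoGroupByKey", CoGroupByKey.create());

   PCollection<RecordA> enrichedRecordA =
       coGbkRecords.apply("EnrichRecordAWithRecordB", new EnrichIncompleteRecordA());

   class EnrichIncompleteRecordA
       extends PTransform<PCollection<KV<MyKey, CoGbkResult>>, PCollection<RecordA>> {
     @Override
     public PCollection<RecordA> expand(PCollection<KV<MyKey, CoGbkResult>> input) {
       logger.info("Enriching Incomplete RecordA with RecordB");
       return input
           .apply("Add_RecordBInfo_To_RecordA", ParDo.of(new AddRecordBData()))
           // Re-window into the global window so duplicates from overlapping windows meet in one state cell.
           .apply("Applying_Windowing", Window.<KV<MyKey, RecordA>>into(new GlobalWindows())
               .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
               .discardingFiredPanes())
           .apply("Emit_Unique_RecordA", ParDo.of(new EmitUniqueRecordA()));
     }

     private class AddRecordBData extends DoFn<KV<MyKey, CoGbkResult>, KV<MyKey, RecordA>> {
       @Setup
       public void setup() {}

       @StartBundle
       public void startBundle() {}

       @ProcessElement
       public void processElement(
           @Element KV<MyKey, CoGbkResult> input, OutputReceiver<KV<MyKey, RecordA>> out) {
         Iterable<RecordA> allRecordALogs = input.getValue().getAll(TagRecordA);
         Iterable<RecordB> allRecordBLogs = input.getValue().getAll(TagRecordB);

         // There should be at most one RecordB per MyKey.
         if (allRecordALogs.iterator().hasNext() && allRecordBLogs.iterator().hasNext()) {
           RecordB recordB = Iterables.getFirst(allRecordBLogs, null);
           for (RecordA recordA : allRecordALogs) {
             if (null != recordB) {
               logger.info("Enriching incomplete recordA [{}] with recordB: [{}]", recordA, recordB);
               // <code to populate recordA object with recordB data>
               out.output(KV.of(input.getKey(), recordA));
             } else {
               logger.error("No recordB available for recordA log [{}]", recordA);
             }
           }
         } else {
           logger.info("Either recordA or recordB not present for myKey: {}", input.getKey());
         }
       }

       @FinishBundle
       public void finishBundle() {}

       @Teardown
       public void teardown() {}
     }

     private class EmitUniqueRecordA extends DoFn<KV<MyKey, RecordA>, RecordA> {
       @Setup
       public void setup() {}

       @StartBundle
       public void startBundle() {}

       // Per-key state: the RecordA values already emitted for this key.
       @StateId("processedRecordA")
       private final StateSpec<ValueState<Set<RecordA>>> processedRecordASpec =
           StateSpecs.value(SetCoder.of(AvroCoder.of(RecordA.class)));

       // Processing-time timer that garbage-collects the state, since the global window never closes.
       @TimerId("stateExpiry")
       private final TimerSpec stateExpirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

       @ProcessElement
       public void processElement(
           @Element KV<MyKey, RecordA> input,
           OutputReceiver<RecordA> out,
           @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState,
           @TimerId("stateExpiry") Timer stateExpiryTimer) {
         MyKey myKey = input.getKey();
         RecordA recordA = input.getValue();
         // Assumed dedup check; relies on RecordA having value-based equals/hashCode.
         Set<RecordA> processedRecordASet = processedRecordAState.read();
         if (processedRecordASet == null) {
           processedRecordASet = new HashSet<>();
         }
         if (processedRecordASet.add(recordA)) { // true only if recordA was not emitted before
           processedRecordAState.write(processedRecordASet);
           // Expire the state 10 minutes after the latest write (re-setting replaces the old timer).
           stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();
           logger.info("Emitting unique recordA {} for myKey {}", recordA, myKey);
           out.output(recordA);
         }
       }

       @OnTimer("stateExpiry")
       public void onExpiry(
           OnTimerContext context,
           @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState) {
         logger.info("Expiring State after timer expiry");
         processedRecordAState.clear();
       }

       @FinishBundle
       public void finishBundle() {}

       @Teardown
       public void teardown() {}
     }
   }
   ```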
   
   Imported from Jira [BEAM-10053](https://issues.apache.org/jira/browse/BEAM-10053). Original Jira may contain additional context.
   Reported by: mohilkhare.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [I] Timers exception on "Job Drain" while using stateful beam processing in global window [beam]

Posted by "genyherrera (via GitHub)" <gi...@apache.org>.
genyherrera commented on issue #20203:
URL: https://github.com/apache/beam/issues/20203#issuecomment-1989270080

   Is there any update regarding this issue?



[GitHub] [beam] bvolpato commented on issue #20203: Timers exception on "Job Drain" while using stateful beam processing in global window

Posted by GitBox <gi...@apache.org>.
bvolpato commented on issue #20203:
URL: https://github.com/apache/beam/issues/20203#issuecomment-1159612913

   I can reproduce this issue after migrating a `GroupByKey.create()` to `GroupIntoBatches.ofSize(size).withShardedKey()`:
   
   ```
   java.lang.IllegalStateException: org.apache.beam.runners.dataflow.worker.SimpleParDoFn@8ffcc6c received state cleanup timer for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@71ffb928  that is before the appropriate cleanup time 294247-01-10T04:00:54.776Z
   	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
   	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:397)
   	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:79)
   	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:460)
   	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:483)
   	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:359)
   	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
   	at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:94)
   	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1445)
   	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
   	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1120)
   	at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:834)
   ```
   



Re: [I] Timers exception on "Job Drain" while using stateful beam processing in global window [beam]

Posted by "anip-patel-exa (via GitHub)" <gi...@apache.org>.
anip-patel-exa commented on issue #20203:
URL: https://github.com/apache/beam/issues/20203#issuecomment-1908717408

   @bvolpato Can this be fixed by not using GroupIntoBatches? 



[GitHub] [beam] anip-patel-exa commented on issue #20203: Timers exception on "Job Drain" while using stateful beam processing in global window

Posted by "anip-patel-exa (via GitHub)" <gi...@apache.org>.
anip-patel-exa commented on issue #20203:
URL: https://github.com/apache/beam/issues/20203#issuecomment-1517156319

   Is there any update regarding this issue?

