Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/14 04:09:41 UTC

[GitHub] [beam] boyuanzz opened a new pull request #13105: [WIP] Initial support for self-checkpoint

boyuanzz opened a new pull request #13105:
URL: https://github.com/apache/beam/pull/13105


   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-714144610


   Run Java Flink PortableValidatesRunner Batch





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-724259540


   Run Java Flink PortableValidatesRunner Streaming





[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r520082882



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +692,10 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      if (hasSdfProcessFn) {
+        // Sleep for 5s to wait for any SDF timer to be fired.
+        Thread.sleep(5000);

Review comment:
       Thanks! Filed here: https://issues.apache.org/jira/browse/BEAM-11210







[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r520974852



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
      Commit https://github.com/apache/beam/pull/13105/commits/56a004dd8fd9bb7c52fb56e0f53cd0dc7f8b1dfa introduces a workaround: manually draining processing-time timers for SDF. It is only a workaround because it doesn't work efficiently if the upstream advances the watermark to MAX_TIMESTAMP very quickly. PTAL, @mxm 
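The general idea of draining processing-time timers, instead of sleeping and hoping they fire, can be sketched in plain Java. This is a minimal sketch under stated assumptions, not the code from the linked commit: `PendingTimer`, `ProcessingTimerDrain`, and the `fire` callback are hypothetical names, and a real operator would fire timers through Flink's timer service while holding the state-backend lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

/** Hypothetical pending processing-time timer: an id plus the time it is due to fire. */
final class PendingTimer {
  final String id;
  final long timestampMillis;

  PendingTimer(String id, long timestampMillis) {
    this.id = id;
    this.timestampMillis = timestampMillis;
  }
}

/** Fires pending processing-time timers immediately rather than waiting for wall-clock time. */
final class ProcessingTimerDrain {
  // Ordered by due time, so timers fire in the order they would have fired naturally.
  private final PriorityQueue<PendingTimer> queue =
      new PriorityQueue<>((a, b) -> Long.compare(a.timestampMillis, b.timestampMillis));

  void register(PendingTimer timer) {
    queue.add(timer);
  }

  boolean hasPending() {
    return !queue.isEmpty();
  }

  /**
   * Drains the queue and returns the fired ids in firing order. Firing a timer may register
   * new timers (for example, when an SDF residual checkpoints again), so the loop keeps
   * polling until the queue is empty.
   */
  List<String> drain(Consumer<PendingTimer> fire) {
    List<String> fired = new ArrayList<>();
    PendingTimer next;
    while ((next = queue.poll()) != null) {
      fire.accept(next);
      fired.add(next.id);
    }
    return fired;
  }
}
```

Because a self-checkpointing SDF can keep rescheduling itself, a loop like this only terminates once the residuals stop registering new timers; the sketch does not bound that.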







[GitHub] [beam] boyuanzz commented on pull request #13105: [WIP] Initial support for self-checkpoint

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-713223023


   Run Java Flink PortableValidatesRunner Batch





[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r512901015



##########
File path: runners/flink/job-server/flink_job_server.gradle
##########
@@ -166,23 +166,22 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi
         excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
         excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
         if (streaming) {
+          excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp'
         } else {
+          excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+          excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
           excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
         }
-        //SplitableDoFnTests
-        excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
-
       }
     },
     testFilter: {
       // TODO(BEAM-10016)
       excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestampedBounded'

Review comment:
      I haven't explored the failure yet, but the test does not exercise self-initiated checkpointing, and it fails for the same reason (wrong results) without my changes.







[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r512908978



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -484,6 +530,148 @@ void setTimer(Timer<?> timerElement, TimerInternals.TimerData timerData) {
     }
   }
 
+  /**
+   * A {@link TimerInternalsFactory} for Flink operator to create a {@link
+   * StateAndTimerBundleCheckpointHandler} to handle {@link
+   * org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication}.
+   */
+  class SdfFlinkTimerInternalsFactory implements TimerInternalsFactory<InputT> {
+    @Override
+    public TimerInternals timerInternalsForKey(InputT key) {
+      try {
+        ByteBuffer encodedKey =
+            (ByteBuffer) keySelector.getKey(WindowedValue.valueInGlobalWindow(key));
+        return new SdfFlinkTimerInternals(encodedKey);
+      } catch (Exception e) {
+        throw new RuntimeException("Couldn't get a timer internals", e);
+      }
+    }
+  }
+
+  /**
+   * A {@link TimerInternals} for rescheduling {@link
+   * org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication}.
+   */
+  class SdfFlinkTimerInternals implements TimerInternals {
+    private final ByteBuffer key;
+
+    SdfFlinkTimerInternals(ByteBuffer key) {
+      this.key = key;
+    }
+
+    @Override
+    public void setTimer(
+        StateNamespace namespace,
+        String timerId,
+        String timerFamilyId,
+        Instant target,
+        Instant outputTimestamp,
+        TimeDomain timeDomain) {
+      setTimer(
+          TimerData.of(timerId, timerFamilyId, namespace, target, outputTimestamp, timeDomain));
+    }
+
+    @Override
+    public void setTimer(TimerData timerData) {
+      try {
+        try (Locker locker = Locker.locked(stateBackendLock)) {
+          getKeyedStateBackend().setCurrentKey(key);
+          timerInternals.setTimer(timerData);
+          minEventTimeTimerTimestampInCurrentBundle =
+              Math.min(
+                  minEventTimeTimerTimestampInCurrentBundle,
+                  adjustTimestampForFlink(timerData.getOutputTimestamp().getMillis()));
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Couldn't set timer", e);
+      }
+    }
+
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+      throw new UnsupportedOperationException(
+          "It is not expected to use SdfFlinkTimerInternals to delete a timer");
+    }
+
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
+      throw new UnsupportedOperationException(
+          "It is not expected to use SdfFlinkTimerInternals to delete a timer");
+    }
+
+    @Override
+    public void deleteTimer(TimerData timerKey) {
+      throw new UnsupportedOperationException(
+          "It is not expected to use SdfFlinkTimerInternals to delete a timer");
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return timerInternals.currentProcessingTime();
+    }
+
+    @Override
+    public @Nullable Instant currentSynchronizedProcessingTime() {
+      return timerInternals.currentSynchronizedProcessingTime();
+    }
+
+    @Override
+    public Instant currentInputWatermarkTime() {
+      return timerInternals.currentInputWatermarkTime();
+    }
+
+    @Override
+    public @Nullable Instant currentOutputWatermarkTime() {
+      return timerInternals.currentOutputWatermarkTime();
+    }
+  }
+
+  /**
+   * A {@link StateInternalsFactory} for Flink operator to create a {@link
+   * StateAndTimerBundleCheckpointHandler} to handle {@link
+   * org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication}.
+   */
+  class SdfFlinkStateInternalsFactory implements StateInternalsFactory<InputT> {
+    @Override
+    public StateInternals stateInternalsForKey(InputT key) {
+      try {
+        ByteBuffer encodedKey =
+            (ByteBuffer) keySelector.getKey(WindowedValue.valueInGlobalWindow(key));
+        return new SdfFlinkStateInternals(encodedKey);
+      } catch (Exception e) {
+        throw new RuntimeException("Couldn't get a state internals", e);
+      }
+    }
+  }
+
+  /** A {@link StateInternals} for keeping {@link DelayedBundleApplication}s as states. */
+  class SdfFlinkStateInternals implements StateInternals {

Review comment:
      I want to keep `SdfFlinkStateInternals` and `SdfFlinkTimerInternals` as inner classes so that they can access `getKeyedStateBackend()`, `timerInternals`, and `stateInternals` from the outer class.
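Why a non-static inner class helps here can be shown with a small, self-contained Java sketch. `Operator`, `KeyScopedTimers`, and `setCurrentKey` are hypothetical stand-ins (the last one loosely mirrors `getKeyedStateBackend().setCurrentKey(key)` in the diff), not the Beam classes. The point is only that each inner-class instance is bound to its enclosing instance and can use the outer object's private fields and methods directly, whereas a static nested class would need them passed in explicitly.

```java
/** Hypothetical stand-in for the Flink operator that owns the keyed state backend. */
final class Operator {
  // Stand-ins for state owned by the outer operator.
  private Object currentKey;
  private int timersSet;

  private void setCurrentKey(Object key) {
    this.currentKey = key;
  }

  /** Inner (non-static) class: each instance implicitly references its enclosing Operator. */
  final class KeyScopedTimers {
    private final Object key;

    KeyScopedTimers(Object key) {
      this.key = key;
    }

    void setTimer() {
      // Calls a private method and updates a private field of the enclosing instance.
      setCurrentKey(key);
      timersSet++;
    }
  }

  KeyScopedTimers timersForKey(Object key) {
    return new KeyScopedTimers(key);
  }

  Object currentKey() {
    return currentKey;
  }

  int timersSet() {
    return timersSet;
  }
}
```

The trade-off is that inner-class instances cannot outlive or be shared across operators, which matches how the factories in the diff create them per key for a single operator.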







[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r517607818



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
       >  Is there a way to only sleep if there are such pending timers and then wait for precisely as long as the maximum of such timers?
   
   One option I can think of is to sleep only when we are executing the SDF ProcessFn.
   
   > Also, I'm surprised these timers get fired at all, since newer versions of Flink stop processing timer execution once closing the operator (that's why we are draining the timers manually in super.close()).
   
   I haven't looked into the Flink timer service yet, but I suspect it is related to the watermark hold.
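Combining the two ideas in this exchange (sleep only for SDF stages, and only as long as the latest pending timer requires) could look roughly like the helper below. This is a hedged sketch: `hasSdfProcessFn` mirrors the flag in the earlier diff, but `BoundedTimerWait`, `millisToWait`, and the collection of pending processing-time timestamps are hypothetical; the real operator would have to obtain those timestamps from its timer service.

```java
import java.util.Collection;

/** Hypothetical helper computing how long close() would need to wait for pending timers. */
final class BoundedTimerWait {
  private BoundedTimerWait() {}

  /**
   * Returns zero when the stage has no SDF ProcessFn or nothing is pending; otherwise the gap
   * between now and the latest pending processing-time timer, never negative.
   */
  static long millisToWait(
      boolean hasSdfProcessFn, Collection<Long> pendingTimerTimestampsMillis, long nowMillis) {
    if (!hasSdfProcessFn || pendingTimerTimestampsMillis.isEmpty()) {
      return 0L;
    }
    long latest = Long.MIN_VALUE;
    for (long ts : pendingTimerTimestampsMillis) {
      latest = Math.max(latest, ts);
    }
    return Math.max(0L, latest - nowMillis);
  }
}
```

In `close()`, the result would stand in for the fixed `Thread.sleep(5000)`; a result of zero means `Thread.sleep(0)`, which returns immediately.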







[GitHub] [beam] mxm commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-725953987


   My pleasure!





[GitHub] [beam] mxm commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r517569105



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
       Makes sense. Thanks for clarifying. Is there a way to only sleep if there are such pending timers and then wait for precisely as long as the maximum of such timers? It just seems like a workaround to always wait 5 seconds. Also, I'm surprised these timers get fired at all, since newer versions of Flink stop processing timer execution once closing the operator (that's why we are draining the timers manually in `super.close()`).







[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-715562965









[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-718139268


   Run Java Flink PortableValidatesRunner Streaming





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-714863939


   Run Java Flink PortableValidatesRunner Streaming





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-716698311


   Run Portable_Python PreCommit





[GitHub] [beam] mxm commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r521252916



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
      What you describe happens when you enable checkpointing or when you pass in `--shutdownSourcesAfterIdleMs`. The operator doesn't shut down then and continues to process timers. The reason we do not enable this by default is that we want to cleanly shut down pipelines during tests. Once an operator has been shut down, the entire job cannot be checkpointed anymore.
   
   The new workaround looks good to me. We do the same in the `super.close()` method.







[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r516996679



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
      Could you explain the side effect here in more detail? 







[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r517607818



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
       > Is there a way to only sleep if there are such pending timers and then wait for precisely as long as the maximum of such timers?
   
   One option I can think of is to sleep only when we are executing an SDF/ProcessFn.
   
   > Also, I'm surprised these timers get fired at all, since newer versions of Flink stop processing timer execution once closing the operator (that's why we are draining the timers manually in super.close()).
   
   I haven't looked into the Flink timer service yet, but I suspect it is related to the watermark hold.
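The reviewer's suggestion (sleep only while SDF timers are pending, and only as long as the latest of them) can be sketched as a small helper. This is a minimal sketch, not runner code: `SdfCloseSleep` and `sleepMillis` are hypothetical names, and it assumes the operator can list the firing times (epoch millis) of its pending SDF processing-time timers.

```java
import java.util.List;

/** Hypothetical helper; not part of the Beam Flink runner. */
final class SdfCloseSleep {
  private SdfCloseSleep() {}

  /**
   * Returns how long close() would need to sleep so that every pending SDF
   * processing-time timer is due, or 0 when no SDF timer is pending.
   *
   * @param pendingSdfTimerMillis firing times (epoch millis) of pending SDF timers
   * @param nowMillis current processing time (epoch millis)
   */
  static long sleepMillis(List<Long> pendingSdfTimerMillis, long nowMillis) {
    if (pendingSdfTimerMillis.isEmpty()) {
      return 0L; // no SDF timer holds the watermark, so there is nothing to wait for
    }
    long latest = Long.MIN_VALUE;
    for (long fireAt : pendingSdfTimerMillis) {
      latest = Math.max(latest, fireAt);
    }
    // Timers already due need no extra wait.
    return Math.max(0L, latest - nowMillis);
  }
}
```

Compared with a fixed `Thread.sleep(5000)`, this avoids sleeping at all for non-SDF stages and avoids oversleeping when the last residual is due sooner.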







[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-724317535


   Run Java Flink PortableValidatesRunner Batch





[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r513066466



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1330,6 +1331,14 @@ private void onNewEventTimer(TimerData newTimer) {
       }
     }
 
+    /** Holds the watermark when there is an sdf timer. */
+    private void onNewSdfTimer(TimerData newTimer) {
+      Preconditions.checkState(
+          StateAndTimerBundleCheckpointHandler.isSdfTimer(newTimer.getTimerId()));
+      Preconditions.checkState(timerUsesOutputTimestamp(newTimer));
+      keyedStateInternals.addWatermarkHoldUsage(newTimer.getOutputTimestamp());
+    }

Review comment:
       Sorry, I want to revisit the idea of having `onFiredTimer` here. I think `onFiredTimer` is a good fit for firing timers, but `onNewSdfTimer` and `onNewEventTimer` are responsible for setting the watermark hold when timers are registered. Unlike an event-time timer, an SDF timer must carry an output timestamp, because that timestamp controls the watermark hold that SDF execution relies on. That's why we use a precondition check here instead of an `if` block.
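The distinction above (registration adds a hold and must validate the SDF timer; firing releases the hold) can be illustrated with a reference-counted set of hold timestamps. This is a simplified, hypothetical stand-in for the keyed watermark-hold bookkeeping in `DoFnOperator`; `WatermarkHolds` is not Beam code, and only the method name `minWatermarkHoldMs` is borrowed from the review thread.

```java
import java.util.TreeMap;

/** Hypothetical, simplified watermark-hold bookkeeping; not Beam code. */
final class WatermarkHolds {
  static final String SDF_PREFIX = "sdf_checkpoint";

  // hold timestamp (millis) -> number of registered timers using it
  private final TreeMap<Long, Integer> holdCounts = new TreeMap<>();

  /** Registers an SDF timer; an SDF timer must carry an output timestamp. */
  void onNewSdfTimer(String timerId, Long outputTimestampMillis) {
    // Checks rather than an if-block: a malformed SDF timer is a bug, not a case to skip.
    if (!timerId.startsWith(SDF_PREFIX)) {
      throw new IllegalStateException("Not an SDF timer: " + timerId);
    }
    if (outputTimestampMillis == null) {
      throw new IllegalStateException("SDF timer without output timestamp: " + timerId);
    }
    holdCounts.merge(outputTimestampMillis, 1, Integer::sum);
  }

  /** Releases one usage of the hold once the corresponding timer has fired. */
  void onFiredTimer(long outputTimestampMillis) {
    // Returning null from the remapping function removes the entry.
    holdCounts.computeIfPresent(outputTimestampMillis, (ts, n) -> n > 1 ? n - 1 : null);
  }

  /** Earliest active hold, or Long.MAX_VALUE when nothing holds the watermark. */
  long minWatermarkHoldMs() {
    return holdCounts.isEmpty() ? Long.MAX_VALUE : holdCounts.firstKey();
  }
}
```

Counting usages per timestamp matters because several residuals can share the same output timestamp; releasing one must not drop the hold for the others.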







[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-717577100


   Run Java Flink PortableValidatesRunner Streaming





[GitHub] [beam] boyuanzz merged pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz merged pull request #13105:
URL: https://github.com/apache/beam/pull/13105


   





[GitHub] [beam] mxm commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r514466557



##########
File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility methods for creating {@link BundleCheckpointHandler}s. */
+public class BundleCheckpointHandlers {
+
+  /**
+   * A {@link BundleCheckpointHandler} which uses {@link
+   * org.apache.beam.runners.core.TimerInternals.TimerData} and {@link
+   * org.apache.beam.sdk.state.ValueState} to reschedule {@link DelayedBundleApplication}.
+   */
+  public static class StateAndTimerBundleCheckpointHandler<T> implements BundleCheckpointHandler {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(StateAndTimerBundleCheckpointHandler.class);
+    private final TimerInternalsFactory<T> timerInternalsFactory;
+    private final StateInternalsFactory<T> stateInternalsFactory;
+    private final Coder<WindowedValue<T>> residualCoder;
+    private final Coder windowCoder;
+    private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
+    public static final String SDF_PREFIX = "sdf_checkpoint";
+
+    public StateAndTimerBundleCheckpointHandler(
+        TimerInternalsFactory<T> timerInternalsFactory,
+        StateInternalsFactory<T> stateInternalsFactory,
+        Coder<WindowedValue<T>> residualCoder,
+        Coder windowCoder) {
+      this.residualCoder = residualCoder;
+      this.windowCoder = windowCoder;
+      this.timerInternalsFactory = timerInternalsFactory;
+      this.stateInternalsFactory = stateInternalsFactory;
+    }
+
+    /**
+     * A helper function to help check whether the given timer is the timer which is set for
+     * rescheduling {@link DelayedBundleApplication}.
+     */
+    public static boolean isSdfTimer(String timerId) {
+      return timerId.startsWith(SDF_PREFIX);
+    }
+
+    private static String constructSdfCheckpointId(String id, int index) {
+      return SDF_PREFIX + ":" + id + ":" + index;
+    }
+
+    @Override
+    public void onCheckpoint(ProcessBundleResponse response) {
+      String id = idGenerator.getId();
+      for (int index = 0; index < response.getResidualRootsCount(); index++) {
+        DelayedBundleApplication residual = response.getResidualRoots(index);
+        if (!residual.hasApplication()) {
+          continue;
+        }
+        String tag = constructSdfCheckpointId(id, index);
+        try {
+          WindowedValue<T> stateValue =
+              CoderUtils.decodeFromByteArray(
+                  residualCoder, residual.getApplication().getElement().toByteArray());
+          TimerInternals timerInternals =
+              timerInternalsFactory.timerInternalsForKey((stateValue.getValue()));
+          StateInternals stateInternals =
+              stateInternalsFactory.stateInternalsForKey(stateValue.getValue());
+          // Calculate the timestamp for the timer.
+          Instant timestamp = Instant.now();
+          if (residual.hasRequestedTimeDelay()) {
+            timestamp = timestamp.plus(residual.getRequestedTimeDelay().getSeconds() * 1000);
+          }
+          // Calculate the watermark hold for the timer.
+          long outputTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+          if (!residual.getApplication().getOutputWatermarksMap().isEmpty()) {
+            for (org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp outputWatermark :
+                residual.getApplication().getOutputWatermarksMap().values()) {
+              outputTimestamp = Math.min(outputTimestamp, outputWatermark.getSeconds() * 1000);
+            }
+          } else {
+            outputTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();

Review comment:
       You could use `keyedStateInternals.minWatermarkHoldMs()`.
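The `onCheckpoint` hunk above tags each residual's timer as `sdf_checkpoint:<id>:<index>` and schedules it at the current time plus the requested delay. The sketch below mirrors that logic with plain longs standing in for the protobuf `Duration` (seconds plus nanos) and the Joda `Instant`; the class and method names are hypothetical. One assumption worth flagging: the hunk multiplies `getSeconds()` by 1000 and ignores the `nanos` field, whereas this sketch also keeps the sub-second part.

```java
/** Hypothetical sketch of SDF checkpoint timer naming and scheduling; not Beam code. */
final class SdfCheckpointTimer {
  static final String SDF_PREFIX = "sdf_checkpoint";

  /** Builds a timer id unique per checkpoint and per residual index. */
  static String timerId(String checkpointId, int residualIndex) {
    return SDF_PREFIX + ":" + checkpointId + ":" + residualIndex;
  }

  /** Recognizes timers created for rescheduling SDF residuals. */
  static boolean isSdfTimer(String timerId) {
    return timerId.startsWith(SDF_PREFIX);
  }

  /**
   * Processing time at which the residual should resume: now plus the
   * requested delay, if any, including the millisecond part of the nanos field.
   */
  static long fireAtMillis(long nowMillis, boolean hasDelay, long delaySeconds, int delayNanos) {
    if (!hasDelay) {
      return nowMillis;
    }
    return nowMillis + delaySeconds * 1000L + delayNanos / 1_000_000;
  }
}
```

For example, a residual with a requested delay of 5 s and 250,000,000 ns, checkpointed at t = 1000 ms, would fire at 6250 ms.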







[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-714171015


   > SDF + side input tests fail on validateParDo:
   > 
   > https://github.com/apache/beam/blob/ddcb600b79fdd7d2ea35a843502557d34bdfbb96/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java#L231-L237
   
   It seems the SplittableParDo transform only has a main input:
   ```
   splittableParDo.getInputsMap(): {org.apache.beam.sdk.values.PCollection.<init>:399#a00997a449310e6c=PAssert$0/GroupGlobally/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource).output}
   ```





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-714075338


   SDF + side input tests fail on validateParDo: https://github.com/apache/beam/blob/ddcb600b79fdd7d2ea35a843502557d34bdfbb96/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java#L231-L237
   





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-714829376


   Run Java Flink PortableValidatesRunner Batch





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-713844928


   Run Python_PVR_Flink PreCommit





[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r516420479



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
       Friendly ping :)







[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-714706808


   Run Java Flink PortableValidatesRunner Batch





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-724259433


   Run Java Flink PortableValidatesRunner Batch





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-713834830


   Run Java Flink PortableValidatesRunner Batch





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-714715120


   I can verify that the pipeline proto is correct before fusion; the validation error occurs once fusion returns the new pipeline proto. It seems something is wrong in the pipeline fuser.





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-714863843


   Run Java Flink PortableValidatesRunner Batch





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-714688771


   Run Java Flink PortableValidatesRunner Batch





[GitHub] [beam] boyuanzz commented on pull request #13105: [WIP] Initial support for self-checkpoint

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-713222920


   Run Java Flink PortableValidatesRunner Streaming





[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r514483986



##########
File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility methods for creating {@link BundleCheckpointHandler}s. */
+public class BundleCheckpointHandlers {
+
+  /**
+   * A {@link BundleCheckpointHandler} which uses {@link
+   * org.apache.beam.runners.core.TimerInternals.TimerData} and {@link
+   * org.apache.beam.sdk.state.ValueState} to reschedule {@link DelayedBundleApplication}.
+   */
+  public static class StateAndTimerBundleCheckpointHandler<T> implements BundleCheckpointHandler {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(StateAndTimerBundleCheckpointHandler.class);
+    private final TimerInternalsFactory<T> timerInternalsFactory;
+    private final StateInternalsFactory<T> stateInternalsFactory;
+    private final Coder<WindowedValue<T>> residualCoder;
+    private final Coder windowCoder;
+    private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
+    public static final String SDF_PREFIX = "sdf_checkpoint";
+
+    public StateAndTimerBundleCheckpointHandler(
+        TimerInternalsFactory<T> timerInternalsFactory,
+        StateInternalsFactory<T> stateInternalsFactory,
+        Coder<WindowedValue<T>> residualCoder,
+        Coder windowCoder) {
+      this.residualCoder = residualCoder;
+      this.windowCoder = windowCoder;
+      this.timerInternalsFactory = timerInternalsFactory;
+      this.stateInternalsFactory = stateInternalsFactory;
+    }
+
+    /**
+     * A helper function to help check whether the given timer is the timer which is set for
+     * rescheduling {@link DelayedBundleApplication}.
+     */
+    public static boolean isSdfTimer(String timerId) {
+      return timerId.startsWith(SDF_PREFIX);
+    }
+
+    private static String constructSdfCheckpointId(String id, int index) {
+      return SDF_PREFIX + ":" + id + ":" + index;
+    }
+
+    @Override
+    public void onCheckpoint(ProcessBundleResponse response) {
+      String id = idGenerator.getId();
+      for (int index = 0; index < response.getResidualRootsCount(); index++) {
+        DelayedBundleApplication residual = response.getResidualRoots(index);
+        if (!residual.hasApplication()) {
+          continue;
+        }
+        String tag = constructSdfCheckpointId(id, index);
+        try {
+          WindowedValue<T> stateValue =
+              CoderUtils.decodeFromByteArray(
+                  residualCoder, residual.getApplication().getElement().toByteArray());
+          TimerInternals timerInternals =
+              timerInternalsFactory.timerInternalsForKey((stateValue.getValue()));
+          StateInternals stateInternals =
+              stateInternalsFactory.stateInternalsForKey(stateValue.getValue());
+          // Calculate the timestamp for the timer.
+          Instant timestamp = Instant.now();
+          if (residual.hasRequestedTimeDelay()) {
+            timestamp = timestamp.plus(residual.getRequestedTimeDelay().getSeconds() * 1000);
+          }
+          // Calculate the watermark hold for the timer.
+          long outputTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+          if (!residual.getApplication().getOutputWatermarksMap().isEmpty()) {
+            for (org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp outputWatermark :
+                residual.getApplication().getOutputWatermarksMap().values()) {
+              outputTimestamp = Math.min(outputTimestamp, outputWatermark.getSeconds() * 1000);
+            }
+          } else {
+            outputTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();

Review comment:
       Thanks, Max! I just double-checked the contract we have: https://github.com/apache/beam/blob/7eb0e956dad87f84622d3523369579812ff82f0a/model/fn-execution/src/main/proto/beam_fn_api.proto#L236-L243
   Let's stick with MIN_TIMESTAMP for now.
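The hold computation settled on above (take the minimum of the residual's output watermarks, and fall back to the minimum timestamp when the SDK reports none) can be sketched as follows. This is a hypothetical illustration: `ResidualHold` and `Ts` are made-up names, `Ts` is a simplified stand-in for the protobuf `Timestamp`, and `Long.MIN_VALUE`/`Long.MAX_VALUE` stand in for Beam's `BoundedWindow.TIMESTAMP_MIN_VALUE`/`TIMESTAMP_MAX_VALUE`, which have different concrete values.

```java
import java.util.Collection;

/** Hypothetical sketch of the residual watermark-hold computation; not Beam code. */
final class ResidualHold {
  /** Simplified protobuf Timestamp: seconds plus nanos. */
  static final class Ts {
    final long seconds;
    final int nanos;

    Ts(long seconds, int nanos) {
      this.seconds = seconds;
      this.nanos = nanos;
    }

    long toMillis() {
      return seconds * 1000L + nanos / 1_000_000;
    }
  }

  // Placeholders for BoundedWindow.TIMESTAMP_MIN_VALUE / TIMESTAMP_MAX_VALUE.
  static final long MIN_TIMESTAMP = Long.MIN_VALUE;
  static final long MAX_TIMESTAMP = Long.MAX_VALUE;

  static long outputHoldMillis(Collection<Ts> outputWatermarks) {
    if (outputWatermarks.isEmpty()) {
      // No watermark reported: hold at the minimum timestamp (the conservative choice).
      return MIN_TIMESTAMP;
    }
    long hold = MAX_TIMESTAMP;
    for (Ts wm : outputWatermarks) {
      hold = Math.min(hold, wm.toMillis());
    }
    return hold;
  }
}
```

Falling back to the minimum timestamp keeps the downstream watermark from advancing past data the residual may still produce, at the cost of stalling it until the residual's timer fires.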







[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r512910196



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1330,6 +1331,14 @@ private void onNewEventTimer(TimerData newTimer) {
       }
     }
 
+    /** Holds the watermark when there is an sdf timer. */
+    private void onNewSdfTimer(TimerData newTimer) {
+      Preconditions.checkState(
+          StateAndTimerBundleCheckpointHandler.isSdfTimer(newTimer.getTimerId()));
+      Preconditions.checkState(timerUsesOutputTimestamp(newTimer));
+      keyedStateInternals.addWatermarkHoldUsage(newTimer.getOutputTimestamp());
+    }

Review comment:
       Sounds good.







[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r512891996



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -683,8 +694,24 @@ private void translateStreamingImpulse(
                 inputPCollectionId,
                 valueCoder.getClass().getSimpleName()));
       }
-      keyCoder = ((KvCoder) valueCoder).getKeyCoder();
-      keySelector = new KvToByteBufferKeySelector(keyCoder);
+      if (stateful) {
+        keyCoder = ((KvCoder) valueCoder).getKeyCoder();
+        keySelector = new KvToByteBufferKeySelector(keyCoder);
+      } else {
+        // For an SDF, we know that the input element should be
+        // KV<KV<element, KV<restriction, watermarkState>>, size>. We are going to use the element
+        // as the key.
+        if (!(((KvCoder) valueCoder).getKeyCoder() instanceof KvCoder)) {
+          throw new IllegalStateException(
+              String.format(
+                  Locale.ENGLISH,
+                  "The element coder for splittable DoFn '%s' must be KVCoder(KvCoder, DoubleCoder) but is: %s",
+                  inputPCollectionId,
+                  valueCoder.getClass().getSimpleName()));
+        }
+        keyCoder = ((KvCoder) ((KvCoder) valueCoder).getKeyCoder()).getKeyCoder();
+        keySelector = new SdfByteBufferKeySelector(keyCoder);
+      }

Review comment:
       At least for now, we don't support SDFs that use user state and timers. cc: @robertwb
   
   Could you elaborate on "because keys are not guaranteed to be processed on the same operator instance"? Is it because we check for a stateful DoFn first?
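The translator hunk above keys an SDF by its element, unwrapping an input shaped as `KV<KV<element, KV<restriction, watermarkState>>, size>`. The sketch below shows that unwrapping on values rather than coders. It is hypothetical: `Pair` stands in for Beam's `KV`, the `Double` size mirrors the `DoubleCoder` named in the error message, and `sdfKeyUnchecked` is only a value-level analogue of the translator's coder check.

```java
/** Hypothetical sketch of SDF key extraction; not Beam code. */
final class SdfKeyExtraction {
  /** Minimal stand-in for Beam's KV. */
  static final class Pair<K, V> {
    final K key;
    final V value;

    Pair(K key, V value) {
      this.key = key;
      this.value = value;
    }
  }

  /** Typed version: the element nested in the outer key becomes the key. */
  static <E, R, W> E sdfKey(Pair<Pair<E, Pair<R, W>>, Double> input) {
    return input.key.key;
  }

  /** Untyped version that, like the translator, rejects a non-KV outer key. */
  static Object sdfKeyUnchecked(Pair<?, ?> input) {
    if (!(input.key instanceof Pair)) {
      throw new IllegalStateException(
          "Expected KV<KV<element, KV<restriction, watermarkState>>, size> but outer key is: "
              + input.key);
    }
    return ((Pair<?, ?>) input.key).key;
  }
}
```

Keying by the element alone (not the restriction) means all residuals split from the same element land on the same key, so their checkpoint state and timers stay together.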







[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-714134230


   Run Java Flink PortableValidatesRunner Batch





[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r516993287



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
       This was not a bug before we supported SDF-initiated checkpoints, because a normal processing-time timer doesn't hold the watermark. Now we set a processing-time timer to reschedule SDF residuals, and that timer holds the watermark back. On entering `ExecutableStageDoFnOperator.close()`, we stay in the while loop until all SDF processing-time timers have fired and their watermark holds are removed. So the comment `// Sleep for 5s to wait for any timer to be fired.` is imprecise; it should be `// Sleep for 5s to wait for any SDF timer to be fired.`
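   
   A minimal sketch of the mechanism described above, assuming each SDF processing-time timer registers a watermark hold at its output timestamp when it is set and releases it when it fires or is deleted (mirroring the `addWatermarkHoldUsage`/`removeWatermarkHoldUsage` calls in this PR). `WatermarkHolds` and its methods are hypothetical names, not runner classes. Because the output watermark cannot pass the earliest outstanding hold, the `while` loop in `close()` cannot exit until every SDF timer has fired.
   
   ```java
   import java.util.TreeMap;
   
   /** Hypothetical model of reference-counted watermark holds (not Beam's actual classes). */
   class WatermarkHolds {
     // Hold timestamp (millis) -> number of timers currently holding at that timestamp.
     private final TreeMap<Long, Integer> holds = new TreeMap<>();
   
     /** Called when an SDF timer is registered with the given output timestamp. */
     void addHold(long outputTimestamp) {
       holds.merge(outputTimestamp, 1, Integer::sum);
     }
   
     /** Called when an SDF timer fires or is deleted. */
     void removeHold(long outputTimestamp) {
       Integer count = holds.get(outputTimestamp);
       if (count == null) {
         throw new IllegalStateException("No hold at " + outputTimestamp);
       }
       if (count == 1) {
         holds.remove(outputTimestamp);
       } else {
         holds.put(outputTimestamp, count - 1);
       }
     }
   
     /** The output watermark may not pass the earliest outstanding hold. */
     long outputWatermark(long inputWatermark) {
       return holds.isEmpty() ? inputWatermark : Math.min(inputWatermark, holds.firstKey());
     }
   }
   ```
   
   For example, after `processWatermark1(Watermark.MAX_WATERMARK)` the input watermark is at its maximum (`Long.MAX_VALUE`), yet `outputWatermark(Long.MAX_VALUE)` keeps returning the earliest hold until the last SDF timer releases it.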







[GitHub] [beam] mxm commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r512524675



##########
File path: runners/flink/job-server/flink_job_server.gradle
##########
@@ -166,23 +166,22 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi
         excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
         excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
         if (streaming) {
+          excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp'
         } else {
+          excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+          excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
           excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
         }
-        //SplitableDoFnTests
-        excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
-
       }
     },
     testFilter: {
       // TODO(BEAM-10016)
       excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestampedBounded'

Review comment:
       Maybe add a comment explaining why this one is excluded?

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1315,7 +1316,7 @@ void processPendingProcessingTimeTimers() {
         keyedStateBackend.setCurrentKey(internalTimer.getKey());
         TimerData timer = internalTimer.getNamespace();
         checkInvokeStartBundle();
-        fireTimer(timer);
+        fireTimerInternal((ByteBuffer) internalTimer.getKey(), timer);

Review comment:
       I'm assuming this was a bug?

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1330,6 +1331,14 @@ private void onNewEventTimer(TimerData newTimer) {
       }
     }
 
+    /** Holds the watermark when there is an sdf timer. */
+    private void onNewSdfTimer(TimerData newTimer) {
+      Preconditions.checkState(
+          StateAndTimerBundleCheckpointHandler.isSdfTimer(newTimer.getTimerId()));
+      Preconditions.checkState(timerUsesOutputTimestamp(newTimer));
+      keyedStateInternals.addWatermarkHoldUsage(newTimer.getOutputTimestamp());
+    }

Review comment:
       Wouldn't it make sense to integrate this check with the `timerUsesOutputTimestamp` method?

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1451,6 +1472,8 @@ void onFiredOrDeletedTimer(TimerData timer) {
         pendingTimersById.remove(getContextTimerId(timer.getTimerId(), timer.getNamespace()));
         if (timer.getDomain() == TimeDomain.EVENT_TIME) {
           onRemovedEventTimer(timer);
+        } else if (StateAndTimerBundleCheckpointHandler.isSdfTimer(timer.getTimerId())) {
+          onRemovedSdfTimer(timer);

Review comment:
       Could be simplified by having a generic call here to `onFiredTimer`. See above.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -683,8 +694,24 @@ private void translateStreamingImpulse(
                 inputPCollectionId,
                 valueCoder.getClass().getSimpleName()));
       }
-      keyCoder = ((KvCoder) valueCoder).getKeyCoder();
-      keySelector = new KvToByteBufferKeySelector(keyCoder);
+      if (stateful) {
+        keyCoder = ((KvCoder) valueCoder).getKeyCoder();
+        keySelector = new KvToByteBufferKeySelector(keyCoder);
+      } else {
+        // For an SDF, we know that the input element should be
+        // KV<KV<element, KV<restriction, watermarkState>>, size>. We are going to use the element
+        // as the key.
+        if (!(((KvCoder) valueCoder).getKeyCoder() instanceof KvCoder)) {
+          throw new IllegalStateException(
+              String.format(
+                  Locale.ENGLISH,
+                  "The element coder for splittable DoFn '%s' must be KVCoder(KvCoder, DoubleCoder) but is: %s",
+                  inputPCollectionId,
+                  valueCoder.getClass().getSimpleName()));
+        }
+        keyCoder = ((KvCoder) ((KvCoder) valueCoder).getKeyCoder()).getKeyCoder();
+        keySelector = new SdfByteBufferKeySelector(keyCoder);
+      }

Review comment:
       Will SDFs ever support stateful operations? If so, this wouldn't work anymore because keys are not guaranteed to be processed on the same operator instance.
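   
   The translator change above keys an SDF's input by the element inside `KV<KV<element, KV<restriction, watermarkState>>, size>`. Below is a minimal sketch of that extraction, assuming a hypothetical `Kv` class in place of Beam's `KV` and a plain `Function<E, byte[]>` in place of a Beam `Coder`; it is not the runner's `SdfByteBufferKeySelector`. Because only the element is encoded, all restrictions (and rescheduled residuals) of the same element end up under the same key.
   
   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   import java.util.function.Function;
   
   /** Hypothetical immutable key/value pair standing in for Beam's KV. */
   final class Kv<K, V> {
     final K key;
     final V value;
   
     Kv(K key, V value) {
       this.key = key;
       this.value = value;
     }
   }
   
   /**
    * Hypothetical key selector: given KV<KV<element, KV<restriction, watermarkState>>, size>,
    * encodes only the element, so every restriction of an element shares one key.
    */
   final class SdfKeySketch<E, R, W> {
     private final Function<E, byte[]> elementEncoder;
   
     SdfKeySketch(Function<E, byte[]> elementEncoder) {
       this.elementEncoder = elementEncoder;
     }
   
     ByteBuffer getKey(Kv<Kv<E, Kv<R, W>>, Double> input) {
       E element = input.key.key; // outer key -> inner key is the element
       return ByteBuffer.wrap(elementEncoder.apply(element));
     }
   }
   ```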

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1330,6 +1331,14 @@ private void onNewEventTimer(TimerData newTimer) {
       }
     }
 
+    /** Holds the watermark when there is an sdf timer. */
+    private void onNewSdfTimer(TimerData newTimer) {
+      Preconditions.checkState(
+          StateAndTimerBundleCheckpointHandler.isSdfTimer(newTimer.getTimerId()));
+      Preconditions.checkState(timerUsesOutputTimestamp(newTimer));
+      keyedStateInternals.addWatermarkHoldUsage(newTimer.getOutputTimestamp());
+    }

Review comment:
       We could rename the mentioned method to `onFiredTimer` and include the checks for output watermark holds in there.
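   
   One possible reading of the suggestion, as a hypothetical sketch: a single predicate decides whether a timer holds the watermark, so registration and firing/deletion each need one generic callback instead of separate event-time and SDF branches. `TimerInfo`, the `sdf-` id prefix, and `isSdfTimer` here are placeholders; the real check is `StateAndTimerBundleCheckpointHandler.isSdfTimer`, and the real operator also consults `timerUsesOutputTimestamp`, which this sketch omits.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   /** Hypothetical minimal timer description (not Beam's TimerData). */
   final class TimerInfo {
     enum Domain { EVENT_TIME, PROCESSING_TIME }
   
     final String timerId;
     final Domain domain;
     final long outputTimestamp;
   
     TimerInfo(String timerId, Domain domain, long outputTimestamp) {
       this.timerId = timerId;
       this.domain = domain;
       this.outputTimestamp = outputTimestamp;
     }
   }
   
   final class TimerHoldBookkeeping {
     // Hypothetical: assume SDF timers are recognizable by an id prefix.
     static boolean isSdfTimer(String timerId) {
       return timerId.startsWith("sdf-");
     }
   
     // Output timestamp -> number of timers holding the watermark there.
     private final Map<Long, Integer> holdUsage = new HashMap<>();
   
     /** One predicate instead of per-domain branches at each call site. */
     private static boolean holdsWatermark(TimerInfo t) {
       return t.domain == TimerInfo.Domain.EVENT_TIME || isSdfTimer(t.timerId);
     }
   
     void onNewTimer(TimerInfo t) {
       if (holdsWatermark(t)) {
         holdUsage.merge(t.outputTimestamp, 1, Integer::sum);
       }
     }
   
     void onFiredOrDeletedTimer(TimerInfo t) {
       if (holdsWatermark(t)) {
         // Returning null from the remapping function removes the entry.
         holdUsage.computeIfPresent(t.outputTimestamp, (ts, n) -> n == 1 ? null : n - 1);
       }
     }
   
     int holdCount() {
       return holdUsage.values().stream().mapToInt(Integer::intValue).sum();
     }
   }
   ```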

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1342,6 +1351,14 @@ private void onRemovedEventTimer(TimerData removedTimer) {
       }
     }
 
+    private void onRemovedSdfTimer(TimerData removedTimer) {
+      Preconditions.checkState(
+          StateAndTimerBundleCheckpointHandler.isSdfTimer(removedTimer.getTimerId()));
+      Preconditions.checkState(timerUsesOutputTimestamp(removedTimer));
+      // Remove the watermark hold which is set for this sdf timer.
+      keyedStateInternals.removeWatermarkHoldUsage(removedTimer.getOutputTimestamp());
+    }

Review comment:
       We could rename the mentioned method to `onFiredTimer` and include the checks for output watermark holds in there.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1342,6 +1351,14 @@ private void onRemovedEventTimer(TimerData removedTimer) {
       }
     }
 
+    private void onRemovedSdfTimer(TimerData removedTimer) {
+      Preconditions.checkState(
+          StateAndTimerBundleCheckpointHandler.isSdfTimer(removedTimer.getTimerId()));
+      Preconditions.checkState(timerUsesOutputTimestamp(removedTimer));
+      // Remove the watermark hold which is set for this sdf timer.
+      keyedStateInternals.removeWatermarkHoldUsage(removedTimer.getOutputTimestamp());
+    }

Review comment:
       Wouldn't it make sense to integrate this check with the `timerUsesOutputTimestamp` method?

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -484,6 +530,148 @@ void setTimer(Timer<?> timerElement, TimerInternals.TimerData timerData) {
     }
   }
 
+  /**
+   * A {@link TimerInternalsFactory} for Flink operator to create a {@link
+   * StateAndTimerBundleCheckpointHandler} to handle {@link
+   * org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication}.
+   */
+  class SdfFlinkTimerInternalsFactory implements TimerInternalsFactory<InputT> {

Review comment:
       Maybe we should move these out of this class.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1424,6 +1442,9 @@ private void registerTimer(TimerData timer, String contextTimerId) throws Except
         case PROCESSING_TIME:
         case SYNCHRONIZED_PROCESSING_TIME:
           timerService.registerProcessingTimeTimer(timer, adjustTimestampForFlink(time));
+          if (StateAndTimerBundleCheckpointHandler.isSdfTimer(timer.getTimerId())) {
+            onNewSdfTimer(timer);
+          }

Review comment:
       Could be simplified by having a generic call here to `onFiredTimer`. See above.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -484,6 +530,148 @@ void setTimer(Timer<?> timerElement, TimerInternals.TimerData timerData) {
     }
   }
 
+  /**
+   * A {@link TimerInternalsFactory} for Flink operator to create a {@link
+   * StateAndTimerBundleCheckpointHandler} to handle {@link
+   * org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication}.
+   */
+  class SdfFlinkTimerInternalsFactory implements TimerInternalsFactory<InputT> {
+    @Override
+    public TimerInternals timerInternalsForKey(InputT key) {
+      try {
+        ByteBuffer encodedKey =
+            (ByteBuffer) keySelector.getKey(WindowedValue.valueInGlobalWindow(key));
+        return new SdfFlinkTimerInternals(encodedKey);
+      } catch (Exception e) {
+        throw new RuntimeException("Couldn't get a timer internals", e);
+      }
+    }
+  }
+
+  /**
+   * A {@link TimerInternals} for rescheduling {@link
+   * org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication}.
+   */
+  class SdfFlinkTimerInternals implements TimerInternals {
+    private final ByteBuffer key;
+
+    SdfFlinkTimerInternals(ByteBuffer key) {
+      this.key = key;
+    }
+
+    @Override
+    public void setTimer(
+        StateNamespace namespace,
+        String timerId,
+        String timerFamilyId,
+        Instant target,
+        Instant outputTimestamp,
+        TimeDomain timeDomain) {
+      setTimer(
+          TimerData.of(timerId, timerFamilyId, namespace, target, outputTimestamp, timeDomain));
+    }
+
+    @Override
+    public void setTimer(TimerData timerData) {
+      try {
+        try (Locker locker = Locker.locked(stateBackendLock)) {
+          getKeyedStateBackend().setCurrentKey(key);
+          timerInternals.setTimer(timerData);
+          minEventTimeTimerTimestampInCurrentBundle =
+              Math.min(
+                  minEventTimeTimerTimestampInCurrentBundle,
+                  adjustTimestampForFlink(timerData.getOutputTimestamp().getMillis()));
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Couldn't set timer", e);
+      }
+    }
+
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+      throw new UnsupportedOperationException(
+          "It is not expected to use SdfFlinkTimerInternals to delete a timer");
+    }
+
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
+      throw new UnsupportedOperationException(
+          "It is not expected to use SdfFlinkTimerInternals to delete a timer");
+    }
+
+    @Override
+    public void deleteTimer(TimerData timerKey) {
+      throw new UnsupportedOperationException(
+          "It is not expected to use SdfFlinkTimerInternals to delete a timer");
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return timerInternals.currentProcessingTime();
+    }
+
+    @Override
+    public @Nullable Instant currentSynchronizedProcessingTime() {
+      return timerInternals.currentSynchronizedProcessingTime();
+    }
+
+    @Override
+    public Instant currentInputWatermarkTime() {
+      return timerInternals.currentInputWatermarkTime();
+    }
+
+    @Override
+    public @Nullable Instant currentOutputWatermarkTime() {
+      return timerInternals.currentOutputWatermarkTime();
+    }
+  }
+
+  /**
+   * A {@link StateInternalsFactory} for Flink operator to create a {@link
+   * StateAndTimerBundleCheckpointHandler} to handle {@link
+   * org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication}.
+   */
+  class SdfFlinkStateInternalsFactory implements StateInternalsFactory<InputT> {
+    @Override
+    public StateInternals stateInternalsForKey(InputT key) {
+      try {
+        ByteBuffer encodedKey =
+            (ByteBuffer) keySelector.getKey(WindowedValue.valueInGlobalWindow(key));
+        return new SdfFlinkStateInternals(encodedKey);
+      } catch (Exception e) {
+        throw new RuntimeException("Couldn't get a state internals", e);
+      }
+    }
+  }
+
+  /** A {@link StateInternals} for keeping {@link DelayedBundleApplication}s as states. */
+  class SdfFlinkStateInternals implements StateInternals {

Review comment:
       Maybe we should move these out of this class.
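   
   A hedged sketch of what "moving these out" could look like: the inner `SdfFlinkTimerInternals` reaches into the operator for `stateBackendLock`, `getKeyedStateBackend()`, and `timerInternals`, whereas a top-level class can receive those as explicit constructor dependencies. `KeySetter`, `TimerSink`, and the `String` timer id are hypothetical simplifications; a `Lock` stands in for the operator's `Locker`/`stateBackendLock` pattern, and the bundle's minimum-timestamp bookkeeping is omitted.
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.locks.Lock;
   import java.util.concurrent.locks.ReentrantLock;
   
   /** Hypothetical: sets the current key on the keyed state backend. */
   interface KeySetter {
     void setCurrentKey(ByteBuffer key);
   }
   
   /** Hypothetical: the operator's underlying timer registration. */
   interface TimerSink {
     void setTimer(String timerId);
   }
   
   /** Top-level timer internals for one key; operator state is injected, not captured. */
   final class StandaloneSdfTimerInternals {
     private final ByteBuffer key;
     private final Lock stateBackendLock;
     private final KeySetter keySetter;
     private final TimerSink timerSink;
   
     StandaloneSdfTimerInternals(
         ByteBuffer key, Lock stateBackendLock, KeySetter keySetter, TimerSink timerSink) {
       this.key = key;
       this.stateBackendLock = stateBackendLock;
       this.keySetter = keySetter;
       this.timerSink = timerSink;
     }
   
     void setTimer(String timerId) {
       stateBackendLock.lock();
       try {
         // Scope the timer to this element's key before registering it, as the diff does.
         keySetter.setCurrentKey(key);
         timerSink.setTimer(timerId);
       } finally {
         stateBackendLock.unlock();
       }
     }
   
     void deleteTimer(String timerId) {
       // Like the original, this helper is only meant to set timers.
       throw new UnsupportedOperationException("Deleting timers is not supported here");
     }
   }
   ```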







[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r521579939



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
       Thanks for the explanation! It seems this problem has gotten attention from the Flink community: https://issues.apache.org/jira/browse/FLINK-18647.
   
   I'm still confused about when `operator.close()` is called. It seems to happen under several conditions:
   
   - When a checkpoint happens
   
   - When the source operator closes successfully, the next downstream operator's `close()` is called.
   
   Do I understand this correctly?







[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-717577021


   Run Java Flink PortableValidatesRunner Batch





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-716718165









[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r513069567



##########
File path: runners/flink/job-server/flink_job_server.gradle
##########
@@ -166,23 +166,22 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi
         excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
         excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
         if (streaming) {
+          excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp'
         } else {
+          excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+          excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
           excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
         }
-        //SplitableDoFnTests
-        excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
-
       }
     },
     testFilter: {
       // TODO(BEAM-10016)
       excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestampedBounded'

Review comment:
       I figured out the test failure and enabled the test in the latest commit.







[GitHub] [beam] boyuanzz edited a comment on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz edited a comment on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-714171015


   > SDF + side input tests fail on validateParDo:
   > 
   > https://github.com/apache/beam/blob/ddcb600b79fdd7d2ea35a843502557d34bdfbb96/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java#L231-L237
   
   It seems we only have the main input for the SplittableParDo transform: 
   ```
   splittableParDo.getInputsMap(): {org.apache.beam.sdk.values.PCollection.<init>:399#a00997a449310e6c=PAssert$0/GroupGlobally/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)/ParMultiDo(OutputSingleSource).output}
   ```
   
   @lukecwik Have we added the logic to populate side inputs into the transform proto?





[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r517624057



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
       Updated the logic in https://github.com/apache/beam/pull/13105/commits/d8bb778e64fc591819a90677a9f2e119e706b52f







[GitHub] [beam] boyuanzz commented on pull request #13105: [WIP] Initial support for self-checkpoint

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-713245858









[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r520874302



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
       I double-checked the implementation of `DoFnOperator` and `ExecutableStageDoFnOperator`: by default we already invoke `finishBundle` after 1000 input elements or 1 s of processing time. 
   The real problem for SDF is that an SDF naturally reads from `Impulse` and executes as a high fan-out DoFn. With the current structure, once `Impulse` finishes, the SDF operator's `close()` is called, but at that point no more processing-time timers can be registered. Simply draining timers from the operator itself is not ideal. 
   Is it possible for us to change something here? For example, could the operator wait for the global watermark to advance to MAX_TIMESTAMP before finishing? Or could the task invoke `operator.close()` only once the global watermark has advanced to MAX_TIMESTAMP?
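   
   The default bundle-finishing rule mentioned above (1000 input elements or 1 s of processing time) can be sketched as a small trigger. This is a hypothetical illustration, not the operators' code: `BundleTrigger` and its methods are made-up names, the thresholds are constructor parameters, and the clock is injected so the behavior can be checked deterministically.
   
   ```java
   import java.util.function.LongSupplier;
   
   /** Hypothetical bundle trigger: finish after maxElements elements or maxMillis elapsed. */
   final class BundleTrigger {
     private final long maxElements;
     private final long maxMillis;
     private final LongSupplier clockMillis;
     private long elementsInBundle = 0;
     private long bundleStartMillis = -1; // -1 means no bundle is open
   
     BundleTrigger(long maxElements, long maxMillis, LongSupplier clockMillis) {
       this.maxElements = maxElements;
       this.maxMillis = maxMillis;
       this.clockMillis = clockMillis;
     }
   
     /** Records one input element; returns true if the bundle should be finished now. */
     boolean onElement() {
       if (bundleStartMillis < 0) {
         bundleStartMillis = clockMillis.getAsLong();
       }
       elementsInBundle++;
       return shouldFinish();
     }
   
     /** Periodic check, e.g. from a processing-time callback, for time-based finishing. */
     boolean shouldFinish() {
       if (bundleStartMillis < 0) {
         return false;
       }
       return elementsInBundle >= maxElements
           || clockMillis.getAsLong() - bundleStartMillis >= maxMillis;
     }
   
     /** Resets state after finishBundle has been invoked. */
     void onBundleFinished() {
       elementsInBundle = 0;
       bundleStartMillis = -1;
     }
   }
   ```
   
   Bounding bundles like this does not address the SDF problem described above: once `close()` has been called, no more processing-time timers can be registered, regardless of bundle size.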







[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-716848816


   Run Java Flink PortableValidatesRunner Streaming





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-716848753


   Run Java Flink PortableValidatesRunner Batch





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-715633987


   The `testPairWithIndexWindowedTimestampedBounded` test also fails in batch without my changes (https://github.com/apache/beam/pull/13176, https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch_PR/183/testReport/org.apache.beam.sdk.transforms/SplittableDoFnTest/).
   
   





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-716896663


   Run Java Flink PortableValidatesRunner Streaming





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-716865260









[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-725070624


   Run Java Flink PortableValidatesRunner Streaming





[GitHub] [beam] mxm commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r516664835



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
       This is a bug then. All processing-time timers should be fired on `close()`. It would be great to get rid of the sleep, which can have unexpected side effects, e.g. when operators are chained in Flink.
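   
   A minimal sketch of the alternative asked for here, in the spirit of `processPendingProcessingTimeTimers()` from the `DoFnOperator` diff: instead of sleeping, `close()` keeps firing pending processing-time timers (regardless of their target time), and any residual timers registered by a firing are picked up by the same loop. `DrainingTimerService` is a hypothetical single-threaded queue; it does not model Flink's timer service or operator chaining, and it assumes every chain of SDF residuals is finite, since otherwise the loop would not terminate.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.function.Consumer;
   
   /** Hypothetical single-threaded processing-time timer queue. */
   final class DrainingTimerService {
     private final Deque<String> pending = new ArrayDeque<>();
     private Consumer<String> onTimer = id -> {};
   
     void setOnTimer(Consumer<String> onTimer) {
       this.onTimer = onTimer;
     }
   
     void register(String timerId) {
       pending.addLast(timerId);
     }
   
     boolean hasPending() {
       return !pending.isEmpty();
     }
   
     /**
      * Fires timers until none remain. A firing may register new timers (e.g. a rescheduled
      * SDF residual); the same loop fires those too, so no sleep is needed.
      */
     int drainOnClose() {
       int fired = 0;
       while (!pending.isEmpty()) {
         String timerId = pending.pollFirst();
         onTimer.accept(timerId);
         fired++;
       }
       return fired;
     }
   }
   ```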







[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-724317615


   Run Java Flink PortableValidatesRunner Streaming





[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r512901015



##########
File path: runners/flink/job-server/flink_job_server.gradle
##########
@@ -166,23 +166,22 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi
         excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
         excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
         if (streaming) {
+          excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp'
         } else {
+          excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+          excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
           excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
         }
-        //SplitableDoFnTests
-        excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
-        excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
-
       }
     },
     testFilter: {
       // TODO(BEAM-10016)
       excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestampedBounded'

Review comment:
       I haven't explored the failure yet, but the test fails for the same reason (wrong results) in both batch and streaming. I filed a JIRA to track this: https://issues.apache.org/jira/browse/BEAM-11141







[GitHub] [beam] mxm commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r513004878



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -683,8 +694,24 @@ private void translateStreamingImpulse(
                 inputPCollectionId,
                 valueCoder.getClass().getSimpleName()));
       }
-      keyCoder = ((KvCoder) valueCoder).getKeyCoder();
-      keySelector = new KvToByteBufferKeySelector(keyCoder);
+      if (stateful) {
+        keyCoder = ((KvCoder) valueCoder).getKeyCoder();
+        keySelector = new KvToByteBufferKeySelector(keyCoder);
+      } else {
+        // For an SDF, we know that the input element should be
+        // KV<KV<element, KV<restriction, watermarkState>>, size>. We are going to use the element
+        // as the key.
+        if (!(((KvCoder) valueCoder).getKeyCoder() instanceof KvCoder)) {
+          throw new IllegalStateException(
+              String.format(
+                  Locale.ENGLISH,
+                  "The element coder for splittable DoFn '%s' must be KVCoder(KvCoder, DoubleCoder) but is: %s",
+                  inputPCollectionId,
+                  valueCoder.getClass().getSimpleName()));
+        }
+        keyCoder = ((KvCoder) ((KvCoder) valueCoder).getKeyCoder()).getKeyCoder();
+        keySelector = new SdfByteBufferKeySelector(keyCoder);
+      }

Review comment:
       It's because we partition based on the element instead of on the key. This wouldn't work with stateful operations where we expect all keys to land on the same partition.
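The difference between the two key selectors can be illustrated with a minimal sketch. It assumes the SDF input shape stated in the diff comment, `KV<KV<element, KV<restriction, watermarkState>>, size>`. `Pair` and `KeySelectors` are hypothetical stand-ins for Beam's `KV` and for `KvToByteBufferKeySelector` / `SdfByteBufferKeySelector`, and the modulo partitioning only loosely mimics keyed partitioning.

```java
import java.util.Objects;

/** Minimal hypothetical key/value pair standing in for Beam's KV. */
final class Pair<K, V> {
  final K key;
  final V value;

  Pair(K key, V value) {
    this.key = key;
    this.value = value;
  }
}

final class KeySelectors {
  /** Stateful ParDo: input is KV<key, value>, so partition on the user key. */
  static <K, V> Object statefulKey(Pair<K, V> input) {
    return input.key;
  }

  /**
   * SDF: input is KV<KV<element, KV<restriction, watermarkState>>, size>. Partition on
   * the element, so every restriction/residual of one element lands on the same partition.
   */
  static <E, R, W> Object sdfKey(Pair<Pair<E, Pair<R, W>>, Double> input) {
    return input.key.key;
  }

  /** Simple hash partitioning into [0, parallelism). */
  static int partition(Object key, int parallelism) {
    return Math.floorMod(Objects.hashCode(key), parallelism);
  }
}
```

Keying on the element is what lets a residual's state and timer be found again. However, it groups by element rather than by user key, which is why this selector would be wrong for a stateful DoFn.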







[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-715490647









[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r512926708



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1315,7 +1316,7 @@ void processPendingProcessingTimeTimers() {
         keyedStateBackend.setCurrentKey(internalTimer.getKey());
         TimerData timer = internalTimer.getNamespace();
         checkInvokeStartBundle();
-        fireTimer(timer);
+        fireTimerInternal((ByteBuffer) internalTimer.getKey(), timer);

Review comment:
       I think so. The only difference is that `fireTimerInternal` also grabs the lock and sets the key for the state backend, which is helpful when state is needed. Another potential bug around here is that the Flink operator cleans up global state first and then fires the timer, which may lead to data loss if the timer needs to retrieve that state.
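The ordering concern can be shown with a toy model. `KeyedStateWithTimers` is hypothetical and models only the order of operations, not Flink's `InternalTimerService` or state backend. The "lock, then scope to the key, then fire" step loosely mirrors what `fireTimerInternal` is described as doing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy model: keyed state plus one pending timer per key; a timer reads its key's state. */
final class KeyedStateWithTimers {
  final Map<String, String> state = new HashMap<>();
  final Map<String, Function<String, String>> pendingTimers = new HashMap<>();
  private final Object checkpointLock = new Object();

  /** Take the lock, scope to the key, then fire the timer against that key's state. */
  private String fireTimerInternal(String key, Function<String, String> timer) {
    synchronized (checkpointLock) {
      // "Setting the current key" is modeled as reading only this key's state.
      return timer.apply(state.get(key));
    }
  }

  private Map<String, String> fireAll() {
    Map<String, String> outputs = new HashMap<>();
    for (Map.Entry<String, Function<String, String>> e : pendingTimers.entrySet()) {
      outputs.put(e.getKey(), fireTimerInternal(e.getKey(), e.getValue()));
    }
    pendingTimers.clear();
    return outputs;
  }

  /** Safe order: timers fire while their state still exists, then state is cleared. */
  Map<String, String> fireThenCleanup() {
    Map<String, String> outputs = fireAll();
    state.clear();
    return outputs;
  }

  /** Risky order: clearing state first means a timer that needs it reads null. */
  Map<String, String> cleanupThenFire() {
    state.clear();
    return fireAll();
  }
}
```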







[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-714693819


   Run Java Flink PortableValidatesRunner Batch





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-725702128


   `Python tests / Python Unit Tests (windows-latest, 3.6, py36)` is not related.
   
   I'm going to merge this PR. Thanks for your help!





[GitHub] [beam] mxm commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r519722086



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
       Thanks. This still feels like a workaround but at least we have it isolated for SDF only.







[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-725070528


   Run Java Flink PortableValidatesRunner Batch





[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-713834891


   Run Java Flink PortableValidatesRunner Streaming





[GitHub] [beam] mxm commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r519722652



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +692,10 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      if (hasSdfProcessFn) {
+        // Sleep for 5s to wait for any SDF timer to be fired.
+        Thread.sleep(5000);

Review comment:
       Perhaps add a `TODO` here, since we may want to eventually get rid of the sleep?
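If some wait has to stay for now, a bounded poll with a `TODO` at least returns as soon as the condition holds, rather than always blocking for 5s. This is a sketch: `BoundedWait.awaitCondition` and `noPendingSdfTimers()` are hypothetical helpers. The approach is still a workaround and does not fix timers that can no longer fire after `close()`.

```java
import java.util.function.BooleanSupplier;

final class BoundedWait {
  /**
   * Polls {@code done} every {@code pollMillis} until it returns true or
   * {@code timeoutMillis} elapses. Returns true only if the condition was met.
   */
  static boolean awaitCondition(BooleanSupplier done, long timeoutMillis, long pollMillis) {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    while (!done.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0) {
        return false; // timed out: the caller still has to handle the unmet condition
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
    return true;
  }

  // Hypothetical call site inside close(), still gated on hasSdfProcessFn:
  //   // TODO: remove this wait once SDF processing-time timers are fired before close().
  //   BoundedWait.awaitCondition(() -> noPendingSdfTimers(), 5000, 10);
}
```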







[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r520322204



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
       I found that `Thread.sleep(5s)` is not a correct solution. It only worked occasionally because the bundle was small enough and I got lucky with the race condition. I think we need a way to fire the SDF processing-time timer when a bundle is closed within the life cycle of a single `ExecutableStageDoFnOperator`.
   Please correct me if I'm misunderstanding any of this:
   * Does Flink start all operators at the same time and close them when the input watermark reaches MAX_TIMESTAMP, or does it close operators in reverse topological order, with `close()` being a blocking call?
   * The processing time timers will not be fired anymore by the system once the `operator.close()` is invoked.
   * The assumption around `ExecutableStageDoFnOperator` is that there is only one bundle executing inside one operator. When the output watermark advances to MAX_TIMESTAMP, we consider this bundle completed.
   
   To support SDF-initiated checkpoints, we need several bundles to be invoked within one `ExecutableStageDoFnOperator` life cycle, which means we either:
   
   * Enable Flink to fire processing-time timers after `Operator.close()` is invoked (this may not be preferable).
   * Or try to close the bundle before we reach `Operator.close()`.
   * Or manually drain SDF timers, sacrificing the ability to honor `resumeDelay()`. For example, the user may want to reschedule the SDF residuals in 5 minutes, but we would have to fire them immediately.
   
   Do you have any ideas/suggestions? Thanks for your help!
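The third option (manually draining SDF timers) can be sketched as follows. The sketch assumes pending timers are tracked as a map from timer id to scheduled processing time. It reuses the `sdf_checkpoint` id prefix from `BundleCheckpointHandlers.SDF_PREFIX` in this PR. `SdfTimerDrainer` itself is hypothetical. The trade-off is visible in the code: the scheduled time is ignored, so any requested resume delay is lost.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class SdfTimerDrainer {
  static final String SDF_PREFIX = "sdf_checkpoint";

  static boolean isSdfTimer(String timerId) {
    return timerId.startsWith(SDF_PREFIX);
  }

  /**
   * Removes and returns every SDF checkpoint timer so the caller can fire it now,
   * ignoring its scheduled time (and thus the requested resume delay). Other timers stay.
   */
  static List<String> drainSdfTimers(Map<String, Long> pendingTimers) {
    List<String> drained = new ArrayList<>();
    Iterator<Map.Entry<String, Long>> it = pendingTimers.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> entry = it.next();
      if (isSdfTimer(entry.getKey())) {
        drained.add(entry.getKey());
        it.remove(); // safe removal while iterating
      }
    }
    return drained;
  }
}
```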







[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13105:
URL: https://github.com/apache/beam/pull/13105#issuecomment-714699501


   Run Java Flink PortableValidatesRunner Batch





[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r513912395



##########
File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility methods for creating {@link BundleCheckpointHandler}s. */
+public class BundleCheckpointHandlers {
+
+  /**
+   * A {@link BundleCheckpointHandler} which uses {@link
+   * org.apache.beam.runners.core.TimerInternals.TimerData} and {@link
+   * org.apache.beam.sdk.state.ValueState} to reschedule {@link DelayedBundleApplication}.
+   */
+  public static class StateAndTimerBundleCheckpointHandler<T> implements BundleCheckpointHandler {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(StateAndTimerBundleCheckpointHandler.class);
+    private final TimerInternalsFactory<T> timerInternalsFactory;
+    private final StateInternalsFactory<T> stateInternalsFactory;
+    private final Coder<WindowedValue<T>> residualCoder;
+    private final Coder windowCoder;
+    private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
+    public static final String SDF_PREFIX = "sdf_checkpoint";
+
+    public StateAndTimerBundleCheckpointHandler(
+        TimerInternalsFactory<T> timerInternalsFactory,
+        StateInternalsFactory<T> stateInternalsFactory,
+        Coder<WindowedValue<T>> residualCoder,
+        Coder windowCoder) {
+      this.residualCoder = residualCoder;
+      this.windowCoder = windowCoder;
+      this.timerInternalsFactory = timerInternalsFactory;
+      this.stateInternalsFactory = stateInternalsFactory;
+    }
+
+    /**
+     * A helper function to help check whether the given timer is the timer which is set for
+     * rescheduling {@link DelayedBundleApplication}.
+     */
+    public static boolean isSdfTimer(String timerId) {
+      return timerId.startsWith(SDF_PREFIX);
+    }
+
+    private static String constructSdfCheckpointId(String id, int index) {
+      return SDF_PREFIX + ":" + id + ":" + index;
+    }
+
+    @Override
+    public void onCheckpoint(ProcessBundleResponse response) {
+      String id = idGenerator.getId();
+      for (int index = 0; index < response.getResidualRootsCount(); index++) {
+        DelayedBundleApplication residual = response.getResidualRoots(index);
+        if (!residual.hasApplication()) {
+          continue;
+        }
+        String tag = constructSdfCheckpointId(id, index);
+        try {
+          WindowedValue<T> stateValue =
+              CoderUtils.decodeFromByteArray(
+                  residualCoder, residual.getApplication().getElement().toByteArray());
+          TimerInternals timerInternals =
+              timerInternalsFactory.timerInternalsForKey((stateValue.getValue()));
+          StateInternals stateInternals =
+              stateInternalsFactory.stateInternalsForKey(stateValue.getValue());
+          // Calculate the timestamp for the timer.
+          Instant timestamp = Instant.now();
+          if (residual.hasRequestedTimeDelay()) {
+            timestamp = timestamp.plus(residual.getRequestedTimeDelay().getSeconds() * 1000);
+          }
+          // Calculate the watermark hold for the timer.
+          long outputTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+          if (!residual.getApplication().getOutputWatermarksMap().isEmpty()) {
+            for (org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp outputWatermark :
+                residual.getApplication().getOutputWatermarksMap().values()) {
+              outputTimestamp = Math.min(outputTimestamp, outputWatermark.getSeconds() * 1000);
+            }
+          } else {
+            outputTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();

Review comment:
       Is it a good idea to rely on `TimerInternals.getCurrentInputWatermark()` to not hold back the watermark so much?
   @mxm 
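The timer and watermark-hold arithmetic being discussed can be sketched as follows. `SecondsNanos` is a hypothetical stand-in for a protobuf `Timestamp`/`Duration` (seconds plus non-negative nanos), and `CheckpointTimerMath` is not part of the PR. Two points differ from the diff and are only suggestions. First, the diff converts with `getSeconds() * 1000`, which drops any sub-second part; the sketch includes nanos. Second, when no output watermark is reported, the sketch falls back to a caller-supplied current input watermark (the option asked about here) rather than the minimum timestamp, which would hold the output watermark at the minimum until the timer fires.

```java
import java.util.Collection;

final class CheckpointTimerMath {
  /** Hypothetical stand-in for a protobuf Timestamp/Duration: seconds plus nanos. */
  static final class SecondsNanos {
    final long seconds;
    final int nanos;

    SecondsNanos(long seconds, int nanos) {
      this.seconds = seconds;
      this.nanos = nanos;
    }

    long toMillis() {
      // Keep the sub-second part; seconds * 1000 alone would truncate it.
      return seconds * 1000L + nanos / 1_000_000;
    }
  }

  /** Processing time at which to fire the resume timer: now plus any requested delay. */
  static long fireAtMillis(long nowMillis, SecondsNanos requestedDelay) {
    return requestedDelay == null ? nowMillis : nowMillis + requestedDelay.toMillis();
  }

  /** Watermark hold: min reported output watermark, else the current input watermark. */
  static long watermarkHoldMillis(
      Collection<SecondsNanos> outputWatermarks, long currentInputWatermarkMillis) {
    if (outputWatermarks.isEmpty()) {
      return currentInputWatermarkMillis;
    }
    long hold = Long.MAX_VALUE;
    for (SecondsNanos watermark : outputWatermarks) {
      hold = Math.min(hold, watermark.toMillis());
    }
    return hold;
  }
}
```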







[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r520874302



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
       I double-checked the implementation of `DoFnOperator` and `ExecutableStageDoFnOperator`: we already invoke `finishBundle` when reaching 1000 input elements or 1s of processing time by default.
   The real problem for SDF is that an SDF naturally reads from `Impulse` and executes as a high fan-out DoFn. With the current structure, once `Impulse` finishes, `close()` of the SDF operator is called, and from then on no more processing-time timers can be registered. Simply draining timers from the operator itself is not ideal.
   Is it possible for us to change something here? For example, could the operator wait for the global watermark to advance to MAX_TIMESTAMP before finishing? Or could the task invoke `operator.close()` only when the global watermark advances to MAX_TIMESTAMP?
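The count-or-time bundle trigger mentioned here (1000 elements or 1s by default) can be modeled as follows. `BundleTrigger` is hypothetical and is not the operator's actual implementation. It takes an injectable clock so it can be tested. Such a trigger bounds bundle size and duration during normal processing, but it does nothing for timers that can no longer be registered once `close()` has been called, which is the remaining problem.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical bundle trigger: a bundle is finished once it holds maxElements elements
 * or has been open for maxMillis of processing time (e.g. 1000 elements / 1000 ms).
 */
final class BundleTrigger {
  private final long maxElements;
  private final long maxMillis;
  private final LongSupplier clockMillis;
  private long elementsInBundle = 0;
  private long bundleStartMillis = 0;

  BundleTrigger(long maxElements, long maxMillis, LongSupplier clockMillis) {
    this.maxElements = maxElements;
    this.maxMillis = maxMillis;
    this.clockMillis = clockMillis;
  }

  /** Records one element and reports whether the bundle should be finished now. */
  boolean onElement() {
    if (elementsInBundle == 0) {
      bundleStartMillis = clockMillis.getAsLong(); // first element opens the bundle
    }
    elementsInBundle++;
    return shouldFinish();
  }

  /** Can also be polled from a processing-time callback to finish an idle bundle. */
  boolean shouldFinish() {
    if (elementsInBundle == 0) {
      return false; // no open bundle
    }
    return elementsInBundle >= maxElements
        || clockMillis.getAsLong() - bundleStartMillis >= maxMillis;
  }

  /** Resets the counter after finishBundle has been invoked. */
  void bundleFinished() {
    elementsInBundle = 0;
  }
}
```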



