Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/13 07:46:25 UTC

[GitHub] [beam] jbartok opened a new pull request #12567: Update Jet Runner

jbartok opened a new pull request #12567:
URL: https://github.com/apache/beam/pull/12567


   Because no pre-commit or post-commit checks were set up for the Jet Runner, it fell behind recent changes, and some validatesRunner tests that used to pass started failing.
   
   This PR fixes those tests and adds pre-commit checks.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #12567: [BEAM-10715] Update Jet Runner validates runner testing

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12567:
URL: https://github.com/apache/beam/pull/12567#issuecomment-674974098


   Run Spotless PreCommit





[GitHub] [beam] lukecwik commented on pull request #12567: [BEAM-10715] Update Jet Runner validates runner testing

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12567:
URL: https://github.com/apache/beam/pull/12567#issuecomment-674987886


   Run Python PreCommit





[GitHub] [beam] lukecwik commented on pull request #12567: Update Jet Runner

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12567:
URL: https://github.com/apache/beam/pull/12567#issuecomment-674967583


   Run Python PreCommit





[GitHub] [beam] jbartok commented on pull request #12567: Update Jet Runner

Posted by GitBox <gi...@apache.org>.
jbartok commented on pull request #12567:
URL: https://github.com/apache/beam/pull/12567#issuecomment-673320081


   R: @lukecwik 





[GitHub] [beam] lukecwik commented on pull request #12567: [BEAM-10715] Update Jet Runner validates runner testing

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12567:
URL: https://github.com/apache/beam/pull/12567#issuecomment-675087543


   Run Python PreCommit





[GitHub] [beam] lukecwik commented on pull request #12567: [BEAM-10715] Update Jet Runner validates runner testing

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12567:
URL: https://github.com/apache/beam/pull/12567#issuecomment-674987969


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] lukecwik merged pull request #12567: [BEAM-10715] Update Jet Runner validates runner testing

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #12567:
URL: https://github.com/apache/beam/pull/12567


   





[GitHub] [beam] jbartok commented on pull request #12567: [BEAM-10715] Update Jet Runner validates runner testing

Posted by GitBox <gi...@apache.org>.
jbartok commented on pull request #12567:
URL: https://github.com/apache/beam/pull/12567#issuecomment-675277125


   Run Python PreCommit





[GitHub] [beam] lukecwik commented on pull request #12567: [BEAM-10715] Update Jet Runner validates runner testing

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12567:
URL: https://github.com/apache/beam/pull/12567#issuecomment-675057200


   Run Python PreCommit





[GitHub] [beam] jbartok commented on pull request #12567: [BEAM-10715] Update Jet Runner validates runner testing

Posted by GitBox <gi...@apache.org>.
jbartok commented on pull request #12567:
URL: https://github.com/apache/beam/pull/12567#issuecomment-675390904


   I think this is ready to be merged.





[GitHub] [beam] lukecwik commented on a change in pull request #12567: Update Jet Runner

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12567:
URL: https://github.com/apache/beam/pull/12567#discussion_r471584362



##########
File path: runners/jet/build.gradle
##########
@@ -78,6 +78,22 @@ task validatesRunnerBatch(type: Test) {
         exclude '**/SplittableDoFnTest.class' //Splittable DoFn functionality not yet in the runner
         excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
     }
+    filter {
+        //Jet Runner doesn't currently support @RequiresTimeSortedInput annotation.
+        excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.*TimeSortedInput*'
+
+        //Event type not supported in TestStream: class org.apache.beam.sdk.testing.AutoValue_TestStream_ProcessingTimeEvent
+        excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestampWithProcessingTime'
+        excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testProcessingTimeTimerCanBeReset'
+
+        //unbounded streams created from bounded sources not supported by Jet Runner
+        excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testTwoTimersSettingEachOtherWithCreateAsInputUnbounded'
+
+        //timer output timestamps not supported by Jet Runner
+        excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestamp'
+
+        excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testAssertionSiteIsCaptured*'

Review comment:
       Let's exclude test categories, where possible, based on the features missing from the runner. That way new tests can be added without having to update each runner's exclusion list.
   
   ```suggestion
           excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
           
           //Jet Runner doesn't currently support @RequiresTimeSortedInput annotation.
           excludeCategories 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput'
   
           //Event type not supported in TestStream: class org.apache.beam.sdk.testing.AutoValue_TestStream_ProcessingTimeEvent
           excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
       }
       filter {
           // unbounded streams created from bounded sources not supported by Jet Runner
           excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testTwoTimersSettingEachOtherWithCreateAsInputUnbounded'
   
           // timer output timestamps not supported by Jet Runner
           excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestamp'
   
           excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testAssertionSiteIsCaptured*'
   ```
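
To illustrate why category-based exclusion scales better than name-based exclusion, here is a minimal, self-contained sketch. It does not use JUnit or Gradle: the `Category` annotation, the marker interfaces, `SampleSuite`, and `selectTests` are all hypothetical stand-ins written for this example, and only the two category names are borrowed from the suggestion above. The point it tries to show is that a test added later is excluded as soon as it carries the right category, with no change to the runner's list.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in for a JUnit-style category annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Category {
  Class<?>[] value();
}

/** Hypothetical marker interfaces, named after the categories in the suggestion. */
interface UsesRequiresTimeSortedInput {}

interface UsesTestStreamWithProcessingTime {}

/** Hypothetical test suite; only the annotations matter here. */
class SampleSuite {
  public void testBasicParDo() {}

  @Category(UsesRequiresTimeSortedInput.class)
  public void testTimeSortedInput() {}

  @Category(UsesTestStreamWithProcessingTime.class)
  public void testProcessingTimeTimerCanBeReset() {}

  // Added later: excluded automatically because it carries the category.
  @Category(UsesRequiresTimeSortedInput.class)
  public void testTimeSortedInputWithLateData() {}
}

class CategoryExclusionSketch {
  /** Returns the sorted names of test methods that carry no excluded category. */
  static List<String> selectTests(Class<?> suite, Set<Class<?>> excluded) {
    TreeSet<String> kept = new TreeSet<>();
    for (Method m : suite.getDeclaredMethods()) {
      if (!m.getName().startsWith("test")) {
        continue; // ignore anything that is not a test method
      }
      Category c = m.getAnnotation(Category.class);
      boolean skip = c != null && Arrays.stream(c.value()).anyMatch(excluded::contains);
      if (!skip) {
        kept.add(m.getName());
      }
    }
    return new ArrayList<>(kept);
  }

  public static void main(String[] args) {
    Set<Class<?>> unsupported =
        Set.of(UsesRequiresTimeSortedInput.class, UsesTestStreamWithProcessingTime.class);
    System.out.println(selectTests(SampleSuite.class, unsupported));
  }
}
```

With a name-based filter, `testTimeSortedInputWithLateData` would run (and fail) until someone added a matching pattern to every affected runner.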







[GitHub] [beam] lukecwik commented on a change in pull request #12567: Update Jet Runner

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12567:
URL: https://github.com/apache/beam/pull/12567#discussion_r470090398



##########
File path: build.gradle
##########
@@ -155,6 +155,7 @@ task javaPreCommit() {
   dependsOn ":sdks:java:extensions:sql:jdbc:preCommit"
   dependsOn ":sdks:java:javadoc:allJavadoc"
   dependsOn ":runners:direct-java:needsRunnerTests"
+  dependsOn ":runners:jet:validatesRunner"

Review comment:
       I gave bad advice earlier: the validates runner suites have been split out into their own Jenkins jobs (for example https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Twister2.groovy).

##########
File path: runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
##########
@@ -276,36 +262,94 @@ Processor getEx(
           inputValueCoder,
           outputValueCoders,
           ordinalToSideInput,
+          sideInputMapping,
           ownerId,
           stepId);
     }
   }
 
-  private static class KeyedStepContext implements StepContext {
+  private class KeyedStepContext implements StepContext {
 
-    private final Map<Object, InMemoryStateInternals> stateInternalsOfKeys;
-    private final InMemoryTimerInternals timerInternals;
+    private final Object nullKey = new Object();
 
-    private InMemoryStateInternals currentStateInternals;
+    private final ConcurrentHashMap<Object, InMemoryStateInternals> keyedStateInternals;
+    private final ConcurrentHashMap<Object, InMemoryTimerInternals> keyedTimerInternals;
 
-    KeyedStepContext(InMemoryTimerInternals timerInternals) {
-      this.stateInternalsOfKeys = new HashMap<>();
-      this.timerInternals = timerInternals;
+    @SuppressWarnings("ThreadLocalUsage")
+    private final ThreadLocal<Object> currentKey = new ThreadLocal<>();
+
+    KeyedStepContext() {
+      this.keyedStateInternals = new ConcurrentHashMap<>();
+      this.keyedTimerInternals = new ConcurrentHashMap<>();
     }
 
     void setKey(Object key) {
-      currentStateInternals =
-          stateInternalsOfKeys.computeIfAbsent(key, InMemoryStateInternals::forKey);
+      Object normalizedKey = key == null ? nullKey : key;
+      currentKey.set(normalizedKey);
+      keyedStateInternals.computeIfAbsent(normalizedKey, InMemoryStateInternals::forKey);
+      keyedTimerInternals.computeIfAbsent(normalizedKey, k -> new InMemoryTimerInternals());
+    }
+
+    void clearKey() {
+      currentKey.remove();
     }
 
     @Override
     public StateInternals stateInternals() {
-      return currentStateInternals;
+      Object key = currentKey.get();
+      if (key == null) {
+        throw new IllegalStateException("Active key should be set");
+      }
+      return keyedStateInternals.get(key);
     }
 
     @Override
     public TimerInternals timerInternals() {
-      return timerInternals;
+      Object key = currentKey.get();
+      if (key == null) {
+        throw new IllegalStateException("Active key should be set");
+      }
+      return keyedTimerInternals.get(key);
+    }
+
+    public void advanceProcessingTimes() {
+      Instant now = Instant.now();
+      keyedTimerInternals
+          .values()
+          .forEach(
+              timerInternals -> {
+                try {
+                  timerInternals.advanceProcessingTime(now);
+                  timerInternals.advanceSynchronizedProcessingTime(now);
+                } catch (Exception e) {
+                  throw new RuntimeException("Failed advancing time!");
+                }
+              });
+    }
+
+    public void flushTimers(long watermark) {
+      Instant watermarkInstant = new Instant(watermark);
+      keyedTimerInternals
+          .entrySet()
+          .forEach(
+              (entry) -> {
+                InMemoryTimerInternals timerInternals = entry.getValue();
+                if (timerInternals.currentInputWatermarkTime().isBefore(watermark)) {
+                  try {
+                    timerInternals.advanceInputWatermark(watermarkInstant);
+                    if (watermarkInstant.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+                      timerInternals.advanceProcessingTime(watermarkInstant);
+                      timerInternals.advanceSynchronizedProcessingTime(watermarkInstant);

Review comment:
       If the watermark advances to infinity, we should drop processing-time timers rather than execute them.
   
       We can keep the existing behavior for now, but it would make sense to fix this logic in a follow-up PR.
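
The behavior proposed for the follow-up could look roughly like the sketch below. It is not the Jet Runner's code and does not use Beam's `InMemoryTimerInternals`: the `Timer` class, the local `TimeDomain` enum, `MAX_WATERMARK` (standing in for `BoundedWindow.TIMESTAMP_MAX_VALUE`), and the simplified firing condition are all assumptions made for illustration. Event-time timers fire once the watermark reaches them, while pending processing-time timers are discarded, not fired, when the watermark reaches its maximum.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class FinalWatermarkTimerSketch {
  /** Local, hypothetical time domains; not Beam's TimeDomain. */
  enum TimeDomain {
    EVENT_TIME,
    PROCESSING_TIME
  }

  /** Hypothetical timer record. */
  static final class Timer {
    final String id;
    final TimeDomain domain;
    final long fireAtMillis;

    Timer(String id, TimeDomain domain, long fireAtMillis) {
      this.id = id;
      this.domain = domain;
      this.fireAtMillis = fireAtMillis;
    }
  }

  /** Assumed stand-in for the "watermark at infinity" value. */
  static final long MAX_WATERMARK = Long.MAX_VALUE;

  private final List<Timer> pending = new ArrayList<>();

  void set(String id, TimeDomain domain, long fireAtMillis) {
    pending.add(new Timer(id, domain, fireAtMillis));
  }

  /** Advances the input watermark and returns the ids of the timers that fire. */
  List<String> advanceWatermark(long watermark) {
    List<String> fired = new ArrayList<>();
    Iterator<Timer> it = pending.iterator();
    while (it.hasNext()) {
      Timer t = it.next();
      if (t.domain == TimeDomain.EVENT_TIME && t.fireAtMillis <= watermark) {
        // Simplified firing rule: the watermark has reached the timer.
        fired.add(t.id);
        it.remove();
      } else if (t.domain == TimeDomain.PROCESSING_TIME && watermark == MAX_WATERMARK) {
        // Input is exhausted: drop the timer instead of executing it.
        it.remove();
      }
    }
    return fired;
  }

  int pendingCount() {
    return pending.size();
  }
}
```

By contrast, the code in the diff above advances processing time to the maximum timestamp in this case, which makes the remaining processing-time timers eligible to fire.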







[GitHub] [beam] jbartok commented on pull request #12567: [BEAM-10715] Update Jet Runner validates runner testing

Posted by GitBox <gi...@apache.org>.
jbartok commented on pull request #12567:
URL: https://github.com/apache/beam/pull/12567#issuecomment-675351536


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on pull request #12567: [BEAM-10715] Update Jet Runner validates runner testing

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12567:
URL: https://github.com/apache/beam/pull/12567#issuecomment-675018053


   Run Python PreCommit





[GitHub] [beam] jbartok commented on a change in pull request #12567: Update Jet Runner

Posted by GitBox <gi...@apache.org>.
jbartok commented on a change in pull request #12567:
URL: https://github.com/apache/beam/pull/12567#discussion_r471405630



##########
File path: runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
##########
@@ -276,36 +262,94 @@ Processor getEx(
           inputValueCoder,
           outputValueCoders,
           ordinalToSideInput,
+          sideInputMapping,
           ownerId,
           stepId);
     }
   }
 
-  private static class KeyedStepContext implements StepContext {
+  private class KeyedStepContext implements StepContext {
 
-    private final Map<Object, InMemoryStateInternals> stateInternalsOfKeys;
-    private final InMemoryTimerInternals timerInternals;
+    private final Object nullKey = new Object();
 
-    private InMemoryStateInternals currentStateInternals;
+    private final ConcurrentHashMap<Object, InMemoryStateInternals> keyedStateInternals;
+    private final ConcurrentHashMap<Object, InMemoryTimerInternals> keyedTimerInternals;
 
-    KeyedStepContext(InMemoryTimerInternals timerInternals) {
-      this.stateInternalsOfKeys = new HashMap<>();
-      this.timerInternals = timerInternals;
+    @SuppressWarnings("ThreadLocalUsage")
+    private final ThreadLocal<Object> currentKey = new ThreadLocal<>();
+
+    KeyedStepContext() {
+      this.keyedStateInternals = new ConcurrentHashMap<>();
+      this.keyedTimerInternals = new ConcurrentHashMap<>();
     }
 
     void setKey(Object key) {
-      currentStateInternals =
-          stateInternalsOfKeys.computeIfAbsent(key, InMemoryStateInternals::forKey);
+      Object normalizedKey = key == null ? nullKey : key;
+      currentKey.set(normalizedKey);
+      keyedStateInternals.computeIfAbsent(normalizedKey, InMemoryStateInternals::forKey);
+      keyedTimerInternals.computeIfAbsent(normalizedKey, k -> new InMemoryTimerInternals());
+    }
+
+    void clearKey() {
+      currentKey.remove();
     }
 
     @Override
     public StateInternals stateInternals() {
-      return currentStateInternals;
+      Object key = currentKey.get();
+      if (key == null) {
+        throw new IllegalStateException("Active key should be set");
+      }
+      return keyedStateInternals.get(key);
     }
 
     @Override
     public TimerInternals timerInternals() {
-      return timerInternals;
+      Object key = currentKey.get();
+      if (key == null) {
+        throw new IllegalStateException("Active key should be set");
+      }
+      return keyedTimerInternals.get(key);
+    }
+
+    public void advanceProcessingTimes() {
+      Instant now = Instant.now();
+      keyedTimerInternals
+          .values()
+          .forEach(
+              timerInternals -> {
+                try {
+                  timerInternals.advanceProcessingTime(now);
+                  timerInternals.advanceSynchronizedProcessingTime(now);
+                } catch (Exception e) {
+                  throw new RuntimeException("Failed advancing time!");
+                }
+              });
+    }
+
+    public void flushTimers(long watermark) {
+      Instant watermarkInstant = new Instant(watermark);
+      keyedTimerInternals
+          .entrySet()
+          .forEach(
+              (entry) -> {
+                InMemoryTimerInternals timerInternals = entry.getValue();
+                if (timerInternals.currentInputWatermarkTime().isBefore(watermark)) {
+                  try {
+                    timerInternals.advanceInputWatermark(watermarkInstant);
+                    if (watermarkInstant.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+                      timerInternals.advanceProcessingTime(watermarkInstant);
+                      timerInternals.advanceSynchronizedProcessingTime(watermarkInstant);

Review comment:
       ok



