Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/22 15:19:52 UTC

[GitHub] [beam] mxm opened a new pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

mxm opened a new pull request #11492:
URL: https://github.com/apache/beam/pull/11492


   Pass the fire timestamp in to the timer callback to avoid the following error:
   
   ```
   INFO:apache_beam.utils.subprocess_server:Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 4: Traceback (most recent call last):
   INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
   INFO:apache_beam.utils.subprocess_server: response = task()
   INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
   INFO:apache_beam.utils.subprocess_server: lambda: self.create_worker().do_instruction(request), request)
   INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
   INFO:apache_beam.utils.subprocess_server: getattr(request, request_type), request.instruction_id)
   INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
   INFO:apache_beam.utils.subprocess_server: bundle_processor.process_bundle(instruction_id))
   INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/bundle_processor.py", line 910, in process_bundle
   INFO:apache_beam.utils.subprocess_server: element.timer_family_id, timer_data)
   INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/operations.py", line 688, in process_timer
   INFO:apache_beam.utils.subprocess_server: timer_data.fire_timestamp)
   INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 990, in process_user_timer
   INFO:apache_beam.utils.subprocess_server: self._reraise_augmented(exn)
   INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 1043, in _reraise_augmented
   INFO:apache_beam.utils.subprocess_server: raise_with_traceback(new_exn)
   INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 988, in process_user_timer
   INFO:apache_beam.utils.subprocess_server: self.do_fn_invoker.invoke_user_timer(timer_spec, key, window, timestamp)
   INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 517, in invoke_user_timer
   INFO:apache_beam.utils.subprocess_server: self.user_state_context, key, window, timestamp))
   INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 1093, in process_outputs
   INFO:apache_beam.utils.subprocess_server: for result in results:
   INFO:apache_beam.utils.subprocess_server: File "/Users/max/Dev/beam/sdks/python/apache_beam/testing/load_tests/pardo_test.py", line 185, in process_timer
   INFO:apache_beam.utils.subprocess_server: timer.set(0)
   INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/bundle_processor.py", line 589, in set
   INFO:apache_beam.utils.subprocess_server: self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
   INFO:apache_beam.utils.subprocess_server: File "apache_beam/coders/coder_impl.py", line 651, in encode_to_stream
   INFO:apache_beam.utils.subprocess_server: value.hold_timestamp, out, True)
   INFO:apache_beam.utils.subprocess_server: File "apache_beam/coders/coder_impl.py", line 608, in encode_to_stream
   INFO:apache_beam.utils.subprocess_server: millis = value.micros // 1000
   INFO:apache_beam.utils.subprocess_server:AttributeError: 'NoneType' object has no attribute 'micros' [while running 'GenerateLoad']
   ```
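The traceback reduces to one chain of events. The timer callback calls `timer.set(0)`. The coder then encodes the timer's hold timestamp, and `value.micros // 1000` fails because that timestamp is `None`. The sketch below models this chain. It assumes, as the traceback suggests, that the hold timestamp comes from the timestamp the callback was invoked with. `Timestamp`, `UserTimer`, and `fire_timer` are simplified, hypothetical stand-ins, not Beam's classes.

```python
class Timestamp(object):
    """Minimal stand-in for a Beam timestamp: only the .micros field is modelled."""

    def __init__(self, micros):
        self.micros = micros


def encode_millis(ts):
    # Same arithmetic as the failing line in the traceback:
    #   millis = value.micros // 1000
    return ts.micros // 1000


class UserTimer(object):
    """Hypothetical timer handle bound to the timestamp its callback received."""

    def __init__(self, callback_timestamp):
        self.callback_timestamp = callback_timestamp
        self.encoded = None

    def set(self, fire_micros):
        fire = Timestamp(fire_micros)
        # Assumption: the hold timestamp is taken from the callback's timestamp,
        # which is None if the runner never passed the fire timestamp in.
        hold = self.callback_timestamp
        self.encoded = (encode_millis(fire), encode_millis(hold))


def fire_timer(fire_timestamp, pass_timestamp):
    """Invoke a callback that re-sets its timer, like `timer.set(0)` above."""
    timer = UserTimer(fire_timestamp if pass_timestamp else None)
    timer.set(0)
    return timer.encoded


# With the fire timestamp passed in, both timestamps encode cleanly.
print(fire_timer(Timestamp(20000), pass_timestamp=True))  # (0, 20)

# Without it, encoding fails in the same way as the traceback.
try:
    fire_timer(Timestamp(20000), pass_timestamp=False)
except AttributeError as exn:
    print(exn)
```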
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mxm commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r414504120



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -845,10 +845,12 @@ def process_bundle(self,
           (result_future.is_done() and result_future.get().error)):
         if isinstance(output, beam_fn_api_pb2.Elements.Timers) and not dry_run:
           with BundleManager._lock:
-            self.bundle_context_manager.get_buffer(
+            timer_buffer = self.bundle_context_manager.get_buffer(
                 expected_output_timers[(
                     output.transform_id, output.timer_family_id)],
-                output.transform_id).append(output.timers)
+                output.transform_id)
+            timer_buffer.cleared = False

Review comment:
       Where can I find this assumption? Note that the tests are passing.
   
   I think it makes sense to reset the flag; when the timer is set after firing we should treat it independently from any prior settings of the timer. We may want to leave a comment here why this is necessary. Alternatively, the resetting may be done directly after the timer is fired.
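Here is a concrete version of the scenario. It assumes the buffer is marked as cleared once its timers have been handed over for firing, and that a cleared buffer rejects appends. Under those assumptions, a timer that is re-set inside the callback can only be added after the flag is reset. The model is simplified and hypothetical: `ListBuffer` only approximates the runner's buffer, and `deliver_timers` is invented for illustration.

```python
class ListBuffer(object):
    """Simplified model of the runner's buffer; not the real implementation."""

    def __init__(self):
        self.cleared = False
        self._inputs = []

    def append(self, element):
        # Assumption: a cleared buffer rejects writes until the flag is reset.
        if self.cleared:
            raise RuntimeError('Trying to append to a cleared ListBuffer.')
        self._inputs.append(element)

    def consume(self):
        # Firing hands the buffered timers over and marks the buffer cleared.
        items, self._inputs = self._inputs, []
        self.cleared = True
        return items


def deliver_timers(buffer, timers, reset_flag):
    """Hypothetical helper mirroring the diff: optionally reset, then append."""
    if reset_flag:
        # A timer set after firing is treated independently of earlier settings.
        buffer.cleared = False
    for timer in timers:
        buffer.append(timer)


buf = ListBuffer()
deliver_timers(buf, ['t1'], reset_flag=True)
print(buf.consume())                          # ['t1'], buffer is now cleared
deliver_timers(buf, ['t2'], reset_flag=True)  # timer re-set from the callback
print(buf.consume())                          # ['t2']
```

Without the reset (`reset_flag=False`), the second `deliver_timers` call would raise `RuntimeError`.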







[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-620842616


   Run Python PreCommit





[GitHub] [beam] boyuanzz commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r413314835



##########
File path: sdks/python/apache_beam/runners/worker/operations.py
##########
@@ -685,7 +685,8 @@ def process_timer(self, tag, timer_data):
         timer_spec,
         timer_data.user_key,
         timer_data.windows[0],
-        timer_data.fire_timestamp)
+        timer_data.fire_timestamp,
+        timer_data.pane_info)

Review comment:
       This should be `timer_data.paneinfo` (no underscore).
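This shows why the spelling matters. Assume the timer record is a `namedtuple` whose pane field is spelled `paneinfo`, as the comment indicates. Reading a misspelled attribute then raises `AttributeError` instead of returning `None`. The field list below is an illustrative assumption, not copied from Beam.

```python
from collections import namedtuple

# Illustrative field list (assumption); only the `paneinfo` spelling matters here.
Timer = namedtuple(
    'Timer',
    'user_key dynamic_timer_tag windows clear_bit '
    'fire_timestamp hold_timestamp paneinfo')

timer_data = Timer(
    user_key='k1', dynamic_timer_tag='', windows=('global',),
    clear_bit=False, fire_timestamp=20, hold_timestamp=20,
    paneinfo='PANE')

print(timer_data.paneinfo)  # PANE

try:
    timer_data.pane_info  # misspelled field name
except AttributeError as exn:
    print('AttributeError:', exn)
```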







[GitHub] [beam] mxm commented on issue #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on issue #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-618414047


   Never mind, just had another look and was able to get it running. PTAL if you consider this a proper solution.





[GitHub] [beam] boyuanzz edited a comment on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
boyuanzz edited a comment on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-619141429


   > Test failure with batch SDF execution in https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Phrase/154/
   > 
   > ```
   > * What went wrong:
   > Execution failed for task ':sdks:python:test-suites:portable:py2:flinkCompatibilityMatrixBatchLOOPBACK'.
   > > Process 'command 'sh'' finished with non-zero exit value 1
   > ```
   > 
   > Looks unrelated to this change since the cron is also failing: https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/803/console
   
   `test_pardo_timers` fails with an unexpected result:
   
   ERROR: test_pardo_timers (__main__.FlinkRunnerTestOptimized)
   BeamAssertException: Failed assert: [('fired', 20), ('fired', 200), ('fired', 40), ('fired', 400)] == [('fired', Timestamp(20)), ('fired', Timestamp(200))], missing elements [('fired', 40), ('fired', 400)] [while running 'assert_that/Match']
   
   





[GitHub] [beam] ibzib commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-620704953


   It looks to me like the remaining failures are merely flakes. @boyuanzz PTAL





[GitHub] [beam] ibzib commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-621557824


   Python Flink PVR failures are tracked here: [BEAM-8912](https://issues.apache.org/jira/browse/BEAM-8912)
   
   On the other hand, the 304 failures in the precommit definitely seem like a legit issue. @mxm PTAL





[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-619533578


   Run Python PreCommit





[GitHub] [beam] boyuanzz commented on issue #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on issue #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-618032323


   You can run the Python fn_runner tests to verify the Python code.
   
   Set up a Python virtual env: https://cwiki.apache.org/confluence/display/BEAM/Python+Tips
   Run the `test_pardo_timers` test: `pytest -v apache_beam/runners/portability/fn_api_runner/fn_runner_test.py::FnApiRunnerTest::test_pardo_timers --fulltrace`





[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-618953097


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] aaltay commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-621535619


   Two tests are consistently failing. Are they flakes? (And if they are flakes, could someone please file a JIRA? :) )





[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-619526438


   retest this please





[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-620079649


   Run Python PreCommit





[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-619533714


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] lukecwik commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r418778828



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
##########
@@ -247,25 +247,27 @@ public void reduce(Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionV
     timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     // Now we fire the timers and process elements generated by timers (which may be timers itself)
-    try (RemoteBundle bundle =
-        stageBundleFactory.getBundle(
-            receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
-
-      PipelineTranslatorUtils.fireEligibleTimers(
-          timerInternals,
-          (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
-            FnDataReceiver<Timer> fnTimerReceiver =
-                bundle.getTimerReceivers().get(transformAndTimerId);
-            Preconditions.checkNotNull(
-                fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-            try {
-              fnTimerReceiver.accept(timerValue);
-            } catch (Exception e) {
-              throw new RuntimeException(
-                  String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-            }
-          },
-          currentTimerKey);
+    while (timerInternals.hasPendingTimers()) {

Review comment:
       Also, I made this same comment earlier today and it was lost somehow.







[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-620079375


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] mxm edited a comment on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm edited a comment on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-619533662


   `test_pardo_timers` is now passing for batch and streaming. The SDF test is still flaky.





[GitHub] [beam] pabloem commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r416921999



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -845,10 +845,12 @@ def process_bundle(self,
           (result_future.is_done() and result_future.get().error)):
         if isinstance(output, beam_fn_api_pb2.Elements.Timers) and not dry_run:
           with BundleManager._lock:
-            self.bundle_context_manager.get_buffer(
+            timer_buffer = self.bundle_context_manager.get_buffer(
                 expected_output_timers[(
                     output.transform_id, output.timer_family_id)],
-                output.transform_id).append(output.timers)
+                output.transform_id)
+            timer_buffer.cleared = False

Review comment:
       Did you run into these errors when setting a timer from the timer callback, Max? I think it would be better to reset the buffer explicitly rather than manipulate its flag (e.g. write a `timer_buffer.reset` function). Or at least check `if timer_buffer.cleared: timer_buffer.cleared = False`, to confirm that the rest of the internal context in `ListBuffer` has been cleared.
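A minimal sketch of the explicit-reset alternative follows. It assumes a buffer with a `cleared` flag. `reset` and `timer_buffer_for_append` are hypothetical names that illustrate the suggestion and are not existing APIs. `reset` only re-opens a buffer that was actually cleared. A stray reset on a live buffer therefore fails loudly instead of silently flipping the flag.

```python
class ListBuffer(object):
    """Simplified, hypothetical buffer illustrating an explicit reset()."""

    def __init__(self):
        self.cleared = False
        self._inputs = []

    def append(self, element):
        if self.cleared:
            raise RuntimeError('Trying to append to a cleared ListBuffer.')
        self._inputs.append(element)

    def clear(self):
        # Clearing drops all buffered state along with setting the flag.
        self.cleared = True
        self._inputs = []

    def reset(self):
        """Re-open a cleared buffer for reuse."""
        if not self.cleared:
            raise RuntimeError('Trying to reset a non-cleared ListBuffer.')
        self.cleared = False


def timer_buffer_for_append(buffer):
    # Guarded form of the suggestion: only reset buffers that were cleared.
    if buffer.cleared:
        buffer.reset()
    return buffer


buf = ListBuffer()
buf.append('t1')
buf.clear()
timer_buffer_for_append(buf).append('t2')  # works: cleared, then reset
```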







[GitHub] [beam] boyuanzz edited a comment on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
boyuanzz edited a comment on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-619141429


   > Test failure with batch SDF execution in https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Phrase/154/
   > 
   > ```
   > * What went wrong:
   > Execution failed for task ':sdks:python:test-suites:portable:py2:flinkCompatibilityMatrixBatchLOOPBACK'.
   > > Process 'command 'sh'' finished with non-zero exit value 1
   > ```
   > 
   > Looks unrelated to this change since the cron is also failing: https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/803/console
   
   `test_pardo_timers` fails with an unexpected result:
   ```
   ERROR: test_pardo_timers (__main__.FlinkRunnerTestOptimized)
   BeamAssertException: Failed assert: [('fired', 20), ('fired', 200), ('fired', 40), ('fired', 400)] == [('fired', Timestamp(20)), ('fired', Timestamp(200))], missing elements [('fired', 40), ('fired', 400)] [while running 'assert_that/Match']
   ```
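Reading the assertion: assume `equal_to` prints the expected list first, then the actual list, and reports expected elements it never saw as "missing". On that reading, the runner produced only the timers at 20 and 200, while the test also expected the re-set timers at 40 and 400. The sketch reproduces that bookkeeping with a hypothetical helper and plain integers in place of `Timestamp` values.

```python
def missing_and_unexpected(expected, actual):
    """Multiset diff in the spirit of an equal_to-style assertion (hypothetical)."""
    remaining = list(expected)
    unexpected = []
    for element in actual:
        if element in remaining:
            remaining.remove(element)  # matched one expected occurrence
        else:
            unexpected.append(element)
    return remaining, unexpected


expected = [('fired', 20), ('fired', 200), ('fired', 40), ('fired', 400)]
actual = [('fired', 20), ('fired', 200)]
missing, unexpected = missing_and_unexpected(expected, actual)
print(missing)     # [('fired', 40), ('fired', 400)]
print(unexpected)  # []
```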
   
   





[GitHub] [beam] lukecwik commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r418778740



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
##########
@@ -247,25 +247,27 @@ public void reduce(Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionV
     timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     // Now we fire the timers and process elements generated by timers (which may be timers itself)
-    try (RemoteBundle bundle =
-        stageBundleFactory.getBundle(
-            receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
-
-      PipelineTranslatorUtils.fireEligibleTimers(
-          timerInternals,
-          (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
-            FnDataReceiver<Timer> fnTimerReceiver =
-                bundle.getTimerReceivers().get(transformAndTimerId);
-            Preconditions.checkNotNull(
-                fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-            try {
-              fnTimerReceiver.accept(timerValue);
-            } catch (Exception e) {
-              throw new RuntimeException(
-                  String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-            }
-          },
-          currentTimerKey);
+    while (timerInternals.hasPendingTimers()) {

Review comment:
       It makes sense to do this in batch as well since the processing timers should get dropped and any newly scheduled event time timers should be continually fired.
   
   This would require updating FlinkExecutableStageFunction and SparkExecutableStageFunction







[GitHub] [beam] mxm edited a comment on issue #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm edited a comment on issue #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-618414047


   Never mind, just had another look and was able to get it running. Please take another look and let me know if you consider this a proper solution.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] boyuanzz commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r413315281



##########
File path: sdks/python/apache_beam/runners/worker/operations.py
##########
@@ -685,7 +685,8 @@ def process_timer(self, tag, timer_data):
         timer_spec,
         timer_data.user_key,
         timer_data.windows[0],
-        timer_data.fire_timestamp)
+        timer_data.fire_timestamp,
+        timer_data.pane_info)

Review comment:
       https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate.py#L134-L142







[GitHub] [beam] aaltay commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-621503987









[GitHub] [beam] boyuanzz commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r413175330



##########
File path: sdks/python/apache_beam/runners/common.py
##########
@@ -229,7 +229,7 @@ def invoke_timer_callback(self, user_state_context, key, window, timestamp):
         kwargs[kw] = user_state_context.get_state(state_spec, key, window)
       for kw, timer_spec in self.timer_args_to_replace.items():
         kwargs[kw] = user_state_context.get_timer(
-            timer_spec, key, window, None, None)
+            timer_spec, key, window, timestamp, None)

Review comment:
       The paneinfo should also be passed in here. You can get paneinfo from `timer.paneinfo`







[GitHub] [beam] boyuanzz commented on issue #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on issue #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-617924115


   You may also want to run `tox -e py3-yapf` (with your virtual env enabled) to fix the Python formatting.





[GitHub] [beam] pabloem commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-621546509


   LGTM as long as the failures are flakes.





[GitHub] [beam] boyuanzz edited a comment on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
boyuanzz edited a comment on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-619141429


   > Test failure with batch SDF execution in https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Phrase/154/
   > 
   > ```
   > * What went wrong:
   > Execution failed for task ':sdks:python:test-suites:portable:py2:flinkCompatibilityMatrixBatchLOOPBACK'.
   > > Process 'command 'sh'' finished with non-zero exit value 1
   > ```
   > 
   > Looks unrelated to this change since the cron is also failing: https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/803/console
   
   The `test_pardo_timers` fails on unexpected result:
   ```
   ERROR: test_pardo_timers (__main__.FlinkRunnerTestOptimized)
   BeamAssertException: Failed assert: [('fired', 20), ('fired', 200), ('fired', 40), ('fired', 400)] == [('fired', Timestamp(20)), ('fired', Timestamp(200))], missing elements [('fired', 40), ('fired', 400)] [while running 'assert_that/Match']
   ```
   
   To reproduce the failure on your local machine:
   1. Build the Flink job-server jar: `./gradlew runners:flink:1.10:job-server:shadowJar`
   2. Run the test: `python -m apache_beam.runners.portability.flink_runner_test --flink_job_server_jar=${YOUR_BEAM_DIR}/runners/flink/1.10/job-server/build/libs/beam-runners-flink-1.10-job-server-2.21.0-SNAPSHOT.jar --environment_type=LOOPBACK FlinkRunnerTest.test_pardo_timers`





[GitHub] [beam] boyuanzz commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-619141429


   > Test failure with batch SDF execution in https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Phrase/154/
   > 
   > ```
   > * What went wrong:
   > Execution failed for task ':sdks:python:test-suites:portable:py2:flinkCompatibilityMatrixBatchLOOPBACK'.
   > > Process 'command 'sh'' finished with non-zero exit value 1
   > ```
   > 
   > Looks unrelated to this change since the cron is also failing: https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/803/console
   
   Please refer to https://scans.gradle.com/s/fda5ioqe2cgla/console-log?task=:sdks:python:test-suites:portable:py2:flinkCompatibilityMatrixBatchLOOPBACK#L44009 for the test failure.





[GitHub] [beam] ibzib commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r418780573



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
##########
@@ -247,25 +247,27 @@ public void reduce(Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionV
     timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     // Now we fire the timers and process elements generated by timers (which may be timers itself)
-    try (RemoteBundle bundle =
-        stageBundleFactory.getBundle(
-            receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
-
-      PipelineTranslatorUtils.fireEligibleTimers(
-          timerInternals,
-          (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
-            FnDataReceiver<Timer> fnTimerReceiver =
-                bundle.getTimerReceivers().get(transformAndTimerId);
-            Preconditions.checkNotNull(
-                fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-            try {
-              fnTimerReceiver.accept(timerValue);
-            } catch (Exception e) {
-              throw new RuntimeException(
-                  String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-            }
-          },
-          currentTimerKey);
+    while (timerInternals.hasPendingTimers()) {

Review comment:
       Agree that this should apply to Spark runner too. To clarify, FlinkExecutableStageFunction is for batch only, meaning this change only affected Flink batch. I assume Flink streaming (ExecutableStageDoFnOperator) was working from the start.







[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-622008605


   Down to one flaky test in https://builds.apache.org/job/beam_PreCommit_Python_Commit/12595/
   
   I still need to take a look at the failing test in https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Commit/4434/





[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-622006914


   Run Java PreCommit





[GitHub] [beam] mxm commented on issue #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on issue #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-618433782









[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-620842521


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-619370440


   The issue was that the portable batch mode didn't support timers being set by other timers yet. Streaming mode already worked, and batch mode should now work as well.
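A minimal pure-Python model of that difference. Everything here is hypothetical: timers are plain `(fire_timestamp, key)` tuples, and the callback re-arms each key's timer once at twice its fire timestamp, chosen only so the numbers mirror the 20/200 and 40/400 values in the `test_pardo_timers` failure above (it is not the test's actual code). `fire_single_pass` stands in for firing timers once, with timers set by callbacks only registered after the pass; `fire_until_empty` corresponds to the `while (timerInternals.hasPendingTimers())` loop in the Flink batch diff.

```python
import heapq


def make_callback():
    """Hypothetical user callback that sets one new timer per key from a timer."""
    rearmed = set()  # keys whose timer has already been re-set once

    def on_timer(key, ts, set_timer):
        if key not in rearmed:
            rearmed.add(key)
            set_timer(ts * 2, key)  # timer set from inside a timer callback
        return ('fired', ts)

    return on_timer


def fire_single_pass(initial):
    """Fire only the timers pending at the start; later timers are never fired."""
    pending = list(initial)
    heapq.heapify(pending)
    on_timer = make_callback()
    late = []  # timers set by callbacks; registered too late in this model
    fired = []
    while pending:
        ts, key = heapq.heappop(pending)
        fired.append(on_timer(key, ts, lambda t, k: late.append((t, k))))
    return fired


def fire_until_empty(initial):
    """Keep firing, including timers set by callbacks, until none remain."""
    pending = list(initial)
    heapq.heapify(pending)
    on_timer = make_callback()
    fired = []
    while pending:  # analogous to `while (timerInternals.hasPendingTimers())`
        ts, key = heapq.heappop(pending)
        fired.append(
            on_timer(key, ts, lambda t, k: heapq.heappush(pending, (t, k))))
    return fired
```

Called with `[(20, 'k1'), (200, 'k2')]`, the single-pass variant returns only the 20 and 200 firings, while the loop also returns the 40 and 400 firings that were set from inside callbacks.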





[GitHub] [beam] ibzib commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-622576973


   Flakes are BEAM-9767 and BEAM-8912. I'm going to merge this so we can go ahead with the release.





[GitHub] [beam] ibzib commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r418813369



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
##########
@@ -247,25 +247,27 @@ public void reduce(Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionV
     timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     // Now we fire the timers and process elements generated by timers (which may be timers itself)
-    try (RemoteBundle bundle =
-        stageBundleFactory.getBundle(
-            receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
-
-      PipelineTranslatorUtils.fireEligibleTimers(
-          timerInternals,
-          (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
-            FnDataReceiver<Timer> fnTimerReceiver =
-                bundle.getTimerReceivers().get(transformAndTimerId);
-            Preconditions.checkNotNull(
-                fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-            try {
-              fnTimerReceiver.accept(timerValue);
-            } catch (Exception e) {
-              throw new RuntimeException(
-                  String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-            }
-          },
-          currentTimerKey);
+    while (timerInternals.hasPendingTimers()) {

Review comment:
       Made #11595 to update Spark.







[GitHub] [beam] boyuanzz commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r414734137



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -845,10 +845,12 @@ def process_bundle(self,
           (result_future.is_done() and result_future.get().error)):
         if isinstance(output, beam_fn_api_pb2.Elements.Timers) and not dry_run:
           with BundleManager._lock:
-            self.bundle_context_manager.get_buffer(
+            timer_buffer = self.bundle_context_manager.get_buffer(
                 expected_output_timers[(
                     output.transform_id, output.timer_family_id)],
-                output.transform_id).append(output.timers)
+                output.transform_id)
+            timer_buffer.cleared = False

Review comment:
       @pabloem, as the original author of `ListBuffer`, what do you think? I prefer the second approach Max mentioned above. Other than this part, the timer changes look good to me.







[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-618882229


   The Flink Python tests are blocked on #11362.





[GitHub] [beam] mxm commented on issue #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on issue #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-618510664


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] mxm commented on issue #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on issue #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-618326851


   Thanks for your help @boyuanzz. Unfortunately, I wasn't able to let the test pass. I'm running into a problem with setting a timer from a timer callback. 
   
   ```
   apache_beam/runners/portability/fn_api_runner/fn_runner.py:851:
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   
   self = <apache_beam.runners.portability.fn_api_runner.execution.ListBuffer object at 0x1180dbed0>
   element = b'\x02k1\x00\x00\x00\x00\x01\x00\x80\x00\x00\x00\x00\x0fB@\x80\x00\x00\x00\x00\x00N \x0f\x02k2\x00\x00\x00\x00\x01\x00\x80\x00\x00\x00\x00\x0fB@\x80\x00\x00\x00\x00\x03\r@\x0f'
   
       def append(self, element):
         # type: (bytes) -> None
         if self.cleared:
   >       raise RuntimeError('Trying to append to a cleared ListBuffer.')
   E       RuntimeError: Trying to append to a cleared ListBuffer.
   
   apache_beam/runners/portability/fn_api_runner/execution.py:96: RuntimeError
   ====================================================================== 1 failed, 3 warnings in 2.43 seconds =======================================================================
   ```
   
   I've tried resetting the timer buffer but that led to the test getting stuck. I don't have the time to continue to work on this. If you or someone else could take this over I'd appreciate it. It would be good to fix this for the release.





[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-618939929


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-619533662


   `test_pardo_timers` is now passing for batch and streaming.





[GitHub] [beam] ibzib commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-622081746


   > I still need to take a look at the failing test in https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Commit/4434/
   
   Do you mean that it's related to this PR, or it's not, or you don't know? I'm inclined to say the failure isn't related to these changes, since no tests visibly fail (Python prints `OK (skipped=16)`), like my comment [here](https://issues.apache.org/jira/browse/BEAM-8912?focusedCommentId=16990196&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16990196).





[GitHub] [beam] mxm commented on issue #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on issue #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-617996234


   Nice to see that we have auto-formatting now.





[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-618976360


   Test failure with batch SDF execution in https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Phrase/154/
   ```
   * What went wrong:
   Execution failed for task ':sdks:python:test-suites:portable:py2:flinkCompatibilityMatrixBatchLOOPBACK'.
   > Process 'command 'sh'' finished with non-zero exit value 1
   ```
   
   Looks unrelated to this change since the cron is also failing: https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/803/console





[GitHub] [beam] boyuanzz commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r414276374



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -845,10 +845,12 @@ def process_bundle(self,
           (result_future.is_done() and result_future.get().error)):
         if isinstance(output, beam_fn_api_pb2.Elements.Timers) and not dry_run:
           with BundleManager._lock:
-            self.bundle_context_manager.get_buffer(
+            timer_buffer = self.bundle_context_manager.get_buffer(
                 expected_output_timers[(
                     output.transform_id, output.timer_family_id)],
-                output.transform_id).append(output.timers)
+                output.transform_id)
+            timer_buffer.cleared = False

Review comment:
       I don't think we should set `timer_buffer.cleared` to `False` here. `FnApiRunner.ListBuffer` assumes that, per key, a timer is fired only once. Setting a new timer while firing a timer seems to break that assumption. If that's the case, we should consider updating the timer buffer logic.
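To make the concern concrete, here is a hypothetical, stripped-down model of the buffer. Only the `cleared` flag and the guard in `append` come from the traceback earlier in this thread; `TimerBuffers` and `take_and_reset` are illustrative names for one possible alternative (swapping in a fresh buffer once timers are consumed), not what the runner does or what was merged. It contrasts with flipping `cleared` back to `False`, which reuses the same object for timers set while firing.

```python
class ListBuffer(object):
    """Hypothetical stand-in for the runner's ListBuffer (illustration only)."""

    def __init__(self):
        self._items = []
        self.cleared = False

    def append(self, element):
        # Same guard as the one shown in the traceback in this thread.
        if self.cleared:
            raise RuntimeError('Trying to append to a cleared ListBuffer.')
        self._items.append(element)

    def __iter__(self):
        return iter(self._items)

    def clear(self):
        self.cleared = True
        self._items = []


class TimerBuffers(object):
    """Hypothetical holder that starts a new buffer once timers are consumed."""

    def __init__(self):
        self.current = ListBuffer()

    def take_and_reset(self):
        # Hand the consumed buffer to the caller and start a fresh one, so
        # timers set while firing land in a buffer that was never cleared.
        consumed, self.current = self.current, ListBuffer()
        return consumed
```

With this shape, each buffer is still consumed and cleared exactly once; new timers simply go to the next buffer instead of re-opening the old one.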







[GitHub] [beam] mxm commented on a change in pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r418943612



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
##########
@@ -247,25 +247,27 @@ public void reduce(Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionV
     timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     // Now we fire the timers and process elements generated by timers (which may be timers itself)
-    try (RemoteBundle bundle =
-        stageBundleFactory.getBundle(
-            receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
-
-      PipelineTranslatorUtils.fireEligibleTimers(
-          timerInternals,
-          (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
-            FnDataReceiver<Timer> fnTimerReceiver =
-                bundle.getTimerReceivers().get(transformAndTimerId);
-            Preconditions.checkNotNull(
-                fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-            try {
-              fnTimerReceiver.accept(timerValue);
-            } catch (Exception e) {
-              throw new RuntimeException(
-                  String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-            }
-          },
-          currentTimerKey);
+    while (timerInternals.hasPendingTimers()) {

Review comment:
       Yes, this is a batch-only change (also evident in the commit message). The streaming operator already does that. 
   
   > processing timers should get dropped and any newly scheduled event time timers should be continually fired.
   
   We do not drop processing timers. We fire all timers, including new ones, until there are none left.
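The two batch policies discussed here can be contrasted with a small hypothetical model. `fire_all` follows the behavior described in this comment (fire every timer, including newly set ones, until none are pending); `fire_event_time_only` follows the alternative quoted above, which drops processing-time timers. The `(timestamp, domain)` tuples, the domain strings, the `rearm_once` callback, and all function names are assumptions for illustration, not Beam APIs.

```python
import heapq

EVENT_TIME = 'event'
PROCESSING_TIME = 'processing'


def _drain(initial, on_timer, keep):
    """Fire accepted timers in timestamp order until none are pending.

    `on_timer(timer)` returns the timers set while firing `timer`.
    """
    pending = [t for t in initial if keep(t)]
    heapq.heapify(pending)
    fired = []
    while pending:
        timer = heapq.heappop(pending)
        fired.append(timer)
        for new_timer in on_timer(timer):
            if keep(new_timer):
                heapq.heappush(pending, new_timer)
    return fired


def fire_all(initial, on_timer):
    # Policy described in this comment: no timer is dropped.
    return _drain(initial, on_timer, keep=lambda t: True)


def fire_event_time_only(initial, on_timer):
    # Alternative quoted above: processing-time timers are dropped.
    return _drain(initial, on_timer, keep=lambda t: t[1] == EVENT_TIME)


def rearm_once(timer):
    # Hypothetical callback: a timer below 50 sets one event-time timer
    # at twice its timestamp.
    ts, _ = timer
    return [(ts * 2, EVENT_TIME)] if ts < 50 else []
```

Starting from `[(20, EVENT_TIME), (30, PROCESSING_TIME)]`, `fire_all` also fires the processing-time timer and the event-time timer it sets, while `fire_event_time_only` fires only the 20/40/80 event-time chain. In this model, a callback that re-arms unconditionally would never terminate under either policy.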







[GitHub] [beam] mxm commented on pull request #11492: [BEAM-9801] Pass in fire timestamp to timer callback

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11492:
URL: https://github.com/apache/beam/pull/11492#issuecomment-622488737


   Run Python2_PVR_Flink PreCommit

