Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/04 06:02:25 UTC

[GitHub] [beam] boyuanzz opened a new pull request #11314: [WIP] Send Timers over Data Channel as Elements

boyuanzz opened a new pull request #11314: [WIP] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314
 
 
   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) 
   Portable | --- | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405824290
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -1088,6 +1142,30 @@ def create_operation(self,
         transform_proto.spec.payload, parameter_type)
     return creator(self, transform_id, transform_proto, payload, consumers)
 
+  def get_timer_coders(self):
+    timer_coder = {}
+    for transform_id, transform_proto in self.descriptor.transforms.items():
 
 Review comment:
   I see us doing this loop three times now. Perhaps it would be more useful to do the loop once to set everything up, creating a single dictionary (transform_id, timer_family_id) -> (all info about that timer we need to dispatch them).
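   A minimal Python sketch of that suggestion, assuming a simplified descriptor shaped as `{transform_id: {timer_family_id: coder_id}}` in place of the real protos; `TimerInfo` and `build_timer_index` are hypothetical names, not Beam APIs. It walks the transforms once and keys the result by `(transform_id, timer_family_id)`:

```python
from dataclasses import dataclass
from typing import Dict, Mapping, Tuple


@dataclass(frozen=True)
class TimerInfo:
  """Hypothetical per-timer record; add whatever dispatch needs."""
  transform_id: str
  timer_family_id: str
  coder_id: str


def build_timer_index(transforms):
  # type: (Mapping[str, Mapping[str, str]]) -> Dict[Tuple[str, str], TimerInfo]
  """Walks the transforms once and indexes every timer family.

  `transforms` is a simplified stand-in for the bundle descriptor:
  {transform_id: {timer_family_id: coder_id}}.
  """
  index = {}  # type: Dict[Tuple[str, str], TimerInfo]
  for transform_id, timer_families in transforms.items():
    for timer_family_id, coder_id in timer_families.items():
      index[(transform_id, timer_family_id)] = TimerInfo(
          transform_id, timer_family_id, coder_id)
  return index
```

   Later lookups then become a single dictionary access such as `index[(transform_id, timer_family_id)]` instead of another loop over all transforms.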


[GitHub] [beam] lukecwik commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405870539
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 ##########
 @@ -257,6 +230,19 @@ public static ParDoPayload translateParDo(
       restrictionCoderId = "";
     }
 
+    Coder<BoundedWindow> windowCoder =
+        (Coder<BoundedWindow>) mainInput.getWindowingStrategy().getWindowFn().windowCoder();
+    Coder<?> keyCoder;
+    if (signature.usesState() || signature.usesTimers()) {
+      checkArgument(
+          mainInput.getCoder() instanceof KvCoder,
+          "DoFn's that use state or timers must have an input PCollection with a KvCoder but received %s",
+          mainInput.getCoder());
 
 Review comment:
   It was already covered by validation in DoFnSignatures, but it is repeated here for defense in depth.
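   For illustration, a Python analogue of the repeated check. `Coder`, `KvCoder`, `check_argument`, and `key_coder_for_stateful_dofn` are hypothetical stand-ins, not the Beam Java classes; `check_argument` only mimics the shape of Guava's `checkArgument` with `%s` formatting. The point is that the translation code re-validates its own precondition instead of trusting that an earlier step did:

```python
class Coder(object):
  """Hypothetical minimal coder base class for illustration."""


class KvCoder(Coder):
  def __init__(self, key_coder, value_coder):
    self.key_coder = key_coder
    self.value_coder = value_coder


def check_argument(condition, message, *args):
  # Same shape as Guava's checkArgument(boolean, String, Object...).
  if not condition:
    raise ValueError(message % args)


def key_coder_for_stateful_dofn(input_coder, uses_state_or_timers):
  if not uses_state_or_timers:
    return None
  # An earlier validation pass is expected to have checked this already;
  # repeating it here makes a bypassed or buggy validator fail loudly.
  check_argument(
      isinstance(input_coder, KvCoder),
      "DoFns that use state or timers must have an input PCollection "
      "with a KvCoder but received %s", input_coder)
  return input_coder.key_coder
```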


[GitHub] [beam] boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406491316
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##########
 @@ -536,7 +525,8 @@ def _run_stage(self,
         runner_execution_context,
         bundle_context_manager,
         data_input,
-        data_output,
+        data_output, {},
 
 Review comment:
   yapf put the {} here.


[GitHub] [beam] robertwb commented on a change in pull request #11314: [WIP] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [WIP] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405102954
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/core.py
 ##########
 @@ -1294,16 +1295,43 @@ def _pardo_fn_data(self):
     windowing = None
     return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
-  def to_runner_api_parameter(self, context):
+  def to_runner_api(self, context, main_inputs, has_parts=False):
 
 Review comment:
   This is starting to look like a lot of code duplication. How about we pass (all) inputs as a keyword argument, and let `PTransform.to_runner_api` take an `**extra_kwargs` that it passes on to `to_runner_api_parameter`. 
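   A rough sketch of the proposed shape, assuming plain dictionaries in place of the proto messages and hypothetical URNs (`example:generic`, `example:pardo`). The base `to_runner_api` forwards `**extra_kwargs` to `to_runner_api_parameter`, so only `ParDo` has to look at the inputs:

```python
class PTransform(object):
  def to_runner_api(self, context, **extra_kwargs):
    # Shared serialization logic lives here once; transform-specific
    # keyword arguments are forwarded untouched.
    urn, payload = self.to_runner_api_parameter(context, **extra_kwargs)
    return {'urn': urn, 'payload': payload}

  def to_runner_api_parameter(self, context, **extra_kwargs):
    # A generic transform simply ignores extras such as named_inputs.
    return 'example:generic', None


class ParDo(PTransform):
  def to_runner_api_parameter(self, context, named_inputs=None, **extra_kwargs):
    # Only ParDo inspects the inputs (e.g. to derive key and window coders).
    return 'example:pardo', sorted(named_inputs or {})
```

   Generic transforms inherit the forwarding without overriding `to_runner_api`, and ignore extras they do not need.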


[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405987932
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/core.py
 ##########
 @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self):
     windowing = None
     return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
-  def to_runner_api_parameter(self, context):
+  def _get_key_and_window_coder(self, named_inputs):
+    if named_inputs is None or not self._signature.is_stateful_dofn():
+      return None, None
+    main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0]
+    input_pcoll = named_inputs[main_input]
+    kv_type_hint = input_pcoll.element_type
+    if kv_type_hint and kv_type_hint != typehints.Any:
+      coder = coders.registry.get_coder(kv_type_hint)
+      if not coder.is_kv_coder():
+        raise ValueError(
+            'Input elements to the transform %s with stateful DoFn must be '
+            'key-value pairs.' % self)
+      key_coder = coder.key_coder()
+    else:
+      key_coder = coders.registry.get_coder(typehints.Any)
+    window_coder = input_pcoll.windowing.windowfn.get_window_coder()
+    return key_coder, window_coder
+
+  def to_runner_api(self, context, **extra_kwargs):
+    # type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec
+    has_parts = extra_kwargs.get('has_part', False)
+    urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
 
 Review comment:
   https://engdoc.corp.google.com/eng/doc/devguide/py/totw/026.md?cl=head


[GitHub] [beam] lukecwik merged pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314
 
 
   


[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406387842
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -1088,6 +1145,21 @@ def create_operation(self,
         transform_proto.spec.payload, parameter_type)
     return creator(self, transform_id, transform_proto, payload, consumers)
 
+  def extract_timers_info(self):
 
 Review comment:
   Ack. Expanding the whole diff, I see that this now happens in different methods (in which case two separate maps, as you originally had, might be preferable here). But it's not a big deal.


[GitHub] [beam] robertwb commented on a change in pull request #11314: [WIP] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [WIP] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405102421
 
 

 ##########
 File path: sdks/python/apache_beam/pipeline.py
 ##########
 @@ -1071,6 +1071,14 @@ def named_inputs(self):
     }
     return dict(main_inputs, **side_inputs)
 
+  def main_inputs(self):
 
 Review comment:
   Generic transforms don't have a notion of main inputs; let's filter things out in the ParDo implementation instead.
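   One way to do that filtering inside ParDo, sketched under the assumption that side inputs can be identified by their tags; `main_input_tag` is a hypothetical helper. It uses the same set difference as `_get_key_and_window_coder` elsewhere in this PR, but fails loudly if the result is not exactly one input:

```python
from typing import Iterable, Mapping


def main_input_tag(named_inputs, side_input_tags):
  # type: (Mapping[str, object], Iterable[str]) -> str
  """Returns the single input tag that is not a side input."""
  main_tags = sorted(set(named_inputs) - set(side_input_tags))
  if len(main_tags) != 1:
    raise ValueError('Expected exactly one main input, got %r' % main_tags)
  return main_tags[0]
```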


[GitHub] [beam] pabloem commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-611229498
 
 
   I'm sorry, I have a bad headache and need to bow out. @robertwb, can you review fn_runner.py and its siblings?


[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405817899
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -611,7 +611,7 @@ def __init__(self,
                transform_id,  # type: str
                key_coder,  # type: coders.Coder
                window_coder,  # type: coders.Coder
-               timer_family_specs  # type: Mapping[str, beam_runner_api_pb2.TimerFamilySpec]
+               timer_coders
 
 Review comment:
   type?


[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406467481
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##########
 @@ -987,8 +1019,10 @@ def __init__(
 
   def process_bundle(self,
                      inputs,  # type: Mapping[str, PartitionableBuffer]
-                     expected_outputs  # type: DataOutput
-                    ):
+                     expected_outputs,  # type: DataOutput
+                     fired_timers,  # type: Mapping[str, Mapping[str, PartitionableBuffer]]
 
 Review comment:
   For consistency, should this be a `Mapping[Tuple[str, str], PartitionableBuffer]`?
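   If the nested form is produced somewhere upstream, converting it to the suggested tuple-keyed shape is a single comprehension. A sketch, with lists standing in for `PartitionableBuffer` and `flatten_timer_buffers` as a hypothetical name:

```python
from typing import Dict, Mapping, Tuple, TypeVar

V = TypeVar('V')


def flatten_timer_buffers(nested):
  # type: (Mapping[str, Mapping[str, V]]) -> Dict[Tuple[str, str], V]
  """Re-keys {transform_id: {timer_family_id: buf}} by (transform_id, timer_family_id)."""
  return {
      (transform_id, timer_family_id): buffer
      for transform_id, families in nested.items()
      for timer_family_id, buffer in families.items()
  }
```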


[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406002550
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##########
 @@ -408,27 +470,67 @@ def close_callback(data):
     return ClosableOutputStream.create(
         close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
 
+  def output_timer_stream(self, instruction_id, transform_id, timer_family_id):
+    def add_to_send_queue(timer):
+      if timer:
+        self._to_send.put(
+            beam_fn_api_pb2.Elements.Timer(
+                instruction_id=instruction_id,
+                transform_id=transform_id,
+                timer_family_id=timer_family_id,
+                timers=timer,
+                is_last=False))
+
+    def close_callback(timer):
+      add_to_send_queue(timer)
+      self._to_send.put(
+          beam_fn_api_pb2.Elements.Timer(
+              instruction_id=instruction_id,
+              transform_id=transform_id,
+              timer_family_id=timer_family_id,
+              timers=b'',
+              is_last=True))
+
+    return ClosableOutputStream.create(
+        close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
+
   def _write_outputs(self):
     # type: () -> Iterator[beam_fn_api_pb2.Elements]
-    done = False
-    while not done:
-      data = [self._to_send.get()]
-      try:
-        # Coalesce up to 100 other items.
-        for _ in range(100):
-          data.append(self._to_send.get_nowait())
-      except queue.Empty:
-        pass
-      if data[-1] is self._WRITES_FINISHED:
-        done = True
-        data.pop()
-      if data:
-        yield beam_fn_api_pb2.Elements(data=data)
+    stream_done = False
+    while not stream_done:
+      streams = None
+      if not stream_done:
 
 Review comment:
   This will always be true (given the loop condition). 


[GitHub] [beam] lukecwik commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405870882
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
 ##########
 @@ -238,9 +238,14 @@ protected BatchDataflowWorker(
         sdkFusedStage =
             pipeline == null
                 ? RegisterNodeFunction.withoutPipeline(
-                    idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
+                    idGenerator,
+                    sdkHarnessRegistry.beamFnStateApiServiceDescriptor(),
+                    sdkHarnessRegistry.beamFnDataApiServiceDescriptor())
 
 Review comment:
   They both use the Data API, so no. All we're saying here is that we will reuse the same gRPC channel for both timers and data.
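   To illustrate the multiplexing idea, a sketch with hypothetical dataclasses standing in for the Fn API protos: each message on the shared stream can carry both data and timer elements, and the receiver routes them by their ids. The class and field names here are assumptions, not the actual proto definitions:

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import List


@dataclass
class Data:  # hypothetical stand-in for a data element
  instruction_id: str
  transform_id: str
  payload: bytes


@dataclass
class Timers:  # hypothetical stand-in for a timer element
  instruction_id: str
  transform_id: str
  timer_family_id: str
  payload: bytes


@dataclass
class Elements:  # one message on the shared stream carries both kinds
  data: List[Data] = field(default_factory=list)
  timers: List[Timers] = field(default_factory=list)


def demultiplex(messages):
  """Routes everything arriving on one stream to per-destination buffers."""
  data_buffers = defaultdict(list)
  timer_buffers = defaultdict(list)
  for message in messages:
    for d in message.data:
      data_buffers[(d.instruction_id, d.transform_id)].append(d.payload)
    for t in message.timers:
      key = (t.instruction_id, t.transform_id, t.timer_family_id)
      timer_buffers[key].append(t.payload)
  return dict(data_buffers), dict(timer_buffers)
```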


[GitHub] [beam] boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406394861
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -1088,6 +1145,21 @@ def create_operation(self,
         transform_proto.spec.payload, parameter_type)
     return creator(self, transform_id, transform_proto, payload, consumers)
 
+  def extract_timers_info(self):
 
 Review comment:
   The keys of the maps would still be the same, though (a tuple of (transform_id, timer_family_id)). That's why I made the value a map of {coder_impl, output_stream}.
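   A sketch of the single-map shape described here, with hypothetical names (`TimersInfo`, `register_coder`, `register_output_stream`): two separate methods fill in different fields of the same `(transform_id, timer_family_id)` entry, using `setdefault` so that either may run first:

```python
class TimersInfo(object):
  """Hypothetical holder: one map keyed by (transform_id, timer_family_id)."""

  def __init__(self):
    self.timers_info = {}

  def register_coder(self, transform_id, timer_family_id, coder_impl):
    entry = self.timers_info.setdefault((transform_id, timer_family_id), {})
    entry['coder_impl'] = coder_impl

  def register_output_stream(self, transform_id, timer_family_id, output_stream):
    entry = self.timers_info.setdefault((transform_id, timer_family_id), {})
    entry['output_stream'] = output_stream
```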


[GitHub] [beam] boyuanzz commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-613006148
 
 
   > This is a big change which also affects the runners. Would it have made sense to notify Runner authors, especially since post commit tests are broken? It took me a bit to figure out what caused the regression.
   
   Thanks, Max, and sorry for the inconvenience. It seems that both Spark and Flink currently fail on the same test, org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerAlignBounded, with the same failure pattern: the pipeline produces only the output from the timer, not from the ProcessElement fn. I suspect something is wrong in the shared Java runner library code. Have you started working on it, or would you like me to follow up and fix this issue?


[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406004352
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/operations.pxd
 ##########
 @@ -92,7 +92,7 @@ cdef class DoOperation(Operation):
   cdef DoFnRunner dofn_runner
   cdef object tagged_receivers
   cdef object side_input_maps
-  cdef object user_state_context
+  cpdef public object user_state_context
 
 Review comment:
   Rather than making this public, I would add an `add_timer_info` method to this operation. 
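   A minimal pure-Python sketch of that encapsulation; the class and method names are illustrative and are not the Cython definitions in `operations.pxd`. In Cython, a plain `cdef object` attribute is not visible from Python code, so routing updates through a method means the attribute never has to be exposed:

```python
class UserStateContext(object):
  """Hypothetical stand-in for the per-operation user state/timer context."""

  def __init__(self):
    self._timers_info = {}

  def add_timer_info(self, timer_family_id, timer_info):
    self._timers_info[timer_family_id] = timer_info


class DoOperation(object):
  def __init__(self, user_state_context):
    # Kept private; callers never touch it directly.
    self._user_state_context = user_state_context

  def add_timer_info(self, timer_family_id, timer_info):
    # The operation decides how timer info reaches its state context.
    self._user_state_context.add_timer_info(timer_family_id, timer_info)
```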


[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405985691
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/core.py
 ##########
 @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self):
     windowing = None
     return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
-  def to_runner_api_parameter(self, context):
+  def _get_key_and_window_coder(self, named_inputs):
+    if named_inputs is None or not self._signature.is_stateful_dofn():
+      return None, None
+    main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0]
+    input_pcoll = named_inputs[main_input]
+    kv_type_hint = input_pcoll.element_type
+    if kv_type_hint and kv_type_hint != typehints.Any:
+      coder = coders.registry.get_coder(kv_type_hint)
+      if not coder.is_kv_coder():
+        raise ValueError(
+            'Input elements to the transform %s with stateful DoFn must be '
+            'key-value pairs.' % self)
+      key_coder = coder.key_coder()
+    else:
+      key_coder = coders.registry.get_coder(typehints.Any)
+    window_coder = input_pcoll.windowing.windowfn.get_window_coder()
+    return key_coder, window_coder
+
+  def to_runner_api(self, context, **extra_kwargs):
+    # type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec
+    has_parts = extra_kwargs.get('has_part', False)
 
 Review comment:
   You can leave this in the parameter list.
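   Keeping the flag as a named parameter also avoids a subtle problem in this diff: it is read with `extra_kwargs.get('has_part', False)`, while an earlier revision's signature (`to_runner_api(self, context, main_inputs, has_parts=False)`) named it `has_parts`, so a caller passing `has_parts=True` would silently get `False`. A small sketch with hypothetical function names contrasting the two styles:

```python
def flag_from_kwargs(context, **extra_kwargs):
  # Looked up by string: a misspelled key is not an error, it silently
  # falls back to the default.
  return extra_kwargs.get('has_part', False)


def flag_from_param(context, has_parts=False, **extra_kwargs):
  # Named parameter: the caller's keyword binds directly, and any
  # remaining extras are still available for forwarding.
  return has_parts
```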


[GitHub] [beam] lukecwik commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405907504
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
 ##########
 @@ -238,9 +238,14 @@ protected BatchDataflowWorker(
         sdkFusedStage =
             pipeline == null
                 ? RegisterNodeFunction.withoutPipeline(
-                    idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
+                    idGenerator,
+                    sdkHarnessRegistry.beamFnStateApiServiceDescriptor(),
+                    sdkHarnessRegistry.beamFnDataApiServiceDescriptor())
 
 Review comment:
   That is correct.

[GitHub] [beam] lukecwik commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-611607510
 
 
   Run Python2_PVR_Flink PreCommit

[GitHub] [beam] boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406490454
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/core.py
 ##########
 @@ -1272,6 +1272,8 @@ def expand(self, pcoll):
         key_coder = coder.key_coder()
       else:
         key_coder = coders.registry.get_coder(typehints.Any)
+      self.window_coder = pcoll.windowing.windowfn.get_window_coder()
 
 Review comment:
   No. Will remove it.

[GitHub] [beam] boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406490504
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -837,25 +869,59 @@ def process_bundle(self, instruction_id):
         op.execution_context = execution_context
         op.start()
 
-      # Inject inputs from data plane.
+      # Each data_channel is mapped to a list of expected inputs which includes
+      # both data input and timer input. The data input is identied by
+      # transform_id. The data input is identified by
+      # (transform_id, timer_family_id).
       data_channels = collections.defaultdict(
           list
       )  # type: DefaultDict[data_plane.GrpcClientDataChannel, List[str]]
+
+      # Inject data inputs from data plane.
 
 Review comment:
   Updated the comment.
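The comment in the diff above says each data channel maps to a list of expected inputs. A data input is identified by its `transform_id` and a timer input by `(transform_id, timer_family_id)`. The sketch below shows that bookkeeping, with plain strings as hypothetical stand-ins for the gRPC data channels and transform IDs.

```python
import collections
from typing import DefaultDict, List, Tuple, Union

# A data input is keyed by transform_id; a timer input by
# (transform_id, timer_family_id).
ExpectedInput = Union[str, Tuple[str, str]]


def group_expected_inputs(data_inputs, timer_inputs):
  # type: (List[Tuple[str, str]], List[Tuple[str, str, str]]) -> DefaultDict[str, List[ExpectedInput]]
  """Groups expected inputs by the (hypothetical) channel they arrive on.

  data_inputs: (channel, transform_id) pairs.
  timer_inputs: (channel, transform_id, timer_family_id) triples.
  """
  data_channels = collections.defaultdict(
      list)  # type: DefaultDict[str, List[ExpectedInput]]
  for channel, transform_id in data_inputs:
    data_channels[channel].append(transform_id)
  for channel, transform_id, timer_family_id in timer_inputs:
    data_channels[channel].append((transform_id, timer_family_id))
  return data_channels
```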

[GitHub] [beam] TheNeuralBit commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405831920
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 ##########
 @@ -257,6 +230,19 @@ public static ParDoPayload translateParDo(
       restrictionCoderId = "";
     }
 
+    Coder<BoundedWindow> windowCoder =
+        (Coder<BoundedWindow>) mainInput.getWindowingStrategy().getWindowFn().windowCoder();
+    Coder<?> keyCoder;
+    if (signature.usesState() || signature.usesTimers()) {
+      checkArgument(
+          mainInput.getCoder() instanceof KvCoder,
+          "DoFn's that use state or timers must have an input PCollection with a KvCoder but received %s",
+          mainInput.getCoder());
 
 Review comment:
   Just curious: did we not have this check before, and instead just fail when attempting to cast to `KvCoder` (in the block removed from `translate` above)?

[GitHub] [beam] mxm commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
mxm commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-613085672
 
 
   I was actually working on something related to timers in #11362 and was surprised to see that the test failed when I opened the PR, since I had run the tests locally. Then I figured something must have changed on master in the meantime. Thanks for following up on this!

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405990026
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -629,28 +628,28 @@ def __init__(self,
     self._transform_id = transform_id
     self._key_coder = key_coder
     self._window_coder = window_coder
-    self._timer_family_specs = timer_family_specs
-    self._timer_receivers = None  # type: Optional[Dict[str, operations.ConsumerSet]]
+    # A mapping of {timer_family_id: OutputStream}
+    self._timer_output_streams = {}
+    self._timer_coders_impl = {}
     self._all_states = {
     }  # type: Dict[tuple, userstate.AccumulatingRuntimeState]
 
-  def update_timer_receivers(self, receivers):
-    # type: (operations._TaggedReceivers) -> None
-
-    """TODO"""
-    self._timer_receivers = {}
-    for tag in self._timer_family_specs:
-      self._timer_receivers[tag] = receivers.pop(tag)
+  def add_timer_info(self, timer_family_id, output_stream, coder_impl):
+    self._timer_output_streams[timer_family_id] = output_stream
+    self._timer_coders_impl[timer_family_id] = coder_impl
 
   def get_timer(
       self,
       timer_spec,
       key,
-      window  # type: windowed_value.BoundedWindow
-  ):
+      window,  # type: windowed_value.BoundedWindow
+      pane):
     # type: (...) -> OutputTimer
-    assert self._timer_receivers is not None
-    return OutputTimer(key, window, self._timer_receivers[timer_spec.name])
+    output_stream = self._timer_output_streams[timer_spec.name]
 
 Review comment:
   If this were a single map rather than two parallel maps, you could write something like
   
   `output_stream, timer_coder_impl = self._timer_info[timer_spec.name]`
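The sketch below shows the single-map version suggested above. It stores one `(output_stream, coder_impl)` tuple per timer family, so both values are looked up together and cannot drift out of sync. `TimerHandlerSketch` and its `lookup` method are hypothetical; only `add_timer_info` and the keying by timer family ID mirror the diff.

```python
from typing import Any, Dict, Tuple


class TimerHandlerSketch(object):
  """Hypothetical stand-in for the state handler in the diff."""

  def __init__(self):
    # A mapping of {timer_family_id: (output_stream, timer_coder_impl)}.
    self._timer_info = {}  # type: Dict[str, Tuple[Any, Any]]

  def add_timer_info(self, timer_family_id, output_stream, coder_impl):
    # type: (str, Any, Any) -> None
    self._timer_info[timer_family_id] = (output_stream, coder_impl)

  def lookup(self, timer_family_id):
    # type: (str) -> Tuple[Any, Any]
    # One lookup returns both values; a KeyError here means the family
    # was never registered.
    output_stream, timer_coder_impl = self._timer_info[timer_family_id]
    return output_stream, timer_coder_impl
```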

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406473230
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##########
 @@ -914,6 +926,17 @@ def process_bundle(self,
 
     split_manager = self._select_split_manager()
     if not split_manager:
+      # Send the fired timers if any.
+      for (transform_id, timer_family_id), timers in fired_timers.items():
+        self._send_timers_to_worker(
+            process_bundle_id, transform_id, timer_family_id, timers)
+
+      for transform_id, timer_family_id in (
+          set(expected_output_timers.keys()) - set(fired_timers.keys())):
+        # Close the stream if there is no timers to be sent.
 
 Review comment:
   This is a subtle point. I might write something like "The worker waits for a logical timer stream to be closed for every possible timer, regardless of whether there are any timers to be sent."
   
   Maybe it'd be clearer to iterate over `expected_output_timers`, and send `fired_timers.get((transform_id, timer_family_id), [])`. 
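The sketch below shows the restructuring suggested above. It iterates over every expected timer stream and sends `fired_timers.get(key, [])`, so streams with no fired timers are still sent and closed. `send_timers` is a hypothetical callback standing in for `self._send_timers_to_worker`. It is assumed to close the logical stream after writing the timers.

```python
from typing import Callable, Dict, Iterable, List, Tuple

TimerKey = Tuple[str, str]  # (transform_id, timer_family_id)


def send_all_timer_streams(
    expected_output_timers,  # type: Iterable[TimerKey]
    fired_timers,  # type: Dict[TimerKey, List[bytes]]
    send_timers,  # type: Callable[[str, str, List[bytes]], None]
):
  # type: (...) -> None
  # The worker waits for a logical timer stream to be closed for every
  # possible timer, so every expected stream is sent, even if it is empty.
  for transform_id, timer_family_id in expected_output_timers:
    timers = fired_timers.get((transform_id, timer_family_id), [])
    send_timers(transform_id, timer_family_id, timers)
```

This differs from the set-difference version in the diff in one way: a `fired_timers` entry whose key is not in `expected_output_timers` is not sent.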

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405998009
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##########
 @@ -274,20 +301,47 @@ def inverse(self):
     return self._inverse
 
   def input_elements(self,
-                     instruction_id,  # type: str
-                     unused_expected_transforms=None,  # type: Optional[Collection[str]]
-                     abort_callback=None  # type: Optional[Callable[[], bool]]
-                    ):
-    # type: (...) -> Iterator[beam_fn_api_pb2.Elements.Data]
 
 Review comment:
   It'd be good to not lose the typing information. You can make an alias at the top of the file `DataOrTimers = Union[beam_fn_api_pb2.Elements.Data, beam_fn_api_pb2.Elements.Timer]` to cut down on verbosity, here and elsewhere. 
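The sketch below shows the aliasing pattern. Two hypothetical classes stand in for the data and timer element protos, so the snippet runs without Beam installed. The real alias would name the `beam_fn_api_pb2` types as suggested above. The filtering by `instruction_id` is an illustrative simplification, not the actual body of `input_elements`.

```python
from typing import Iterator, List, Union


class DataElement(object):
  """Hypothetical stand-in for the data element proto."""

  def __init__(self, instruction_id, transform_id, data):
    # type: (str, str, bytes) -> None
    self.instruction_id = instruction_id
    self.transform_id = transform_id
    self.data = data


class TimerElement(object):
  """Hypothetical stand-in for the timer element proto."""

  def __init__(self, instruction_id, transform_id, timer_family_id, timers):
    # type: (str, str, str, bytes) -> None
    self.instruction_id = instruction_id
    self.transform_id = transform_id
    self.timer_family_id = timer_family_id
    self.timers = timers


# A single alias keeps the type comments short wherever either kind of
# element can appear.
DataOrTimers = Union[DataElement, TimerElement]


def input_elements(instruction_id, received):
  # type: (str, List[DataOrTimers]) -> Iterator[DataOrTimers]
  """Yields the data and timer elements that belong to one bundle."""
  for element in received:
    if element.instruction_id == instruction_id:
      yield element
```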

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405994886
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -745,6 +746,24 @@ def __init__(self,
     self.process_bundle_descriptor = process_bundle_descriptor
     self.state_handler = state_handler
     self.data_channel_factory = data_channel_factory
+
+    # There is no guarantee that the runner only set
+    # timer_api_service_descriptor when having timers. So this field cannot be
+    # used as an indicator of timers.
+    if self.process_bundle_descriptor.timer_api_service_descriptor:
+      self.timer_data_channel = (
+          data_channel_factory.create_data_channel_from_url(
+              self.process_bundle_descriptor.timer_api_service_descriptor.url))
+    else:
+      self.timer_data_channel = None
+
+    # A mapping of
+    # {(transform_id, timer_family_id):
+    # {"timer_coder_impl": coder, "output_stream": stream}}
 
 Review comment:
   Optional: (named) tuples are usually easier to work with than dicts.
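The sketch below shows the optional suggestion. It replaces the `{"timer_coder_impl": coder, "output_stream": stream}` dict values described in the diff with a named tuple. This gives attribute access and tuple unpacking, and a misspelled field name raises an error when the tuple is built. `TimerInfo` and the placeholder strings are hypothetical.

```python
import collections

# Hypothetical record type for the
# {(transform_id, timer_family_id): ...} mapping described in the diff.
TimerInfo = collections.namedtuple(
    'TimerInfo', ['timer_coder_impl', 'output_stream'])

timers_info = {}
timers_info[('transform_1', 'timer_family_1')] = TimerInfo(
    timer_coder_impl='coder-placeholder', output_stream='stream-placeholder')

info = timers_info[('transform_1', 'timer_family_1')]
# Attribute access instead of info['timer_coder_impl'] ...
coder_impl = info.timer_coder_impl
# ... or unpacking both fields at once.
coder_impl, output_stream = info
```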

[GitHub] [beam] boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406344891
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
                key,
                window,  # type: windowed_value.BoundedWindow
-               receiver  # type: operations.ConsumerSet
+               paneinfo,
+               timer_family_id,
+               timer_coder_impl,
+               output_stream
               ):
     self._key = key
     self._window = window
-    self._receiver = receiver
+    self._paneinfo = paneinfo
+    self._timer_family_id = timer_family_id
+    self._output_stream = output_stream
+    self._timer_coder_impl = timer_coder_impl
 
   def set(self, ts):
     ts = timestamp.Timestamp.of(ts)
-    # TODO(BEAM-9562): Plumb through actual timer fields.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=ts,
-                hold_timestamp=ts,
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     ts, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=ts,
+        hold_timestamp=ts,
+        paneinfo=self._paneinfo)
+    self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
+    self._output_stream.maybe_flush()
 
   def clear(self):
     # type: () -> None
     dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
     clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
-    # TODO(BEAM-9562): Plumb through actual paneinfo.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=timestamp.Timestamp.of(clear_ts),
-                hold_timestamp=timestamp.Timestamp.of(0),
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     0, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=clear_ts,
 
 Review comment:
   Correct: when `clear_bit` is `True`, the coder ignores these fields. As a follow-up, I think we should give `Timer` a better API, with `of` and `clear` methods like in Java.
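The sketch below shows the follow-up idea: give the Python timer record named constructors. `of` fills in every field; `clear` takes only the fields that identify the timer and sets the clear bit itself. The field names mirror the `userstate.Timer` arguments in the diff. `TimerSketch` and its methods are hypothetical, and plain ints stand in for Beam timestamps and windows. The `clear()` shown in the diff passes `clear_bit=False`; a dedicated constructor keeps that flag in one place.

```python
from typing import Any, NamedTuple, Optional, Tuple


class TimerSketch(NamedTuple):
  """Hypothetical timer record; ints stand in for Beam timestamps/windows."""
  user_key: Any
  dynamic_timer_tag: str
  windows: Tuple[Any, ...]
  clear_bit: bool
  fire_timestamp: Optional[int] = None
  hold_timestamp: Optional[int] = None
  paneinfo: Optional[Any] = None

  @classmethod
  def of(cls, user_key, dynamic_timer_tag, windows, fire_timestamp,
         hold_timestamp, paneinfo):
    # A set timer carries every field.
    return cls(user_key, dynamic_timer_tag, tuple(windows), False,
               fire_timestamp, hold_timestamp, paneinfo)

  @classmethod
  def clear(cls, user_key, dynamic_timer_tag, windows):
    # A cleared timer only needs the fields that identify it; timestamps and
    # pane info stay unset because the coder ignores them when clear_bit is
    # True.
    return cls(user_key, dynamic_timer_tag, tuple(windows), True)
```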

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406471091
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##########
 @@ -896,7 +906,9 @@ def _generate_splits_for_testing(self,
 
   def process_bundle(self,
                      inputs,  # type: Mapping[str, PartitionableBuffer]
-                     expected_outputs  # type: DataOutput
+                     expected_outputs,  # type: DataOutput
+                     fired_timers,  # type: Mapping[str, Mapping[str, PartitionableBuffer]]
 
 Review comment:
   Mapping[Tuple[str, str], PartitionableBuffer]?
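The annotation in the diff describes a nested mapping. The code in this change, however, iterates `fired_timers.items()` as `(transform_id, timer_family_id)` keys. The sketch below converts the nested shape into the flat, tuple-keyed shape suggested above. `List[bytes]` stands in for `PartitionableBuffer` so the snippet runs on its own, and `flatten_fired_timers` is a hypothetical helper.

```python
from typing import Dict, List, Mapping, Tuple

# Stand-in for PartitionableBuffer in this sketch.
Buffer = List[bytes]


def flatten_fired_timers(nested):
  # type: (Mapping[str, Mapping[str, Buffer]]) -> Dict[Tuple[str, str], Buffer]
  """Converts {transform_id: {timer_family_id: buf}} into the flat
  {(transform_id, timer_family_id): buf} shape."""
  flat = {}  # type: Dict[Tuple[str, str], Buffer]
  for transform_id, by_family in nested.items():
    for timer_family_id, buf in by_family.items():
      flat[(transform_id, timer_family_id)] = buf
  return flat
```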

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405816460
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
                key,
                window,  # type: windowed_value.BoundedWindow
-               receiver  # type: operations.ConsumerSet
+               paneinfo,
+               timer_family_id,
+               timer_coder_impl,
+               output_stream
               ):
     self._key = key
     self._window = window
-    self._receiver = receiver
+    self._paneinfo = paneinfo
+    self._timer_family_id = timer_family_id
+    self._output_stream = output_stream
+    self._timer_coder_impl = timer_coder_impl
 
   def set(self, ts):
     ts = timestamp.Timestamp.of(ts)
-    # TODO(BEAM-9562): Plumb through actual timer fields.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=ts,
-                hold_timestamp=ts,
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     ts, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=ts,
+        hold_timestamp=ts,
+        paneinfo=self._paneinfo)
+    self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
+    self._output_stream.maybe_flush()
 
   def clear(self):
     # type: () -> None
     dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
     clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
-    # TODO(BEAM-9562): Plumb through actual paneinfo.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=timestamp.Timestamp.of(clear_ts),
-                hold_timestamp=timestamp.Timestamp.of(0),
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     0, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=clear_ts,
 
 Review comment:
   Don't bother setting these timestamps, or paneinfo.

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406442486
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/core.py
 ##########
 @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self):
     windowing = None
     return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
-  def to_runner_api_parameter(self, context):
+  def _get_key_and_window_coder(self, named_inputs):
+    if named_inputs is None or not self._signature.is_stateful_dofn():
+      return None, None
+    main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0]
+    input_pcoll = named_inputs[main_input]
+    kv_type_hint = input_pcoll.element_type
+    if kv_type_hint and kv_type_hint != typehints.Any:
+      coder = coders.registry.get_coder(kv_type_hint)
+      if not coder.is_kv_coder():
+        raise ValueError(
+            'Input elements to the transform %s with stateful DoFn must be '
+            'key-value pairs.' % self)
+      key_coder = coder.key_coder()
+    else:
+      key_coder = coders.registry.get_coder(typehints.Any)
+    window_coder = input_pcoll.windowing.windowfn.get_window_coder()
+    return key_coder, window_coder
+
+  def to_runner_api(self, context, **extra_kwargs):
+    # type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec
+    has_parts = extra_kwargs.get('has_part', False)
+    urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
 
 Review comment:
   Never mind, I see what's going on here. 

[GitHub] [beam] lukecwik commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-613034995
 
 
   The problem is with the `Timer` implementation inside `FnApiDoFnRunner`. The spec for `Timer` did not make clear what the defaults were once `withOutputTimestamp` was added, and hence some critical logic was deleted during the migration.
   
   See #11402 for the fix.

[GitHub] [beam] lukecwik commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-611572961
 
 
   Run Python PreCommit

[GitHub] [beam] boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405892978
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
                key,
                window,  # type: windowed_value.BoundedWindow
-               receiver  # type: operations.ConsumerSet
+               paneinfo,
+               timer_family_id,
+               timer_coder_impl,
+               output_stream
               ):
     self._key = key
     self._window = window
-    self._receiver = receiver
+    self._paneinfo = paneinfo
+    self._timer_family_id = timer_family_id
+    self._output_stream = output_stream
+    self._timer_coder_impl = timer_coder_impl
 
   def set(self, ts):
     ts = timestamp.Timestamp.of(ts)
-    # TODO(BEAM-9562): Plumb through actual timer fields.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=ts,
-                hold_timestamp=ts,
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     ts, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=ts,
+        hold_timestamp=ts,
+        paneinfo=self._paneinfo)
+    self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
+    self._output_stream.maybe_flush()
 
   def clear(self):
     # type: () -> None
     dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
     clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
-    # TODO(BEAM-9562): Plumb through actual paneinfo.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=timestamp.Timestamp.of(clear_ts),
-                hold_timestamp=timestamp.Timestamp.of(0),
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     0, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=clear_ts,
 
 Review comment:
   > (Should the coder be ignoring them as well?)
   
   No, the timer coder now encodes all of this information.
   
   > Don't bother setting these timestamps, or paneinfo.
   
   Could you please explain more about this? 

[GitHub] [beam] mxm commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
mxm commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-612825390
 
 
   This is a big change which also affects the runners. Would it have made sense to notify runner authors, especially since post-commit tests are broken? It took me a bit to figure out what caused the regression.

[GitHub] [beam] boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405892978
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
                key,
                window,  # type: windowed_value.BoundedWindow
-               receiver  # type: operations.ConsumerSet
+               paneinfo,
+               timer_family_id,
+               timer_coder_impl,
+               output_stream
               ):
     self._key = key
     self._window = window
-    self._receiver = receiver
+    self._paneinfo = paneinfo
+    self._timer_family_id = timer_family_id
+    self._output_stream = output_stream
+    self._timer_coder_impl = timer_coder_impl
 
   def set(self, ts):
     ts = timestamp.Timestamp.of(ts)
-    # TODO(BEAM-9562): Plumb through actual timer fields.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=ts,
-                hold_timestamp=ts,
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     ts, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=ts,
+        hold_timestamp=ts,
+        paneinfo=self._paneinfo)
+    self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
+    self._output_stream.maybe_flush()
 
   def clear(self):
     # type: () -> None
     dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
     clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
-    # TODO(BEAM-9562): Plumb through actual paneinfo.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=timestamp.Timestamp.of(clear_ts),
-                hold_timestamp=timestamp.Timestamp.of(0),
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     0, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=clear_ts,
 
 Review comment:
   > (Should the coder be ignoring them as well?)

   No, the timer coder now encodes all of this info.

   > Don't bother setting these timestamps, or paneinfo.

   Could you please explain more about this?

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406003424
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##########
 @@ -408,27 +470,67 @@ def close_callback(data):
     return ClosableOutputStream.create(
         close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
 
+  def output_timer_stream(self, instruction_id, transform_id, timer_family_id):
+    def add_to_send_queue(timer):
+      if timer:
+        self._to_send.put(
+            beam_fn_api_pb2.Elements.Timer(
+                instruction_id=instruction_id,
+                transform_id=transform_id,
+                timer_family_id=timer_family_id,
+                timers=timer,
+                is_last=False))
+
+    def close_callback(timer):
+      add_to_send_queue(timer)
+      self._to_send.put(
+          beam_fn_api_pb2.Elements.Timer(
+              instruction_id=instruction_id,
+              transform_id=transform_id,
+              timer_family_id=timer_family_id,
+              timers=b'',
+              is_last=True))
+
+    return ClosableOutputStream.create(
+        close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
+
   def _write_outputs(self):
     # type: () -> Iterator[beam_fn_api_pb2.Elements]
-    done = False
-    while not done:
-      data = [self._to_send.get()]
-      try:
-        # Coalesce up to 100 other items.
-        for _ in range(100):
-          data.append(self._to_send.get_nowait())
-      except queue.Empty:
-        pass
-      if data[-1] is self._WRITES_FINISHED:
-        done = True
-        data.pop()
-      if data:
-        yield beam_fn_api_pb2.Elements(data=data)
+    stream_done = False
+    while not stream_done:
+      streams = None
+      if not stream_done:
+        streams = [self._to_send.get()]
+        try:
+          # Coalesce up to 100 other items.
+          for _ in range(100):
+            streams.append(self._to_send.get_nowait())
+        except queue.Empty:
+          pass
+        if streams and streams[-1] is self._WRITES_FINISHED:
+          stream_done = True
+          streams.pop()
+      if streams:
+        elements = beam_fn_api_pb2.Elements()
+        data_stream = []
+        timer_stream = []
+        for stream in streams:
+          if isinstance(stream, beam_fn_api_pb2.Elements.Timer):
+            timer_stream.append(stream)
+          if isinstance(stream, beam_fn_api_pb2.Elements.Data):
+            data_stream.append(stream)
+        if data_stream:
 
 Review comment:
   No need to have these conditionals, you can just write
   
   `yield beam_fn_api_pb2.Elements(data=data_stream, timer=timer_stream)`
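The writer loop with this suggestion applied can be sketched as below. This is a minimal sketch, not the PR's code: `Data`, `Timer`, and `Elements` are hypothetical dataclasses standing in for the `beam_fn_api_pb2` messages, and the field name `timers` is an assumption. Because an empty repeated field is valid, one unconditional construction covers batches holding only data, only timers, or both.

```python
import queue
from dataclasses import dataclass, field
from typing import Iterator, List


@dataclass
class Data:
    # Hypothetical stand-in for beam_fn_api_pb2.Elements.Data.
    transform_id: str
    data: bytes = b''
    is_last: bool = False


@dataclass
class Timer:
    # Hypothetical stand-in for beam_fn_api_pb2.Elements.Timer.
    transform_id: str
    timer_family_id: str
    timers: bytes = b''
    is_last: bool = False


@dataclass
class Elements:
    # Hypothetical stand-in for beam_fn_api_pb2.Elements.
    data: List[Data] = field(default_factory=list)
    timers: List[Timer] = field(default_factory=list)


WRITES_FINISHED = object()  # Sentinel marking the end of the output stream.


def write_outputs(to_send: queue.Queue, max_batch: int = 100) -> Iterator[Elements]:
    done = False
    while not done:
        # Block for the first item, then coalesce up to max_batch more.
        items = [to_send.get()]
        try:
            for _ in range(max_batch):
                items.append(to_send.get_nowait())
        except queue.Empty:
            pass
        if items[-1] is WRITES_FINISHED:
            done = True
            items.pop()
        if items:
            # No per-type conditionals: an empty list is a valid repeated field.
            yield Elements(
                data=[x for x in items if isinstance(x, Data)],
                timers=[x for x in items if isinstance(x, Timer)])
```

Like the original loop, it assumes `WRITES_FINISHED` is the last item ever enqueued.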

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405995554
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -1088,6 +1145,21 @@ def create_operation(self,
         transform_proto.spec.payload, parameter_type)
     return creator(self, transform_id, transform_proto, payload, consumers)
 
+  def extract_timers_info(self):
 
 Review comment:
   I would populate output_stream here as well rather than above.

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405826837
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -846,6 +870,30 @@ def process_bundle(self, instruction_id):
         data_channels[input_op.data_channel].append(input_op.transform_id)
         input_op_by_transform_id[input_op.transform_id] = input_op
 
+        # Set up timer output stream
+      timer_output_streams = {}
+      for transform_id, timer_list in self.timer_ids.items():
+        output_streams = {}
+        for timer_id in timer_list:
+          output_streams[
+              timer_id] = self.timer_data_channel.output_timer_stream(
+                  instruction_id, transform_id, timer_id)
+          timer_output_streams[transform_id] = output_streams
+        self.process_timer_ops[
+            transform_id].user_state_context.update_timer_output_streams(
+                output_streams)
+
+      # Process timers
+      if self.timer_data_channel:
 
 Review comment:
   We can't safely assume the runner will finish sending all timers before sending any of the data (and the buffer may get full, resulting in a deadlock). I think we need to have a data_channel.inputs() that returns both data and timers and then branch in the loop. 
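A minimal sketch of the single-stream loop suggested here, assuming hypothetical `Data`/`Timer` records in place of the protos and an `inputs` iterable that yields them in whatever interleaving the runner chose. Data inputs are keyed by `transform_id` and timer inputs by `(transform_id, timer_family_id)`; each is marked done by its own `is_last` element, so a runner that sends data before it has finished sending timers (or the reverse) cannot stall the loop.

```python
from dataclasses import dataclass
from typing import Callable, Hashable, Iterable, Set, Union


@dataclass(frozen=True)
class Data:
    # Hypothetical stand-in for beam_fn_api_pb2.Elements.Data.
    transform_id: str
    payload: bytes = b''
    is_last: bool = False


@dataclass(frozen=True)
class Timer:
    # Hypothetical stand-in for beam_fn_api_pb2.Elements.Timer.
    transform_id: str
    timer_family_id: str
    payload: bytes = b''
    is_last: bool = False


def consume_inputs(inputs: Iterable[Union[Data, Timer]],
                   expected: Set[Hashable],
                   on_data: Callable[[Data], None],
                   on_timer: Callable[[Timer], None]) -> None:
    """Drains one interleaved stream until every expected input is done."""
    done: Set[Hashable] = set()
    if not expected:
        return
    for element in inputs:
        # Branch on the element type inside a single loop.
        if isinstance(element, Timer):
            key = (element.transform_id, element.timer_family_id)
            handler = on_timer
        elif isinstance(element, Data):
            key = element.transform_id
            handler = on_data
        else:
            raise TypeError('Unexpected element: %r' % (element,))
        if element.is_last:
            done.add(key)
            if done >= expected:
                return
        else:
            assert key not in done, 'Element after is_last for %r' % (key,)
            handler(element)
    raise RuntimeError(
        'Stream ended before inputs finished: %r' % (expected - done,))
```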

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405999346
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##########
 @@ -372,12 +428,18 @@ def input_elements(self,
             t, v, tb = self._exc_info
             raise_(t, v, tb)
         else:
-          # TODO(BEAM-9558): Cleanup once dataflow is updated.
-          if not data.data or data.is_last:
-            done_transforms.add(data.transform_id)
-          else:
-            assert data.transform_id not in done_transforms
-            yield data
+          if isinstance(element, beam_fn_api_pb2.Elements.Timer):
+            if element.is_last:
+              done_inputs.add((element.transform_id, element.timer_family_id))
+            else:
+              yield element
+          if isinstance(element, beam_fn_api_pb2.Elements.Data):
 
 Review comment:
   Nit: use `elif` here, since an element is either a `Timer` or a `Data`, never both.

[GitHub] [beam] TheNeuralBit commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405890215
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
 ##########
 @@ -238,9 +238,14 @@ protected BatchDataflowWorker(
         sdkFusedStage =
             pipeline == null
                 ? RegisterNodeFunction.withoutPipeline(
-                    idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
+                    idGenerator,
+                    sdkHarnessRegistry.beamFnStateApiServiceDescriptor(),
+                    sdkHarnessRegistry.beamFnDataApiServiceDescriptor())
 
 Review comment:
   I see. So we only have a separate timer_api_service_descriptor in the protos so that a runner has the option to make it separate, but it doesn't need to be separate?
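If the runner advertises the same endpoint for timers and data (the Dataflow change above appears to pass the data-plane descriptor in that slot), an SDK that caches channels by URL will multiplex both over one connection. A minimal sketch under that assumption, with a hypothetical `ChannelFactory` whose `connect` callable stands in for opening a gRPC channel; an unset URL yields no timer channel, mirroring the `None` fallback in the `bundle_processor.py` hunk of this PR.

```python
from typing import Callable, Dict, Optional


class ChannelFactory:
    """Hypothetical factory that reuses one channel per endpoint URL."""

    def __init__(self, connect: Callable[[str], object]) -> None:
        self._connect = connect  # Stand-in for opening a real gRPC channel.
        self._cache: Dict[str, object] = {}

    def channel_for(self, url: str) -> object:
        if url not in self._cache:
            self._cache[url] = self._connect(url)
        return self._cache[url]


def timer_channel(factory: ChannelFactory, timer_url: Optional[str]) -> Optional[object]:
    # An empty URL means no timer endpoint was advertised.
    return factory.channel_for(timer_url) if timer_url else None
```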

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406465514
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -837,25 +869,59 @@ def process_bundle(self, instruction_id):
         op.execution_context = execution_context
         op.start()
 
-      # Inject inputs from data plane.
+      # Each data_channel is mapped to a list of expected inputs which includes
+      # both data input and timer input. The data input is identified by
+      # transform_id. The timer input is identified by
+      # (transform_id, timer_family_id).
       data_channels = collections.defaultdict(
           list
       )  # type: DefaultDict[data_plane.GrpcClientDataChannel, List[str]]
+
+      # Inject data inputs from data plane.
 
 Review comment:
   This comment is a bit misleading, as the injection doesn't happen in this for loop. (Similarly with timers.)

[GitHub] [beam] boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406481820
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/core.py
 ##########
 @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self):
     windowing = None
     return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
-  def to_runner_api_parameter(self, context):
+  def _get_key_and_window_coder(self, named_inputs):
+    if named_inputs is None or not self._signature.is_stateful_dofn():
+      return None, None
+    main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0]
+    input_pcoll = named_inputs[main_input]
+    kv_type_hint = input_pcoll.element_type
+    if kv_type_hint and kv_type_hint != typehints.Any:
+      coder = coders.registry.get_coder(kv_type_hint)
+      if not coder.is_kv_coder():
+        raise ValueError(
+            'Input elements to the transform %s with stateful DoFn must be '
+            'key-value pairs.' % self)
+      key_coder = coder.key_coder()
+    else:
+      key_coder = coders.registry.get_coder(typehints.Any)
+    window_coder = input_pcoll.windowing.windowfn.get_window_coder()
+    return key_coder, window_coder
+
+  def to_runner_api(self, context, **extra_kwargs):
 
 Review comment:
   We can delete this override since we pass `extra_kwargs` from `PTransform` now. 
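A minimal sketch of why the override becomes redundant, using hypothetical `BaseTransform`/`StatefulTransform` classes rather than Beam's `PTransform`/`ParDo`, and assuming the extra argument is called `named_inputs`: once the base `to_runner_api` forwards `**extra_kwargs`, a subclass only needs to override `to_runner_api_parameter`.

```python
from typing import Any, Dict, Tuple


class BaseTransform:
    """Hypothetical stand-in for the base transform's serialization hook."""

    def to_runner_api(self, context: Any, **extra_kwargs: Any) -> Tuple[str, Any]:
        # Extra keyword arguments flow straight through to the subclass hook.
        return self.to_runner_api_parameter(context, **extra_kwargs)

    def to_runner_api_parameter(self, context: Any,
                                **extra_kwargs: Any) -> Tuple[str, Any]:
        return 'generic', None


class StatefulTransform(BaseTransform):
    """Hypothetical ParDo-like subclass: no to_runner_api override needed."""

    def to_runner_api_parameter(self, context: Any,
                                **extra_kwargs: Any) -> Tuple[str, Any]:
        named_inputs: Dict[str, Any] = extra_kwargs.get('named_inputs') or {}
        return 'pardo', sorted(named_inputs)
```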

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405817317
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
                key,
                window,  # type: windowed_value.BoundedWindow
-               receiver  # type: operations.ConsumerSet
+               paneinfo,
+               timer_family_id,
+               timer_coder_impl,
+               output_stream
               ):
     self._key = key
     self._window = window
-    self._receiver = receiver
+    self._paneinfo = paneinfo
+    self._timer_family_id = timer_family_id
+    self._output_stream = output_stream
+    self._timer_coder_impl = timer_coder_impl
 
   def set(self, ts):
     ts = timestamp.Timestamp.of(ts)
-    # TODO(BEAM-9562): Plumb through actual timer fields.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=ts,
-                hold_timestamp=ts,
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     ts, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=ts,
+        hold_timestamp=ts,
+        paneinfo=self._paneinfo)
+    self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
+    self._output_stream.maybe_flush()
 
   def clear(self):
     # type: () -> None
     dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
     clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
-    # TODO(BEAM-9562): Plumb through actual paneinfo.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=timestamp.Timestamp.of(clear_ts),
-                hold_timestamp=timestamp.Timestamp.of(0),
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     0, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=clear_ts,
 
 Review comment:
   (Should the coder be ignoring them as well?)

[GitHub] [beam] ibzib commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
ibzib commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-613000822
 
 
   @mxm Which post-commits are you referring to? Also, can you please mark the JIRA(s) with fix version 2.21.0 so we can fix the regression in the release?

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405990412
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -745,6 +746,24 @@ def __init__(self,
     self.process_bundle_descriptor = process_bundle_descriptor
     self.state_handler = state_handler
     self.data_channel_factory = data_channel_factory
+
+    # There is no guarantee that the runner only sets
+    # timer_api_service_descriptor when there are timers, so this field
+    # cannot be used as an indicator of timers.
+    if self.process_bundle_descriptor.timer_api_service_descriptor:
+      self.timer_data_channel = (
+          data_channel_factory.create_data_channel_from_url(
+              self.process_bundle_descriptor.timer_api_service_descriptor.url))
+    else:
+      self.timer_data_channel = None
+
+    # A mapping of
+    # {(transform_id, timer_family_id):
+    # {"timer_coder_impl": coder, "output_stream": stream}}
+    # The mapping keeps empty when there is no timer_family_specs in the
 
 Review comment:
   Nit: The mapping stays (or is) empty...

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406474463
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##########
 @@ -355,20 +364,41 @@ def _build_process_bundle_descriptor(self):
             items()),
         environments=dict(
             self.execution_context.pipeline_components.environments.items()),
-        state_api_service_descriptor=self.state_api_service_descriptor())
+        state_api_service_descriptor=self.state_api_service_descriptor(),
+        timer_api_service_descriptor=self.data_api_service_descriptor())
 
   def get_input_coder_impl(self, transform_id):
     # type: (str) -> CoderImpl
     coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
         self.process_bundle_descriptor.transforms[transform_id].spec.payload
     ).coder_id
     assert coder_id
+    return self.get_coder_impl(coder_id)
+
+  def _build_timer_coders_id_map(self):
+    timer_coder_ids = {}
+    for transform_id, transform_proto in (self._process_bundle_descriptor
+        .transforms.items()):
+      if transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn:
+        pardo_payload = proto_utils.parse_Bytes(
+            transform_proto.spec.payload, beam_runner_api_pb2.ParDoPayload)
+        for id, timer_family_spec in pardo_payload.timer_family_specs.items():
+          timer_coder_ids[(transform_id, id)] = (
+              timer_family_spec.timer_family_coder_id)
+    return timer_coder_ids
+
+  def get_coder_impl(self, coder_id):
     if coder_id in self.execution_context.safe_coders:
       return self.execution_context.pipeline_context.coders[
           self.execution_context.safe_coders[coder_id]].get_impl()
     else:
       return self.execution_context.pipeline_context.coders[coder_id].get_impl()
 
+  def get_timer_coder_impl(self, transform_id, timer_family_id):
+    assert (transform_id, timer_family_id) in self._timer_coder_ids
 
 Review comment:
   The `KeyError` raised below if the key is missing will be sufficient.
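A minimal sketch of the map plus an assert-free lookup, assuming plain dicts stand in for the parsed `ParDoPayload` protos and each timer family maps straight to its coder id (a simplification of `timer_family_coder_id`). Indexing the map already raises a `KeyError` that names the missing `(transform_id, timer_family_id)` pair.

```python
from typing import Dict, Mapping, Tuple

# Illustrative URN for ParDo transforms.
PAR_DO_URN = 'beam:transform:pardo:v1'


def build_timer_coder_ids(
        transforms: Mapping[str, dict]) -> Dict[Tuple[str, str], str]:
    """Maps (transform_id, timer_family_id) to that family's coder id."""
    timer_coder_ids: Dict[Tuple[str, str], str] = {}
    for transform_id, transform in transforms.items():
        if transform['urn'] != PAR_DO_URN:
            continue
        specs = transform.get('timer_family_specs', {})
        for timer_family_id, coder_id in specs.items():
            timer_coder_ids[(transform_id, timer_family_id)] = coder_id
    return timer_coder_ids


def get_timer_coder_id(timer_coder_ids: Mapping[Tuple[str, str], str],
                       transform_id: str, timer_family_id: str) -> str:
    # No assert needed: a missing pair raises KeyError naming the key.
    return timer_coder_ids[(transform_id, timer_family_id)]
```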

[GitHub] [beam] boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406490556
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##########
 @@ -987,8 +1019,10 @@ def __init__(
 
   def process_bundle(self,
                      inputs,  # type: Mapping[str, PartitionableBuffer]
-                     expected_outputs  # type: DataOutput
-                    ):
+                     expected_outputs,  # type: DataOutput
+                     fired_timers,  # type: Mapping[str, Mapping[str, PartitionableBuffer]]
 
 Review comment:
   I updated the `fired_timers` implementation but forgot to update the typing here. Thanks!

[GitHub] [beam] boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406382954
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -1088,6 +1145,21 @@ def create_operation(self,
         transform_proto.spec.payload, parameter_type)
     return creator(self, transform_id, transform_proto, payload, consumers)
 
+  def extract_timers_info(self):
 
 Review comment:
   We can only populate output_stream when processing the bundle, since the instruction_id is required.
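A minimal sketch of that split, with hypothetical names throughout and a formatted string standing in for a real output stream: the per-transform timer family ids can be captured once at construction, but the streams themselves have to be opened per bundle because each one is tagged with the bundle's `instruction_id`.

```python
from typing import Dict, List, Tuple


class TimerStreamRegistry:
    """Hypothetical holder for timer info, split by lifetime."""

    def __init__(self, timer_ids: Dict[str, List[str]]) -> None:
        # Known once per process bundle descriptor: which timer families
        # each transform declares. No instruction_id is needed here.
        self._timer_ids = timer_ids

    def open_streams(self, instruction_id: str) -> Dict[Tuple[str, str], str]:
        # Must run per bundle: every stream is tagged with the instruction_id,
        # which is only known once a ProcessBundle request arrives.
        return {
            (transform_id, family_id):
                'stream[%s/%s/%s]' % (instruction_id, transform_id, family_id)
            for transform_id, families in self._timer_ids.items()
            for family_id in families
        }
```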

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405825392
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -846,6 +870,30 @@ def process_bundle(self, instruction_id):
         data_channels[input_op.data_channel].append(input_op.transform_id)
         input_op_by_transform_id[input_op.transform_id] = input_op
 
+        # Set up timer output stream
+      timer_output_streams = {}
+      for transform_id, timer_list in self.timer_ids.items():
+        output_streams = {}
+        for timer_id in timer_list:
+          output_streams[
+              timer_id] = self.timer_data_channel.output_timer_stream(
+                  instruction_id, transform_id, timer_id)
+          timer_output_streams[transform_id] = output_streams
+        self.process_timer_ops[
+            transform_id].user_state_context.update_timer_output_streams(
 
 Review comment:
   Nit: rather than this double nesting, it might simplify things to have an `update_timer_output_streams(timer_id, output_stream)` method that could be called repeatedly. 
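A minimal sketch of the flatter shape, assuming a hypothetical `UserStateContext` stand-in and an `open_stream(transform_id, timer_family_id)` callable in place of `output_timer_stream`: one loop registers each stream as it is created, with no intermediate per-transform dict.

```python
from typing import Any, Callable, Dict, List


class UserStateContext:
    """Hypothetical stand-in for the per-transform user state context."""

    def __init__(self) -> None:
        self._timer_output_streams: Dict[str, Any] = {}

    def update_timer_output_streams(self, timer_family_id: str,
                                    output_stream: Any) -> None:
        # Called once per timer family; repeated calls simply add entries.
        self._timer_output_streams[timer_family_id] = output_stream

    def output_stream(self, timer_family_id: str) -> Any:
        return self._timer_output_streams[timer_family_id]


def register_timer_streams(timer_ids: Dict[str, List[str]],
                           contexts: Dict[str, UserStateContext],
                           open_stream: Callable[[str, str], Any]) -> None:
    for transform_id, timer_family_ids in timer_ids.items():
        for timer_family_id in timer_family_ids:
            contexts[transform_id].update_timer_output_streams(
                timer_family_id, open_stream(transform_id, timer_family_id))
```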

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405989307
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
                key,
                window,  # type: windowed_value.BoundedWindow
-               receiver  # type: operations.ConsumerSet
+               paneinfo,
+               timer_family_id,
+               timer_coder_impl,
+               output_stream
               ):
     self._key = key
     self._window = window
-    self._receiver = receiver
+    self._paneinfo = paneinfo
+    self._timer_family_id = timer_family_id
+    self._output_stream = output_stream
+    self._timer_coder_impl = timer_coder_impl
 
   def set(self, ts):
     ts = timestamp.Timestamp.of(ts)
-    # TODO(BEAM-9562): Plumb through actual timer fields.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=ts,
-                hold_timestamp=ts,
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     ts, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=ts,
+        hold_timestamp=ts,
+        paneinfo=self._paneinfo)
+    self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
+    self._output_stream.maybe_flush()
 
   def clear(self):
     # type: () -> None
     dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
     clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
-    # TODO(BEAM-9562): Plumb through actual paneinfo.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=timestamp.Timestamp.of(clear_ts),
-                hold_timestamp=timestamp.Timestamp.of(0),
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     0, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=clear_ts,
 
 Review comment:
   They're meaningless when we're clearing a timer (e.g. it won't fire, hold back the watermark, or have a pane info).
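One way the coder could ignore them is sketched below. This uses a hypothetical simplified `Timer` and a toy `struct`-based encoding, not Beam's real timer coder: when `clear_bit` is set, only the key and the bit are written, and the decoder leaves the timestamps and pane info unset.

```python
import struct
from typing import NamedTuple, Optional


class Timer(NamedTuple):
    # Hypothetical simplified timer; timestamps are plain int micros here.
    user_key: bytes
    clear_bit: bool
    fire_micros: Optional[int] = None
    hold_micros: Optional[int] = None
    paneinfo: Optional[bytes] = None


def encode_timer(timer: Timer) -> bytes:
    out = struct.pack('>I', len(timer.user_key)) + timer.user_key
    out += struct.pack('>?', timer.clear_bit)
    if not timer.clear_bit:
        # Only a set timer can fire, hold the watermark, or carry a pane.
        pane = timer.paneinfo or b''
        out += struct.pack('>qqI', timer.fire_micros, timer.hold_micros, len(pane))
        out += pane
    return out


def decode_timer(buf: bytes) -> Timer:
    (key_len,) = struct.unpack_from('>I', buf, 0)
    offset = 4
    user_key = buf[offset:offset + key_len]
    offset += key_len
    (clear_bit,) = struct.unpack_from('>?', buf, offset)
    offset += 1
    if clear_bit:
        # Nothing else was written for a cleared timer.
        return Timer(user_key, True)
    fire, hold, pane_len = struct.unpack_from('>qqI', buf, offset)
    offset += struct.calcsize('>qqI')
    return Timer(user_key, False, fire, hold, buf[offset:offset + pane_len])
```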

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405986529
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/core.py
 ##########
 @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self):
     windowing = None
     return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
-  def to_runner_api_parameter(self, context):
+  def _get_key_and_window_coder(self, named_inputs):
+    if named_inputs is None or not self._signature.is_stateful_dofn():
+      return None, None
+    main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0]
+    input_pcoll = named_inputs[main_input]
+    kv_type_hint = input_pcoll.element_type
+    if kv_type_hint and kv_type_hint != typehints.Any:
+      coder = coders.registry.get_coder(kv_type_hint)
+      if not coder.is_kv_coder():
+        raise ValueError(
+            'Input elements to the transform %s with stateful DoFn must be '
+            'key-value pairs.' % self)
+      key_coder = coder.key_coder()
+    else:
+      key_coder = coders.registry.get_coder(typehints.Any)
+    window_coder = input_pcoll.windowing.windowfn.get_window_coder()
+    return key_coder, window_coder
+
+  def to_runner_api(self, context, **extra_kwargs):
+    # type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec
+    has_parts = extra_kwargs.get('has_part', False)
+    urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
+    if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts:
+      # TODO(BEAM-3812): Remove this fallback.
+      urn, typed_param = self.to_runner_api_pickled(context)
+    return beam_runner_api_pb2.FunctionSpec(
+        urn=urn,
+        payload=typed_param.SerializeToString() if isinstance(
+            typed_param, message.Message) else typed_param.encode('utf-8')
+        if isinstance(typed_param, str) else typed_param)
+
+  def to_runner_api_parameter(self, context, **extra_kwargs):
     # type: (PipelineContext) -> typing.Tuple[str, message.Message]
     assert isinstance(self, ParDo), \
         "expected instance of ParDo, but got %s" % self.__class__
+    key_coder, window_coder = self._get_key_and_window_coder(
 
 Review comment:
   Maybe put this in the if block below closer to where they're used? 
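
A minimal sketch of the suggested restructuring, using hypothetical names (`compute_coders`, `build_payload`) in place of `_get_key_and_window_coder` and `to_runner_api_parameter`. The coders are looked up only inside the branch that consumes them, so stateless transforms never trigger the lookup.

```python
def compute_coders(named_inputs):
  # Hypothetical stand-in for _get_key_and_window_coder.
  main_input = next(iter(named_inputs.values()))
  return main_input['key_coder'], main_input['window_coder']


def build_payload(is_stateful, named_inputs):
  payload = {'stateful': is_stateful}
  if is_stateful:
    # Only stateful DoFns need the key and window coders, so compute them
    # here, next to where they are used.
    key_coder, window_coder = compute_coders(named_inputs)
    payload['key_coder'] = key_coder
    payload['window_coder'] = window_coder
  return payload
```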

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405815160
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
                key,
                window,  # type: windowed_value.BoundedWindow
-               receiver  # type: operations.ConsumerSet
+               paneinfo,
+               timer_family_id,
+               timer_coder_impl,
+               output_stream
 
 Review comment:
   A type on this parameter would be useful.
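
One way to act on this, sketched with Python type comments in the style the file already uses. The `OutputStream` protocol and the `write_encoded` method are hypothetical. In the PR the stream presumably comes from `output_timer_stream`, which returns a `ClosableOutputStream`, so that class (or a small protocol like this one) would be the natural annotation.

```python
from typing import Any, Protocol


class OutputStream(Protocol):
  """Hypothetical minimal interface for the timer output stream."""
  def write(self, b):
    # type: (bytes) -> Any
    ...


class OutputTimer(object):
  def __init__(self,
               key,  # type: Any
               window,  # type: Any
               paneinfo,  # type: Any
               timer_family_id,  # type: str
               timer_coder_impl,  # type: Any
               output_stream  # type: OutputStream
              ):
    # type: (...) -> None
    self._key = key
    self._window = window
    self._paneinfo = paneinfo
    self._timer_family_id = timer_family_id
    self._timer_coder_impl = timer_coder_impl
    self._output_stream = output_stream

  def write_encoded(self, encoded_timer):
    # type: (bytes) -> None
    # Hypothetical helper: forward already-encoded timer bytes to the stream.
    self._output_stream.write(encoded_timer)
```

Any object with a `write(bytes)` method, such as `io.BytesIO`, satisfies the protocol, which keeps the class easy to test.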

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405998237
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##########
 @@ -274,20 +301,47 @@ def inverse(self):
     return self._inverse
 
   def input_elements(self,
-                     instruction_id,  # type: str
-                     unused_expected_transforms=None,  # type: Optional[Collection[str]]
-                     abort_callback=None  # type: Optional[Callable[[], bool]]
-                    ):
-    # type: (...) -> Iterator[beam_fn_api_pb2.Elements.Data]
+      instruction_id,  # type: str
+      unused_expected_inputes=None,   # type: Collection[str]
 
 Review comment:
   inputes -> inputs

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406003474
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##########
 @@ -408,27 +470,67 @@ def close_callback(data):
     return ClosableOutputStream.create(
         close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
 
+  def output_timer_stream(self, instruction_id, transform_id, timer_family_id):
+    def add_to_send_queue(timer):
+      if timer:
+        self._to_send.put(
+            beam_fn_api_pb2.Elements.Timer(
+                instruction_id=instruction_id,
+                transform_id=transform_id,
+                timer_family_id=timer_family_id,
+                timers=timer,
+                is_last=False))
+
+    def close_callback(timer):
+      add_to_send_queue(timer)
+      self._to_send.put(
+          beam_fn_api_pb2.Elements.Timer(
+              instruction_id=instruction_id,
+              transform_id=transform_id,
+              timer_family_id=timer_family_id,
+              timers=b'',
+              is_last=True))
+
+    return ClosableOutputStream.create(
+        close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
+
   def _write_outputs(self):
     # type: () -> Iterator[beam_fn_api_pb2.Elements]
-    done = False
-    while not done:
-      data = [self._to_send.get()]
-      try:
-        # Coalesce up to 100 other items.
-        for _ in range(100):
-          data.append(self._to_send.get_nowait())
-      except queue.Empty:
-        pass
-      if data[-1] is self._WRITES_FINISHED:
-        done = True
-        data.pop()
-      if data:
-        yield beam_fn_api_pb2.Elements(data=data)
+    stream_done = False
+    while not stream_done:
+      streams = None
+      if not stream_done:
+        streams = [self._to_send.get()]
+        try:
+          # Coalesce up to 100 other items.
+          for _ in range(100):
+            streams.append(self._to_send.get_nowait())
+        except queue.Empty:
+          pass
+        if streams and streams[-1] is self._WRITES_FINISHED:
+          stream_done = True
+          streams.pop()
+      if streams:
+        elements = beam_fn_api_pb2.Elements()
+        data_stream = []
+        timer_stream = []
+        for stream in streams:
+          if isinstance(stream, beam_fn_api_pb2.Elements.Timer):
+            timer_stream.append(stream)
+          if isinstance(stream, beam_fn_api_pb2.Elements.Data):
 
 Review comment:
   else
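
The `_write_outputs` loop under review can be sketched in isolation as follows. `Data`, `Timer`, `Elements`, and `WRITES_FINISHED` are hypothetical stand-ins for the `beam_fn_api_pb2` messages and the `_WRITES_FINISHED` sentinel. Every queued item is either data or a timer, so a plain `else` suffices once the timer check fails. The sketch also drops the inner `if not stream_done` check, which is always true at that point in the loop.

```python
import queue
from dataclasses import dataclass, field
from typing import Iterator, List


@dataclass
class Data:
  transform_id: str
  data: bytes


@dataclass
class Timer:
  transform_id: str
  timer_family_id: str
  timers: bytes


@dataclass
class Elements:
  data: List[Data] = field(default_factory=list)
  timers: List[Timer] = field(default_factory=list)


WRITES_FINISHED = object()  # Sentinel marking the end of the stream.


def write_outputs(to_send, max_batch=100):
  # type: (queue.Queue, int) -> Iterator[Elements]
  done = False
  while not done:
    # Block for one item, then coalesce up to max_batch more without blocking.
    items = [to_send.get()]
    try:
      for _ in range(max_batch):
        items.append(to_send.get_nowait())
    except queue.Empty:
      pass
    if items[-1] is WRITES_FINISHED:
      done = True
      items.pop()
    if items:
      elements = Elements()
      for item in items:
        if isinstance(item, Timer):
          elements.timers.append(item)
        else:
          # Anything that is not a timer is a data message.
          elements.data.append(item)
      yield elements
```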

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406466215
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
 ##########
 @@ -1321,80 +1321,18 @@ def remove_data_plane_ops(stages, pipeline_context):
       yield stage
 
 
-def inject_timer_pcollections(stages, pipeline_context):
+def setup_timer_mapping(stages, pipeline_context):
   # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
 
-  """Create PCollections for fired timers and to-be-set timers.
-
-  At execution time, fired timers and timers-to-set are represented as
-  PCollections that are managed by the runner.  This phase adds the
-  necissary collections, with their read and writes, to any stages using
-  timers.
+  """Set up a mapping of {transform_id: [timer_ids]} for each stage.
   """
   for stage in stages:
-    for transform in list(stage.transforms):
+    for transform in stage.transforms:
       if transform.spec.urn in PAR_DO_URNS:
         payload = proto_utils.parse_Bytes(
             transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
-        for tag, spec in payload.timer_family_specs.items():
-          if len(transform.inputs) > 1:
-            raise NotImplementedError('Timers and side inputs.')
-          input_pcoll = pipeline_context.components.pcollections[next(
-              iter(transform.inputs.values()))]
-          # Create the appropriate coder for the timer PCollection.
-          key_coder_id = input_pcoll.coder_id
-          if (pipeline_context.components.coders[key_coder_id].spec.urn ==
-              common_urns.coders.KV.urn):
-            key_coder_id = pipeline_context.components.coders[
-                key_coder_id].component_coder_ids[0]
-          key_timer_coder_id = pipeline_context.add_or_get_coder_id(
-              beam_runner_api_pb2.Coder(
-                  spec=beam_runner_api_pb2.FunctionSpec(
-                      urn=common_urns.coders.KV.urn),
-                  component_coder_ids=[
-                      key_coder_id, spec.timer_family_coder_id
-                  ]))
-          # Inject the read and write pcollections.
-          timer_read_pcoll = unique_name(
-              pipeline_context.components.pcollections,
-              '%s_timers_to_read_%s' % (transform.unique_name, tag))
-          timer_write_pcoll = unique_name(
-              pipeline_context.components.pcollections,
-              '%s_timers_to_write_%s' % (transform.unique_name, tag))
-          pipeline_context.components.pcollections[timer_read_pcoll].CopyFrom(
-              beam_runner_api_pb2.PCollection(
-                  unique_name=timer_read_pcoll,
-                  coder_id=key_timer_coder_id,
-                  windowing_strategy_id=input_pcoll.windowing_strategy_id,
-                  is_bounded=input_pcoll.is_bounded))
-          pipeline_context.components.pcollections[timer_write_pcoll].CopyFrom(
-              beam_runner_api_pb2.PCollection(
-                  unique_name=timer_write_pcoll,
-                  coder_id=key_timer_coder_id,
-                  windowing_strategy_id=input_pcoll.windowing_strategy_id,
-                  is_bounded=input_pcoll.is_bounded))
-          stage.transforms.append(
-              beam_runner_api_pb2.PTransform(
-                  unique_name=timer_read_pcoll + '/Read',
-                  outputs={'out': timer_read_pcoll},
-                  spec=beam_runner_api_pb2.FunctionSpec(
-                      urn=bundle_processor.DATA_INPUT_URN,
-                      payload=create_buffer_id(timer_read_pcoll,
-                                               kind='timers'))))
-          stage.transforms.append(
-              beam_runner_api_pb2.PTransform(
-                  unique_name=timer_write_pcoll + '/Write',
-                  inputs={'in': timer_write_pcoll},
-                  spec=beam_runner_api_pb2.FunctionSpec(
-                      urn=bundle_processor.DATA_OUTPUT_URN,
-                      payload=create_buffer_id(
-                          timer_write_pcoll, kind='timers'))))
-          assert tag not in transform.inputs
-          transform.inputs[tag] = timer_read_pcoll
-          assert tag not in transform.outputs
-          transform.outputs[tag] = timer_write_pcoll
-          stage.timer_pcollections.append(
-              (timer_read_pcoll + '/Read', timer_write_pcoll))
+        for timer_family_id in payload.timer_family_specs.keys():
+          stage.timers.add((transform.unique_name, timer_family_id))
 
 Review comment:
   Nice simplification here :). 
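
The simplified pass boils down to collecting `(transform name, timer family id)` pairs per stage. A self-contained sketch follows, with hypothetical `Transform` and `Stage` dataclasses standing in for the runner's `Stage` and the parsed `ParDoPayload` protos. The URN is the standard ParDo URN, but the runner's own `PAR_DO_URNS` set may contain more entries, so treat the data layout as an assumption.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Iterator, List, Set, Tuple

# Only the plain ParDo URN; the runner's PAR_DO_URNS may include more.
PAR_DO_URNS = frozenset(['beam:transform:pardo:v1'])


@dataclass
class Transform:
  unique_name: str
  urn: str
  # Stand-in for ParDoPayload.timer_family_specs (timer family id -> spec).
  timer_family_specs: Dict[str, object] = field(default_factory=dict)


@dataclass
class Stage:
  transforms: List[Transform]
  timers: Set[Tuple[str, str]] = field(default_factory=set)


def setup_timer_mapping(stages):
  # type: (Iterable[Stage]) -> Iterator[Stage]
  """Record (transform name, timer family id) for every timer in each stage."""
  for stage in stages:
    for transform in stage.transforms:
      if transform.urn in PAR_DO_URNS:
        for timer_family_id in transform.timer_family_specs:
          stage.timers.add((transform.unique_name, timer_family_id))
    yield stage
```

Compared with the removed code, no PCollections, coders, or read/write transforms are synthesized; the stage only remembers which timers exist.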

[GitHub] [beam] TheNeuralBit commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405823916
 
 

 ##########
 File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
 ##########
 @@ -914,6 +923,7 @@ public Object createRunnerForPTransform(
                       PipelineOptions pipelineOptions,
                       BeamFnDataClient beamFnDataClient,
                       BeamFnStateClient beamFnStateClient,
+                      BeamFnTimerClient beamFnTimerClient,
                       String pTransformId,
                       PTransform pTransform,
                       Supplier<String> processBundleInstructionId,
 
 Review comment:
   This test is `testStateCallsFailIfNoStateApiServiceDescriptorSpecified`. Is there value in a `testTimerCallsFailIfNoTimerApiServiceDescriptorSpecified` to exercise the new "Timers are unsupported because ..." exception?
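
The suggested test would be written in Java, but its shape can be shown in a self-contained Python sketch. `TimerClientUnavailable`, `make_timer_client`, and the error message are hypothetical (the PR's actual exception text is not reproduced here). The idea is that requesting a timer client without a timer API service descriptor should fail fast, and the test asserts exactly that.

```python
import unittest


class TimerClientUnavailable(RuntimeError):
  """Hypothetical error raised when timers cannot be used."""


def make_timer_client(timer_api_service_descriptor):
  # Hypothetical factory: fail fast instead of failing later on first use.
  if timer_api_service_descriptor is None:
    raise TimerClientUnavailable(
        'No timer API service descriptor was specified.')
  return {'endpoint': timer_api_service_descriptor}


class TimerClientTest(unittest.TestCase):
  def test_timer_calls_fail_if_no_timer_api_service_descriptor(self):
    with self.assertRaisesRegex(TimerClientUnavailable, 'descriptor'):
      make_timer_client(None)

  def test_timer_client_created_with_descriptor(self):
    self.assertEqual(
        make_timer_client('localhost:12345'), {'endpoint': 'localhost:12345'})
```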

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406468331
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##########
 @@ -536,7 +525,8 @@ def _run_stage(self,
         runner_execution_context,
         bundle_context_manager,
         data_input,
-        data_output,
+        data_output, {},
 
 Review comment:
   Put {} on its own line. (Surprised yapf didn't complain, or maybe you haven't run it yet.)

[GitHub] [beam] TheNeuralBit commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405892635
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
 ##########
 @@ -134,14 +136,18 @@ public static RegisterNodeFunction forPipeline(
    * harnesses, then this method should be removed.
    */
   public static RegisterNodeFunction withoutPipeline(
-      IdGenerator idGenerator, Endpoints.ApiServiceDescriptor stateApiServiceDescriptor) {
-    return new RegisterNodeFunction(null, idGenerator, stateApiServiceDescriptor);
+      IdGenerator idGenerator,
+      Endpoints.ApiServiceDescriptor stateApiServiceDescriptor,
+      Endpoints.ApiServiceDescriptor timerApiServiceDescriptor) {
+    return new RegisterNodeFunction(
+        null, idGenerator, stateApiServiceDescriptor, timerApiServiceDescriptor);
   }
 
   private RegisterNodeFunction(
       @Nullable RunnerApi.Pipeline pipeline,
       IdGenerator idGenerator,
-      Endpoints.ApiServiceDescriptor stateApiServiceDescriptor) {
+      Endpoints.ApiServiceDescriptor stateApiServiceDescriptor,
+      Endpoints.ApiServiceDescriptor timerApiServiceDescriptor) {
 
 Review comment:
   timerApiServiceDescriptor isn't used? Should it be stored and written to the ProcessBundleDescriptor?
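
A minimal sketch of what the reviewer is asking for, written in Python for consistency with the other examples even though the class under review is Java. `RegisterNodeFunction` here is a hypothetical stand-in, and the descriptor is modeled as a plain dict with hypothetical keys rather than the real `ProcessBundleDescriptor` proto. The constructor stores the timer descriptor next to the state descriptor, and building the descriptor writes both out.

```python
from typing import Any, Dict, Optional


class RegisterNodeFunction(object):
  """Hypothetical Python stand-in for the Java class under review."""

  def __init__(self,
               pipeline,  # type: Optional[Any]
               id_generator,  # type: Any
               state_api_service_descriptor,  # type: Optional[str]
               timer_api_service_descriptor  # type: Optional[str]
              ):
    # type: (...) -> None
    self._pipeline = pipeline
    self._id_generator = id_generator
    self._state_api_service_descriptor = state_api_service_descriptor
    # Store the new parameter instead of silently dropping it.
    self._timer_api_service_descriptor = timer_api_service_descriptor

  @classmethod
  def without_pipeline(cls, id_generator, state_descriptor, timer_descriptor):
    return cls(None, id_generator, state_descriptor, timer_descriptor)

  def build_process_bundle_descriptor(self, descriptor_id):
    # type: (str) -> Dict[str, Any]
    # Plain-dict stand-in for the ProcessBundleDescriptor proto.
    descriptor = {'id': descriptor_id}  # type: Dict[str, Any]
    if self._state_api_service_descriptor is not None:
      descriptor['state_api_service_descriptor'] = (
          self._state_api_service_descriptor)
    if self._timer_api_service_descriptor is not None:
      descriptor['timer_api_service_descriptor'] = (
          self._timer_api_service_descriptor)
    return descriptor
```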

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405999089
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##########
 @@ -354,15 +410,15 @@ def input_elements(self,
 
     Args:
       instruction_id(str): instruction_id for which data is read
-      expected_transforms(collection): expected transforms
+      expected_inputs(collection): expected inputs, include both data and timer.
     """
     received = self._receiving_queue(instruction_id)
-    done_transforms = set()  # type: Set[str]
+    done_inputs = set()  # type: Set[str]
 
 Review comment:
   update type hint
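
Once timers travel on the data channel, an input can be identified either by a transform id (data) or by a `(transform_id, timer_family_id)` pair (timers), so one natural widening of the hint is `Set[Union[str, Tuple[str, str]]]`. A hedged sketch of how such a set can drive termination of `input_elements`; the `Data`/`Timer` tuples are hypothetical stand-ins for the `Elements` messages, not the PR's code.

```python
from collections import namedtuple
from typing import Collection, Iterable, Iterator, Set, Tuple, Union

# Hypothetical stand-ins for the Elements.Data / Elements.Timer messages.
Data = namedtuple('Data', ['transform_id', 'data', 'is_last'])
Timer = namedtuple(
    'Timer', ['transform_id', 'timer_family_id', 'timers', 'is_last'])

InputId = Union[str, Tuple[str, str]]


def input_elements(received, expected_inputs):
  # type: (Iterable[Union[Data, Timer]], Collection[InputId]) -> Iterator[Union[Data, Timer]]
  done_inputs = set()  # type: Set[InputId]
  for element in received:
    if isinstance(element, Timer):
      input_id = (element.transform_id, element.timer_family_id)  # type: InputId
    else:
      input_id = element.transform_id
    if element.is_last:
      # An is_last marker closes that input; it carries no payload to yield.
      done_inputs.add(input_id)
    else:
      yield element
    if done_inputs.issuperset(expected_inputs):
      return
```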

[GitHub] [beam] lukecwik commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405873264
 
 

 ##########
 File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
 ##########
 @@ -914,6 +923,7 @@ public Object createRunnerForPTransform(
                       PipelineOptions pipelineOptions,
                       BeamFnDataClient beamFnDataClient,
                       BeamFnStateClient beamFnStateClient,
+                      BeamFnTimerClient beamFnTimerClient,
                       String pTransformId,
                       PTransform pTransform,
                       Supplier<String> processBundleInstructionId,
 
 Review comment:
   for completeness yes

[GitHub] [beam] boyuanzz commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-611363247
 
 
   The [test_pardo_timers_clear](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py#L351-L389) test fails with streaming Flink. The Python SDK sends all timers (with hold_timestamp=-INF, the Python default) but only gets the timer with timestamp=20 back. Since the test only fails in streaming mode, it looks like something is wrong with the watermark(?). @lukecwik 

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406444781
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/core.py
 ##########
 @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self):
     windowing = None
     return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
-  def to_runner_api_parameter(self, context):
+  def _get_key_and_window_coder(self, named_inputs):
+    if named_inputs is None or not self._signature.is_stateful_dofn():
+      return None, None
+    main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0]
+    input_pcoll = named_inputs[main_input]
+    kv_type_hint = input_pcoll.element_type
+    if kv_type_hint and kv_type_hint != typehints.Any:
+      coder = coders.registry.get_coder(kv_type_hint)
+      if not coder.is_kv_coder():
+        raise ValueError(
+            'Input elements to the transform %s with stateful DoFn must be '
+            'key-value pairs.' % self)
+      key_coder = coder.key_coder()
+    else:
+      key_coder = coders.registry.get_coder(typehints.Any)
+    window_coder = input_pcoll.windowing.windowfn.get_window_coder()
+    return key_coder, window_coder
+
+  def to_runner_api(self, context, **extra_kwargs):
 
 Review comment:
   This code looks like it's copied from the superclass; instead, just do
   
   ```
   def to_runner_api(self, context, named_inputs, **extra_kwargs):
     return super(ParDo, self).to_runner_api(
         context, named_inputs=named_inputs, **extra_kwargs)
   ```
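
The point generalizes: rather than copying the base class's `to_runner_api` body into the subclass, forward the extra argument and delegate. A self-contained sketch with hypothetical `Base`/`Child` classes; it illustrates the delegation pattern, not Beam's actual `PTransform`/`ParDo` signatures.

```python
class Base(object):
  def to_runner_api(self, context, has_parts=False, **extra_kwargs):
    # The shared serialization logic lives only here.
    urn, param = self.to_runner_api_parameter(context, **extra_kwargs)
    return {'urn': urn, 'payload': param, 'has_parts': has_parts}

  def to_runner_api_parameter(self, context, **extra_kwargs):
    return 'base:urn', None


class Child(Base):
  def to_runner_api(self, context, named_inputs=None, **extra_kwargs):
    # Delegate instead of duplicating the superclass body; named_inputs is
    # forwarded through **extra_kwargs to to_runner_api_parameter.
    return super(Child, self).to_runner_api(
        context, named_inputs=named_inputs, **extra_kwargs)

  def to_runner_api_parameter(self, context, named_inputs=None, **extra_kwargs):
    return 'child:urn', sorted(named_inputs or {})
```

Only `to_runner_api_parameter` differs between the classes, so a future change to the shared body in `Base` reaches `Child` automatically.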

[GitHub] [beam] robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406444656
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/core.py
 ##########
 @@ -1272,6 +1272,8 @@ def expand(self, pcoll):
         key_coder = coder.key_coder()
       else:
         key_coder = coders.registry.get_coder(typehints.Any)
+      self.window_coder = pcoll.windowing.windowfn.get_window_coder()
 
 Review comment:
   Are these still used?

[GitHub] [beam] TheNeuralBit commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405855696
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
 ##########
 @@ -238,9 +238,14 @@ protected BatchDataflowWorker(
         sdkFusedStage =
             pipeline == null
                 ? RegisterNodeFunction.withoutPipeline(
-                    idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
+                    idGenerator,
+                    sdkHarnessRegistry.beamFnStateApiServiceDescriptor(),
+                    sdkHarnessRegistry.beamFnDataApiServiceDescriptor())
 
 Review comment:
   Isn't the timer API service descriptor different from the data API service descriptor? Does that need to be plumbed through SdkHarnessRegistry and used here instead of the data API descriptor? (Same question below and in the streaming worker.)
