You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/03/23 20:06:09 UTC

[GitHub] [beam] boyuanzz opened a new pull request #11199: [BEAM-9562] Update Timer encoding to V2

boyuanzz opened a new pull request #11199: [BEAM-9562] Update Timer encoding to V2
URL: https://github.com/apache/beam/pull/11199
 
 
   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) 
   Portable | --- | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] boyuanzz commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r400607853
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,14 +49,26 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a timer for the given timestamp with a user specified payload.
+   *
+   * @return
+   */
+  // TODO(BEAM-9562): Plumb through actual Timer fields.
+  public static <T> Timer<T> of(
 
 Review comment:
   Thanks for you suggestion! I really like this idea. I planned to refine the signature when moving to data channel, but I can do it in this PR.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402688089
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Collection<? extends BoundedWindow> windows,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */
+  public static <T> Timer<T> cleared(
+      T userKey, String dynamicTimerTag, Collection<? extends BoundedWindow> windows) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection<? extends BoundedWindow> getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the timer is being cleared.
    *
-   * <p>The time is relative to the time domain defined in the {@link
+   * <p>The time is absolute to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable only when the
+   * timer is being cleared.
 
 Review comment:
   ```suggestion
      * Returns the {@link PaneInfo} that is related to the timer. This field is nullable only when the
      * timer is being cleared.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402418237
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +50,197 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given userKey, dynamicTimerTag, fireTimestamp,
+   * holdTimestamp, windows and pane.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, false, fireTimestamp, holdTimestamp, windows, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag. */
+  public static <T> Timer<T> cleared(T userKey, String dynamicTimerTag) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, true, null, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is empty string only when the timer is set
+   * from TimerSpec.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns a boolean which indicate whether the timer is going to be cleared. */
 
 Review comment:
   ```suggestion
     /** Returns whether the timer is going to be cleared. */
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding to V2

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding to V2
URL: https://github.com/apache/beam/pull/11199#discussion_r397213244
 
 

 ##########
 File path: model/pipeline/src/main/proto/beam_runner_api.proto
 ##########
 @@ -718,9 +726,15 @@ message StandardCoders {
     //                        1: 80 00 00 00 00 00 00 01
     //                      256: 80 00 00 00 00 00 01 00
     //      9223372036854775807: FF FF FF FF FF FF FF FF
-    //   payload - user defined data, uses the component coder
-    // Components: Coder for the payload.
-    TIMER = 4 [(beam_urn) = "beam:coder:timer:v1"];
+    //
+    //   hold timestamp - similar to fire timestamp, but stands for the
+    //     watermark the timer is expected to hold.
+    //
+    //   windows - uses the component coder.
+    //
+    //   paneinfo - similar to how paneinfo gets encoded in WindowedValue.
+    // Components: Coder for the user key, coder for the windows,
+    TIMER = 4 [(beam_urn) = "beam:coder:timer:v2"];
 
 Review comment:
   You don't need to change this to v2 since these are not stable yet.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r397445467
 
 

 ##########
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##########
 @@ -609,16 +609,20 @@ message Elements {
     // represents the producer of these timers.
     string transform_id = 2;
 
+    // (Optional) The local timer family name used to identify the associated
+    // timer family specification
+    string timer_family_tag = 3;
+
     // (Optional) The local timer name used to identify the associated timer specification.
-    string timer_id = 3;
+    string timer_id = 4;
 
     // (Optional) Represents a logical byte stream of a timer. Encoded according
     // to the coder in the timer spec.
-    bytes timer = 4;
+    bytes timer = 5;
 
 Review comment:
   The `Timer` represents some set of encoded timers for a specific PTransform and timer family.
   
   Its important to support many timers within a `timer` and it follows the logical data stream as described here: https://s.apache.org/beam-fn-api-send-and-receive-data

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399591496
 
 

 ##########
 File path: sdks/python/apache_beam/coders/coder_impl.py
 ##########
 @@ -629,22 +629,56 @@ def estimate_size(self, unused_value, nested=False):
 
 class TimerCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees."""
-  def __init__(self, payload_coder_impl):
+  def __init__(self, key_coder_impl, window_coder_impl, tag_coder_impl):
     self._timestamp_coder_impl = TimestampCoderImpl()
-    self._payload_coder_impl = payload_coder_impl
+    self._boolean_coder_impl = BooleanCoderImpl()
+    self._pane_info_coder_impl = PaneInfoCoderImpl()
+    self._key_coder_impl = key_coder_impl
+    self._windows_coder_impl = TupleSequenceCoderImpl(window_coder_impl)
+    self._tag_coder_impl = tag_coder_impl
 
   def encode_to_stream(self, value, out, nested):
     # type: (dict, create_OutputStream, bool) -> None
-    self._timestamp_coder_impl.encode_to_stream(value['timestamp'], out, True)
-    self._payload_coder_impl.encode_to_stream(value.get('payload'), out, True)
+    self._key_coder_impl.encode_to_stream(value.user_key, out, nested)
 
 Review comment:
   ```suggestion
       self._key_coder_impl.encode_to_stream(value.user_key, out, True)
   ```
   
   We should ignore the nested field since it will change the encoding.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399588039
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,14 +49,26 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a timer for the given timestamp with a user specified payload.
 
 Review comment:
   ```suggestion
      * Returns a timer for the given timestamp with a user specified key.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402414733
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +50,197 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given userKey, dynamicTimerTag, fireTimestamp,
+   * holdTimestamp, windows and pane.
 
 Review comment:
   ```suggestion
      * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code fireTimestamp},
      * {@code holdTimestamp}, {@code windows} and {@code pane}.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402689183
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Collection<? extends BoundedWindow> windows,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */
+  public static <T> Timer<T> cleared(
+      T userKey, String dynamicTimerTag, Collection<? extends BoundedWindow> windows) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection<? extends BoundedWindow> getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the timer is being cleared.
    *
-   * <p>The time is relative to the time domain defined in the {@link
+   * <p>The time is absolute to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable only when the
+   * timer is being cleared.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Timer)) {
+      return false;
+    }
+    Timer<?> that = (Timer<?>) other;
+    if (this.getClearBit()) {
+      return Objects.equals(this.getUserKey(), that.getUserKey())
+          && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+          && (this.getClearBit() == that.getClearBit())
+          && Objects.equals(this.getWindows(), that.getWindows());
+    }
+    return Objects.equals(this.getUserKey(), that.getUserKey())
+        && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+        && (this.getClearBit() == that.getClearBit())
+        && this.getFireTimestamp().equals(that.getFireTimestamp())
+        && this.getHoldTimestamp().equals(that.getHoldTimestamp())
+        && Objects.equals(this.getWindows(), that.getWindows())
+        && Objects.equals(this.getPane(), that.getPane());
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash only the millis of the timestamp to be consistent with equals
+    if (getClearBit()) {
+      return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), getWindows());
+    }
+    return Objects.hash(
+        getUserKey(),
+        getDynamicTimerTag(),
+        getClearBit(),
+        getFireTimestamp().getMillis(),
+        getHoldTimestamp().getMillis(),
+        getWindows(),
+        getPane());
+  }
 
   /**
    * A {@link org.apache.beam.sdk.coders.Coder} for timers.
    *
-   * <p>This coder is deterministic if the payload coder is deterministic.
+   * <p>This coder is deterministic if both the key coder and window coder are deterministic.
    *
-   * <p>This coder is inexpensive for size estimation of elements if the payload coder is
-   * inexpensive for size estimation.
+   * <p>This coder is inexpensive for size estimation of elements if the key coder is inexpensive
+   * for size estimation.
 
 Review comment:
   ```suggestion
      * <p>This coder is inexpensive for size estimation of elements if the key coder and window coder are inexpensive
      * for size estimation.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402426932
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +50,197 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given userKey, dynamicTimerTag, fireTimestamp,
+   * holdTimestamp, windows and pane.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, false, fireTimestamp, holdTimestamp, windows, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag. */
+  public static <T> Timer<T> cleared(T userKey, String dynamicTimerTag) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, true, null, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is empty string only when the timer is set
+   * from TimerSpec.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns a boolean which indicate whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the clearBit is true.
    *
    * <p>The time is relative to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field is nullable only when
+   * the clearBit is true.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the windows which is associated with the timer. This field is nullable only when the
+   * clearBit is true.
+   */
+  @Nullable
+  public abstract Collection<? extends BoundedWindow> getWindows();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable only when the
+   * clearBit is true.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Timer)) {
+      return false;
+    } else {
+      Timer<?> that = (Timer<?>) other;
+
+      return Objects.equals(this.getUserKey(), that.getUserKey())
+          && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+          && (this.getClearBit() == that.getClearBit())
+          && this.getFireTimestamp().equals(that.getFireTimestamp())
+          && this.getHoldTimestamp().equals(that.getHoldTimestamp())
+          && Objects.equals(this.getWindows(), that.getWindows())
+          && Objects.equals(this.getPane(), that.getPane());
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash only the millis of the timestamp to be consistent with equals
+    return Objects.hash(
+        getUserKey(),
+        getDynamicTimerTag(),
+        getClearBit(),
+        getFireTimestamp().getMillis(),
+        getHoldTimestamp().getMillis(),
+        getWindows(),
+        getPane());
+  }
 
   /**
    * A {@link org.apache.beam.sdk.coders.Coder} for timers.
    *
-   * <p>This coder is deterministic if the payload coder is deterministic.
+   * <p>This coder is deterministic if both the key coder and window coder are deterministic.
    *
-   * <p>This coder is inexpensive for size estimation of elements if the payload coder is
-   * inexpensive for size estimation.
+   * <p>This coder is inexpensive for size estimation of elements if the key coder is inexpensive
+   * for size estimation.
    */
   public static class Coder<T> extends StructuredCoder<Timer<T>> {
 
-    public static <T> Coder of(org.apache.beam.sdk.coders.Coder<T> payloadCoder) {
-      return new Coder(payloadCoder);
+    public static <T> Coder<T> of(
+        org.apache.beam.sdk.coders.Coder<T> keyCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+      return new Coder<>(keyCoder, windowCoder);
     }
 
-    private final org.apache.beam.sdk.coders.Coder<T> payloadCoder;
-
-    private Coder(org.apache.beam.sdk.coders.Coder<T> payloadCoder) {
-      this.payloadCoder = payloadCoder;
+    private final org.apache.beam.sdk.coders.Coder<T> keyCoder;
+    private final org.apache.beam.sdk.coders.Coder<Collection<? extends BoundedWindow>>
+        windowsCoder;
+    private final org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder;
+
+    private Coder(
+        org.apache.beam.sdk.coders.Coder<T> keyCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+      this.windowCoder = windowCoder;
+      this.keyCoder = keyCoder;
+      this.windowsCoder = (org.apache.beam.sdk.coders.Coder) CollectionCoder.of(windowCoder);
     }
 
     @Override
     public void encode(Timer<T> timer, OutputStream outStream) throws CoderException, IOException {
-      InstantCoder.of().encode(timer.getTimestamp(), outStream);
-      payloadCoder.encode(timer.getPayload(), outStream);
+      keyCoder.encode(timer.getUserKey(), outStream);
+      StringUtf8Coder.of().encode(timer.getDynamicTimerTag(), outStream);
+      BooleanCoder.of().encode(timer.getClearBit(), outStream);
+      if (!timer.getClearBit()) {
+        InstantCoder.of().encode(timer.getFireTimestamp(), outStream);
+        InstantCoder.of().encode(timer.getHoldTimestamp(), outStream);
+        windowsCoder.encode(timer.getWindows(), outStream);
+        PaneInfoCoder.INSTANCE.encode(timer.getPane(), outStream);
+      }
 
 Review comment:
   This was missed during the doc review but the timer will always need to encode the windowing information otherwise we won't know what windows the timer is in to clear.
   
   We'll need to update the documentation for this as well.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] boyuanzz commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r401851561
 
 

 ##########
 File path: sdks/python/apache_beam/coders/coder_impl.py
 ##########
 @@ -629,22 +629,56 @@ def estimate_size(self, unused_value, nested=False):
 
 class TimerCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees."""
-  def __init__(self, payload_coder_impl):
+  def __init__(self, key_coder_impl, window_coder_impl, tag_coder_impl):
 
 Review comment:
   Yes it should be as long as we agree on that the tag should be `string`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] boyuanzz commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402509304
 
 

 ##########
 File path: sdks/python/apache_beam/coders/coders_test_common.py
 ##########
 @@ -80,8 +81,9 @@ def tearDownClass(cls):
         coders.RunnerAPICoderHolder,
         coders.ToBytesCoder
     ])
-    assert not standard - cls.seen, standard - cls.seen
-    assert not standard - cls.seen_nested, standard - cls.seen_nested
+    cls.seen_nested -= set([coders.ProtoCoder, CustomCoder])
+    assert not standard - cls.seen
+    assert not cls.seen_nested - standard
 
 Review comment:
   The major change here is `assert not cls.seen_nested - standard`.  In my case I found that there is a `TimerCoder` in `standard` set, which is expected, but not in `cls.seen_nested`. I checked the code  https://github.com/apache/beam/blob/b3e06126405105836f9ae15a15d5e1c7189b7f6e/sdks/python/apache_beam/coders/coders_test_common.py#L93, only when the input coder is a `TupleCoder`, there are new coders added into `cls.seen_nested`, which means, `cls.seen_nested` should be a subset of `standard`, correct? @robertwb What do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r401005415
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 ##########
 @@ -197,7 +199,7 @@ public boolean canTranslate(PTransform<?, ?> pTransform) {
                     ((KvCoder) mainInput.getCoder()).getKeyCoder(),
                     // TODO: Add support for timer payloads to the SDK
                     // We currently assume that all payloads are unspecified.
-                    Timer.Coder.of(VoidCoder.of())));
+                    Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)));
 
 Review comment:
   sg

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402687712
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -21,17 +21,29 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.BooleanCoder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CollectionCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.joda.time.Instant;
 
 /**
- * A timer consists of a timestamp and a corresponding user supplied payload.
+ * A timer consists of a user key, a dynamic timer tag and either a bit that says that this timer
+ * should be cleared or data representing the firing timestamp, hold timestamp and a list of windows
+ * and pane information that should be used when producing output.
 
 Review comment:
   ```suggestion
    * A timer consists of a user key, a dynamic timer tag, a set of windows and either a bit that says that this timer
    * should be cleared or data representing the firing timestamp, hold timestamp and pane information that should be used when producing output.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402417649
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +50,197 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given userKey, dynamicTimerTag, fireTimestamp,
+   * holdTimestamp, windows and pane.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, false, fireTimestamp, holdTimestamp, windows, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag. */
+  public static <T> Timer<T> cleared(T userKey, String dynamicTimerTag) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, true, null, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is empty string only when the timer is set
+   * from TimerSpec.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns a boolean which indicate whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the clearBit is true.
    *
    * <p>The time is relative to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field is nullable only when
+   * the clearBit is true.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the windows which is associated with the timer. This field is nullable only when the
+   * clearBit is true.
+   */
+  @Nullable
+  public abstract Collection<? extends BoundedWindow> getWindows();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable only when the
+   * clearBit is true.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Timer)) {
+      return false;
+    } else {
+      Timer<?> that = (Timer<?>) other;
+
+      return Objects.equals(this.getUserKey(), that.getUserKey())
+          && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+          && (this.getClearBit() == that.getClearBit())
+          && this.getFireTimestamp().equals(that.getFireTimestamp())
+          && this.getHoldTimestamp().equals(that.getHoldTimestamp())
 
 Review comment:
   fire timestamp and hold timestamp can be null when the clear is set.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402430514
 
 

 ##########
 File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
 ##########
 @@ -434,8 +452,16 @@ private void verifyDecodedValue(CommonCoder coder, Object expectedValue, Object
       assertFalse(expectedValueIterator.hasNext());
 
     } else if (s.equals(getUrn(StandardCoders.Enum.TIMER))) {
-      assertEquals(((Timer) expectedValue).getTimestamp(), ((Timer) actualValue).getTimestamp());
-      assertThat(((Timer) expectedValue).getPayload(), equalTo(((Timer) actualValue).getPayload()));
+      assertEquals(((Timer) expectedValue).getUserKey(), ((Timer) actualValue).getUserKey());
+      assertEquals(
 
 Review comment:
   check that `clear` is equal

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#issuecomment-608241577
 
 
   Run Java_Examples_Dataflow PreCommit

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402688769
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Collection<? extends BoundedWindow> windows,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */
+  public static <T> Timer<T> cleared(
+      T userKey, String dynamicTimerTag, Collection<? extends BoundedWindow> windows) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection<? extends BoundedWindow> getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the timer is being cleared.
    *
-   * <p>The time is relative to the time domain defined in the {@link
+   * <p>The time is absolute to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable only when the
+   * timer is being cleared.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Timer)) {
+      return false;
+    }
+    Timer<?> that = (Timer<?>) other;
+    if (this.getClearBit()) {
+      return Objects.equals(this.getUserKey(), that.getUserKey())
+          && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+          && (this.getClearBit() == that.getClearBit())
+          && Objects.equals(this.getWindows(), that.getWindows());
+    }
+    return Objects.equals(this.getUserKey(), that.getUserKey())
+        && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+        && (this.getClearBit() == that.getClearBit())
+        && this.getFireTimestamp().equals(that.getFireTimestamp())
+        && this.getHoldTimestamp().equals(that.getHoldTimestamp())
+        && Objects.equals(this.getWindows(), that.getWindows())
+        && Objects.equals(this.getPane(), that.getPane());
+  }
 
 Review comment:
   This will make the method less error prone in case the contract changes.
   ```suggestion
       return Objects.equals(this.getUserKey(), that.getUserKey())
           && Objects.equals(this.getDynamicTimerTag(), that.getDynamicTimerTag())
           && Objects.equals(this.getWindows(), that.getWindows())
           && (this.getClearBit() == that.getClearBit())
           && Objects.equals(this.getFireTimestamp(), that.getFireTimestamp())
           && Objects.equals(this.getHoldTimestamp(), that.getHoldTimestamp())
           && Objects.equals(this.getPane(), that.getPane());
     }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402418547
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +50,197 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given userKey, dynamicTimerTag, fireTimestamp,
+   * holdTimestamp, windows and pane.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, false, fireTimestamp, holdTimestamp, windows, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag. */
+  public static <T> Timer<T> cleared(T userKey, String dynamicTimerTag) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, true, null, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is empty string only when the timer is set
+   * from TimerSpec.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns a boolean which indicate whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the clearBit is true.
 
 Review comment:
   ```suggestion
      * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
      * the timer is being cleared.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402428893
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +50,197 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given userKey, dynamicTimerTag, fireTimestamp,
+   * holdTimestamp, windows and pane.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, false, fireTimestamp, holdTimestamp, windows, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag. */
+  public static <T> Timer<T> cleared(T userKey, String dynamicTimerTag) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, true, null, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is empty string only when the timer is set
+   * from TimerSpec.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns a boolean which indicate whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the clearBit is true.
    *
    * <p>The time is relative to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field is nullable only when
+   * the clearBit is true.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the windows which is associated with the timer. This field is nullable only when the
+   * clearBit is true.
 
 Review comment:
   ```suggestion
      * Returns the windows which is associated with the timer.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399591519
 
 

 ##########
 File path: sdks/python/apache_beam/coders/coder_impl.py
 ##########
 @@ -629,22 +629,56 @@ def estimate_size(self, unused_value, nested=False):
 
 class TimerCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees."""
-  def __init__(self, payload_coder_impl):
+  def __init__(self, key_coder_impl, window_coder_impl, tag_coder_impl):
     self._timestamp_coder_impl = TimestampCoderImpl()
-    self._payload_coder_impl = payload_coder_impl
+    self._boolean_coder_impl = BooleanCoderImpl()
+    self._pane_info_coder_impl = PaneInfoCoderImpl()
+    self._key_coder_impl = key_coder_impl
+    self._windows_coder_impl = TupleSequenceCoderImpl(window_coder_impl)
+    self._tag_coder_impl = tag_coder_impl
 
   def encode_to_stream(self, value, out, nested):
     # type: (dict, create_OutputStream, bool) -> None
-    self._timestamp_coder_impl.encode_to_stream(value['timestamp'], out, True)
-    self._payload_coder_impl.encode_to_stream(value.get('payload'), out, True)
+    self._key_coder_impl.encode_to_stream(value.user_key, out, nested)
+    self._tag_coder_impl.encode_to_stream(
+        value.dynamic_timer_tag, out, True)
+    self._boolean_coder_impl.encode_to_stream(value.clear_bit, out, True)
+    if not value.clear_bit:
+      self._timestamp_coder_impl.encode_to_stream(
+          value.fire_timestamp, out, True)
+      self._timestamp_coder_impl.encode_to_stream(
+          value.hold_timestamp, out, True)
+      self._windows_coder_impl.encode_to_stream(value.windows, out, True)
+      self._pane_info_coder_impl.encode_to_stream(value.paneinfo, out, True)
 
 Review comment:
   ```suggestion
         self._pane_info_coder_impl.encode_to_stream(value.paneinfo, out, nested)
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399587616
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 ##########
 @@ -197,7 +199,7 @@ public boolean canTranslate(PTransform<?, ?> pTransform) {
                     ((KvCoder) mainInput.getCoder()).getKeyCoder(),
                     // TODO: Add support for timer payloads to the SDK
                     // We currently assume that all payloads are unspecified.
 
 Review comment:
   ```suggestion
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] robertwb commented on issue #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
robertwb commented on issue #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#issuecomment-606244698
 
 
   The proto changes look fine. To not overlap work, I will let Luke take over this PR, but if there is anything specific (or a subset of files) that you want my eyes on please don't hesitate to ping me. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#issuecomment-604814424
 
 
   Most python and Java SDK part has been done. Remaining work for java runner hookup.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding to V2

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding to V2
URL: https://github.com/apache/beam/pull/11199#issuecomment-602876259
 
 
   I'll update the coder implementation once the proto looks good.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402432966
 
 

 ##########
 File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TimerTest.java
 ##########
 @@ -36,39 +38,121 @@
   private static final Instant INSTANT = Instant.now();
 
   @Test
-  public void testTimer() {
-    Timer<Void> timerA = Timer.of(INSTANT);
-    assertEquals(INSTANT, timerA.getTimestamp());
-    assertNull(timerA.getPayload());
+  public void testClearTimer() {
+    Timer<String> clearedTimer = Timer.cleared("timer", "tag");
+    assertTrue(clearedTimer.getClearBit());
+    assertEquals("timer", clearedTimer.getUserKey());
+    assertEquals("tag", clearedTimer.getDynamicTimerTag());
+  }
 
-    Timer<String> timerB = Timer.of(INSTANT, "ABC");
-    assertEquals(INSTANT, timerB.getTimestamp());
-    assertEquals("ABC", timerB.getPayload());
+  @Test
+  public void testTimer() {
+    Timer<String> timer =
+        Timer.of(
+            "key",
+            "tag",
+            INSTANT,
+            INSTANT,
+            Collections.singleton(GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING);
+    assertEquals("key", timer.getUserKey());
+    assertEquals("tag", timer.getDynamicTimerTag());
+    assertEquals(INSTANT, timer.getFireTimestamp());
+    assertEquals(INSTANT, timer.getHoldTimestamp());
+    assertEquals(Collections.singleton(GlobalWindow.INSTANCE), timer.getWindows());
+    assertEquals(PaneInfo.NO_FIRING, timer.getPane());
+    assertFalse(timer.getClearBit());
   }
 
   @Test
-  public void testTimerCoderWithInconsistentWithEqualsPayloadCoder() throws Exception {
-    Coder<Timer<byte[]>> coder = Timer.Coder.of(ByteArrayCoder.of());
+  public void testTimerCoderWithInconsistentWithEqualsComponentCoders() throws Exception {
+    Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
     CoderProperties.coderSerializable(coder);
     CoderProperties.structuralValueDecodeEncodeEqual(
 
 Review comment:
   Add a cleared timer for this case.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402689030
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Collection<? extends BoundedWindow> windows,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */
+  public static <T> Timer<T> cleared(
+      T userKey, String dynamicTimerTag, Collection<? extends BoundedWindow> windows) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection<? extends BoundedWindow> getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the timer is being cleared.
    *
-   * <p>The time is relative to the time domain defined in the {@link
+   * <p>The time is absolute to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable only when the
+   * timer is being cleared.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Timer)) {
+      return false;
+    }
+    Timer<?> that = (Timer<?>) other;
+    if (this.getClearBit()) {
+      return Objects.equals(this.getUserKey(), that.getUserKey())
+          && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+          && (this.getClearBit() == that.getClearBit())
+          && Objects.equals(this.getWindows(), that.getWindows());
+    }
+    return Objects.equals(this.getUserKey(), that.getUserKey())
+        && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+        && (this.getClearBit() == that.getClearBit())
+        && this.getFireTimestamp().equals(that.getFireTimestamp())
+        && this.getHoldTimestamp().equals(that.getHoldTimestamp())
+        && Objects.equals(this.getWindows(), that.getWindows())
+        && Objects.equals(this.getPane(), that.getPane());
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash only the millis of the timestamp to be consistent with equals
+    if (getClearBit()) {
+      return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), getWindows());
+    }
+    return Objects.hash(
+        getUserKey(),
+        getDynamicTimerTag(),
+        getClearBit(),
+        getFireTimestamp().getMillis(),
+        getHoldTimestamp().getMillis(),
+        getWindows(),
+        getPane());
 
 Review comment:
   ```suggestion
       return Objects.hash(
           getUserKey(),
           getDynamicTimerTag(),
           getWindows(),
           getClearBit(),
           getFireTimestamp().getMillis(),
           getHoldTimestamp().getMillis(),
           getPane());
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402687822
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Collection<? extends BoundedWindow> windows,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */
+  public static <T> Timer<T> cleared(
+      T userKey, String dynamicTimerTag, Collection<? extends BoundedWindow> windows) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link
+   * TimerSpec}.
 
 Review comment:
   ```suggestion
      * Returns the tag that the timer is set on. The tag is {@code ""} when the timer is for a {@link
      * TimerSpec}.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402431150
 
 

 ##########
 File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TimerTest.java
 ##########
 @@ -36,39 +38,121 @@
   private static final Instant INSTANT = Instant.now();
 
   @Test
-  public void testTimer() {
-    Timer<Void> timerA = Timer.of(INSTANT);
-    assertEquals(INSTANT, timerA.getTimestamp());
-    assertNull(timerA.getPayload());
+  public void testClearTimer() {
+    Timer<String> clearedTimer = Timer.cleared("timer", "tag");
+    assertTrue(clearedTimer.getClearBit());
+    assertEquals("timer", clearedTimer.getUserKey());
+    assertEquals("tag", clearedTimer.getDynamicTimerTag());
+  }
 
-    Timer<String> timerB = Timer.of(INSTANT, "ABC");
-    assertEquals(INSTANT, timerB.getTimestamp());
-    assertEquals("ABC", timerB.getPayload());
+  @Test
+  public void testTimer() {
+    Timer<String> timer =
+        Timer.of(
+            "key",
+            "tag",
+            INSTANT,
 
 Review comment:
   use two different instant objects otherwise you can't tell if they get swapped accidentally

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding to V2

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding to V2
URL: https://github.com/apache/beam/pull/11199#discussion_r397213834
 
 

 ##########
 File path: model/pipeline/src/main/proto/beam_runner_api.proto
 ##########
 @@ -703,13 +703,21 @@ message StandardCoders {
     // Components: Coder for a single element.
     ITERABLE = 3 [(beam_urn) = "beam:coder:iterable:v1"];
 
-    // Encodes a timer containing a timestamp and a user specified payload.
-    // The encoding is represented as: timestamp payload
-    //   timestamp - a big endian 8 byte integer representing millis-since-epoch.
-    //     The encoded representation is shifted so that the byte representation of
-    //     negative values are lexicographically ordered before the byte representation
-    //     of positive values. This is typically done by subtracting -9223372036854775808
-    //     from the value and encoding it as a signed big endian integer. Example values:
+    // Encodes a timer containing a user key, a clear bit,
+    // a fire timestamp, a hold timestamp, the windows that the user key is in,
+    // the paneinfo of the user key.
+    // The encoding is represented as:
+    //   user key - user defined key, uses the component coder.
+    //
 
 Review comment:
   The *dynamic tag* is supposed to be part of the encoding.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] boyuanzz commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402481203
 
 

 ##########
 File path: model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
 ##########
 @@ -28,6 +28,23 @@
 #             one of a few standard JSON types such as numbers, strings, dicts that map naturally
 #             to the type encoded by the coder.
 #
+# Java code snippet to generate example bytes:
+#   Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
+#   Instant now = new Instant(1000L);
+#   Timer<String> timer = Timer.of(
+#     "key",
+#     "tag",
+#     now,
+#     now,
+#     Collections.singletonList(GlobalWindow.INSTANCE),
+#     PaneInfo.NO_FIRING);
+#   byte[] byets = CoderUtils.encodeToByteArray(coder, timer);
+#   String str = new String(byets, java.nio.charset.StandardCharsets.ISO_8859_1);
 
 Review comment:
   Kind of sure since it works for me. And I found usage here: https://github.com/apache/beam/blob/03c8544227530eb01dd0c64335e404ea387ee3a8/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java#L190

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] boyuanzz commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402713697
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Collection<? extends BoundedWindow> windows,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */
+  public static <T> Timer<T> cleared(
+      T userKey, String dynamicTimerTag, Collection<? extends BoundedWindow> windows) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection<? extends BoundedWindow> getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the timer is being cleared.
    *
-   * <p>The time is relative to the time domain defined in the {@link
+   * <p>The time is absolute to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable only when the
+   * timer is being cleared.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Timer)) {
+      return false;
+    }
+    Timer<?> that = (Timer<?>) other;
+    if (this.getClearBit()) {
+      return Objects.equals(this.getUserKey(), that.getUserKey())
+          && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+          && (this.getClearBit() == that.getClearBit())
+          && Objects.equals(this.getWindows(), that.getWindows());
+    }
+    return Objects.equals(this.getUserKey(), that.getUserKey())
+        && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+        && (this.getClearBit() == that.getClearBit())
+        && this.getFireTimestamp().equals(that.getFireTimestamp())
+        && this.getHoldTimestamp().equals(that.getHoldTimestamp())
+        && Objects.equals(this.getWindows(), that.getWindows())
+        && Objects.equals(this.getPane(), that.getPane());
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash only the millis of the timestamp to be consistent with equals
+    if (getClearBit()) {
+      return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), getWindows());
+    }
+    return Objects.hash(
+        getUserKey(),
+        getDynamicTimerTag(),
+        getClearBit(),
+        getFireTimestamp().getMillis(),
+        getHoldTimestamp().getMillis(),
+        getWindows(),
+        getPane());
 
 Review comment:
   `getFireTimestamp()` returns `null` when `clearBit` is `true`. I think we still need to check  `clearBit` in order to hash the `timestamp` with milliseconds.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399588665
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -57,11 +78,24 @@
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract T getUserKey();
+
+  public abstract String getDynamicTimerTag();
+
+  public abstract Boolean getClearBit();
 
 Review comment:
   ```suggestion
     public abstract boolean getClearBit();
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402416594
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +50,197 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given userKey, dynamicTimerTag, fireTimestamp,
+   * holdTimestamp, windows and pane.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, false, fireTimestamp, holdTimestamp, windows, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag. */
+  public static <T> Timer<T> cleared(T userKey, String dynamicTimerTag) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, true, null, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is empty string only when the timer is set
+   * from TimerSpec.
 
 Review comment:
   ```suggestion
      * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link TimerSpec}.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402410797
 
 

 ##########
 File path: model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
 ##########
 @@ -207,15 +224,19 @@ examples:
 
 coder:
   urn: "beam:coder:timer:v1"
-  components: [{urn: "beam:coder:bytes:v1"}]
+  components: [{urn: "beam:coder:string_utf8:v1"},
+               {urn: "beam:coder:global_window:v1"}]
 examples:
-  "\0\0\0\0\0\0\0\0\u0003abc": {timestamp: -9223372036854775808, payload: abc}
-  "\x7fÿÿÿÿÿÿ\x01\u0003abc": {timestamp: -255, payload: abc}
-  "\x7fÿÿÿÿÿÿÿ\u0003abc": {timestamp: -1, payload: abc}
-  "\x80\0\0\0\0\0\0\0\u0003abc": {timestamp: 0, payload: abc}
-  "\x80\0\0\0\0\0\0\x01\u0003abc": {timestamp: 1, payload: abc}
-  "\x80\0\0\0\0\0\x01\0\u0003abc": {timestamp: 256, payload: abc}
-  "ÿÿÿÿÿÿÿÿ\u0003abc": {timestamp: 9223372036854775807, payload: abc}
+  "\u0003\u006b\u0065\u0079\u0003\u0074\u0061\u0067\u0000\u0080\u0000\u0000\u0000\u0000\u0000\u0003\u00e8\u0080\u0000\u0000\u0000\u0000\u0000\u0003\u00e8\u0000\u0000\u0000\u0001\u000f": {
 
 Review comment:
   Please add a test for a cleared timer encoding.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402692551
 
 

 ##########
 File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TimerTest.java
 ##########
 @@ -33,42 +35,128 @@
 /** Tests for {@link Timer}. */
 @RunWith(JUnit4.class)
 public class TimerTest {
-  private static final Instant INSTANT = Instant.now();
+  private static final Instant FIREINSTANT = new Instant(123L);
+  private static final Instant HOLDINSTANT = new Instant(456L);
 
 Review comment:
   ```suggestion
     private static final Instant FIRE_TIME = new Instant(123L);
     private static final Instant HOLD_TIME = new Instant(456L);
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402438661
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##########
 @@ -580,16 +580,36 @@ def __init__(self,
 
   def set(self, ts):
     ts = timestamp.Timestamp.of(ts)
+    # TODO(c): Plumb through actual timer fields.
 
 Review comment:
   ```suggestion
       # TODO(BEAM-9562): Plumb through actual timer fields.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402505215
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +50,197 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given userKey, dynamicTimerTag, fireTimestamp,
+   * holdTimestamp, windows and pane.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, false, fireTimestamp, holdTimestamp, windows, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag. */
+  public static <T> Timer<T> cleared(T userKey, String dynamicTimerTag) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, true, null, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is empty string only when the timer is set
+   * from TimerSpec.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns a boolean which indicate whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the clearBit is true.
    *
    * <p>The time is relative to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
 
 Review comment:
   The default should be the input timestamp but we should allow people to set it to any value greater then that. For example, consider a person writing a simple polling function where they check the status of something using a timer and they would like to be able to advance the hold based upon what the current system time is. The input to this transform is the output of impulse (+ a dummy key) so the input timestamp would be MIN_TIMESTAMP initially.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] boyuanzz commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402495319
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +50,197 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given userKey, dynamicTimerTag, fireTimestamp,
+   * holdTimestamp, windows and pane.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, false, fireTimestamp, holdTimestamp, windows, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag. */
+  public static <T> Timer<T> cleared(T userKey, String dynamicTimerTag) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, true, null, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is empty string only when the timer is set
+   * from TimerSpec.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns a boolean which indicate whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the clearBit is true.
    *
    * <p>The time is relative to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
 
 Review comment:
   Agree on always using absolute timestamp here. 
   
   One question about computing hold timestamp. Why do we need an  optional positive offset besides input timestamp? I thought the watermark hold should be set as the input element timestamp.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] boyuanzz commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding to V2

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding to V2
URL: https://github.com/apache/beam/pull/11199#discussion_r397351447
 
 

 ##########
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##########
 @@ -609,16 +609,20 @@ message Elements {
     // represents the producer of these timers.
     string transform_id = 2;
 
+    // (Optional) The local timer family name used to identify the associated
+    // timer family specification
+    string timer_family_tag = 3;
+
     // (Optional) The local timer name used to identify the associated timer specification.
-    string timer_id = 3;
+    string timer_id = 4;
 
     // (Optional) Represents a logical byte stream of a timer. Encoded according
     // to the coder in the timer spec.
-    bytes timer = 4;
+    bytes timer = 5;
 
 Review comment:
   The `Elements` has repeated field of `Timer`. So I think the `Timer` represents a single timer,  and `Element` represent a full stream of timers for one bundle.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402692210
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Collection<? extends BoundedWindow> windows,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */
+  public static <T> Timer<T> cleared(
+      T userKey, String dynamicTimerTag, Collection<? extends BoundedWindow> windows) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
 
 Review comment:
   ```suggestion
     /** Returns the windows which are associated with the timer. */
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] robertwb commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding to V2

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding to V2
URL: https://github.com/apache/beam/pull/11199#discussion_r397328635
 
 

 ##########
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##########
 @@ -609,16 +609,20 @@ message Elements {
     // represents the producer of these timers.
     string transform_id = 2;
 
+    // (Optional) The local timer family name used to identify the associated
+    // timer family specification
+    string timer_family_tag = 3;
 
 Review comment:
   We should agree on terminology. The "family" represents the set of timers, within that one has multiple [dynamic] tags. I would call this `timer_family_id`. 
   
   We should not have both `timer_specs` and `timer_family_specs`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r403099609
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Collection<? extends BoundedWindow> windows,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */
+  public static <T> Timer<T> cleared(
+      T userKey, String dynamicTimerTag, Collection<? extends BoundedWindow> windows) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection<? extends BoundedWindow> getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the timer is being cleared.
    *
-   * <p>The time is relative to the time domain defined in the {@link
+   * <p>The time is absolute to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable only when the
+   * timer is being cleared.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Timer)) {
+      return false;
+    }
+    Timer<?> that = (Timer<?>) other;
+    if (this.getClearBit()) {
+      return Objects.equals(this.getUserKey(), that.getUserKey())
+          && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+          && (this.getClearBit() == that.getClearBit())
+          && Objects.equals(this.getWindows(), that.getWindows());
+    }
+    return Objects.equals(this.getUserKey(), that.getUserKey())
+        && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+        && (this.getClearBit() == that.getClearBit())
+        && this.getFireTimestamp().equals(that.getFireTimestamp())
+        && this.getHoldTimestamp().equals(that.getHoldTimestamp())
+        && Objects.equals(this.getWindows(), that.getWindows())
+        && Objects.equals(this.getPane(), that.getPane());
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash only the millis of the timestamp to be consistent with equals
+    if (getClearBit()) {
+      return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), getWindows());
+    }
+    return Objects.hash(
+        getUserKey(),
+        getDynamicTimerTag(),
+        getClearBit(),
+        getFireTimestamp().getMillis(),
+        getHoldTimestamp().getMillis(),
+        getWindows(),
+        getPane());
 
 Review comment:
   good point

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399589246
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -73,59 +107,81 @@
    */
   public static class Coder<T> extends StructuredCoder<Timer<T>> {
 
-    public static <T> Coder of(org.apache.beam.sdk.coders.Coder<T> payloadCoder) {
-      return new Coder(payloadCoder);
+    public static <T> Coder<T> of(
+        org.apache.beam.sdk.coders.Coder<T> keyCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+      return new Coder<>(keyCoder, windowCoder);
     }
 
-    private final org.apache.beam.sdk.coders.Coder<T> payloadCoder;
+    private final org.apache.beam.sdk.coders.Coder<T> keyCoder;
+    private final org.apache.beam.sdk.coders.Coder<Collection<? extends BoundedWindow>>
+        windowsCoder;
 
-    private Coder(org.apache.beam.sdk.coders.Coder<T> payloadCoder) {
-      this.payloadCoder = payloadCoder;
+    private Coder(
+        org.apache.beam.sdk.coders.Coder<T> keyCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+      this.keyCoder = keyCoder;
+      this.windowsCoder = (org.apache.beam.sdk.coders.Coder) CollectionCoder.of(windowCoder);
+      ;
     }
 
     @Override
     public void encode(Timer<T> timer, OutputStream outStream) throws CoderException, IOException {
-      InstantCoder.of().encode(timer.getTimestamp(), outStream);
-      payloadCoder.encode(timer.getPayload(), outStream);
+      keyCoder.encode(timer.getUserKey(), outStream);
+      StringUtf8Coder.of().encode(timer.getDynamicTimerTag(), outStream);
+      BooleanCoder.of().encode(timer.getClearBit(), outStream);
+      if (!timer.getClearBit()) {
+        InstantCoder.of().encode(timer.getFireTimestamp(), outStream);
+        InstantCoder.of().encode(timer.getHoldTimestamp(), outStream);
+        windowsCoder.encode(timer.getWindows(), outStream);
+        PaneInfoCoder.INSTANCE.encode(timer.getPane(), outStream);
+      }
     }
 
     @Override
     public Timer<T> decode(InputStream inStream) throws CoderException, IOException {
-      Instant instant = InstantCoder.of().decode(inStream);
-      T value = payloadCoder.decode(inStream);
-      return Timer.of(instant, value);
+      T userKey = keyCoder.decode(inStream);
+      String dynamicTimerTag = StringUtf8Coder.of().decode(inStream);
+      Boolean clearBit = BooleanCoder.of().decode(inStream);
 
 Review comment:
   ```suggestion
         boolean clearBit = BooleanCoder.of().decode(inStream);
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399588611
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,14 +49,26 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a timer for the given timestamp with a user specified payload.
+   *
+   * @return
+   */
+  // TODO(BEAM-9562): Plumb through actual Timer fields.
+  public static <T> Timer<T> of(
 
 Review comment:
   May I suggest you create these two static methods for creating the two common cases?
   .of(userKey, dynamicTimerTag, fireTimestamp, holdTimestamp, windows, pane);
   .cleared(userKey, dynamicTImerTag);
   
   this way you can make sure that dynamicTimerTag, fireTimestamp, holdTimestamp, windows and pane are not null

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402433871
 
 

 ##########
 File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TimerTest.java
 ##########
 @@ -36,39 +38,121 @@
   private static final Instant INSTANT = Instant.now();
 
   @Test
-  public void testTimer() {
-    Timer<Void> timerA = Timer.of(INSTANT);
-    assertEquals(INSTANT, timerA.getTimestamp());
-    assertNull(timerA.getPayload());
+  public void testClearTimer() {
+    Timer<String> clearedTimer = Timer.cleared("timer", "tag");
+    assertTrue(clearedTimer.getClearBit());
+    assertEquals("timer", clearedTimer.getUserKey());
+    assertEquals("tag", clearedTimer.getDynamicTimerTag());
+  }
 
-    Timer<String> timerB = Timer.of(INSTANT, "ABC");
-    assertEquals(INSTANT, timerB.getTimestamp());
-    assertEquals("ABC", timerB.getPayload());
+  @Test
+  public void testTimer() {
+    Timer<String> timer =
+        Timer.of(
+            "key",
+            "tag",
+            INSTANT,
+            INSTANT,
+            Collections.singleton(GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING);
+    assertEquals("key", timer.getUserKey());
+    assertEquals("tag", timer.getDynamicTimerTag());
+    assertEquals(INSTANT, timer.getFireTimestamp());
+    assertEquals(INSTANT, timer.getHoldTimestamp());
+    assertEquals(Collections.singleton(GlobalWindow.INSTANCE), timer.getWindows());
+    assertEquals(PaneInfo.NO_FIRING, timer.getPane());
+    assertFalse(timer.getClearBit());
   }
 
   @Test
-  public void testTimerCoderWithInconsistentWithEqualsPayloadCoder() throws Exception {
-    Coder<Timer<byte[]>> coder = Timer.Coder.of(ByteArrayCoder.of());
+  public void testTimerCoderWithInconsistentWithEqualsComponentCoders() throws Exception {
+    Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
     CoderProperties.coderSerializable(coder);
     CoderProperties.structuralValueDecodeEncodeEqual(
-        coder, Timer.of(INSTANT, "ABC".getBytes(UTF_8)));
+        coder,
+        Timer.of(
+            "key",
+            "tag",
+            INSTANT,
+            INSTANT,
+            Collections.singleton(GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING));
     CoderProperties.structuralValueConsistentWithEquals(
-        coder, Timer.of(INSTANT, "ABC".getBytes(UTF_8)), Timer.of(INSTANT, "ABC".getBytes(UTF_8)));
+        coder,
+        Timer.of(
+            "key",
+            "tag",
+            INSTANT,
+            INSTANT,
+            Collections.singleton(GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING),
+        Timer.of(
+            "key",
+            "tag",
+            INSTANT,
+            INSTANT,
+            Collections.singleton(GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING));
   }
 
   @Test
-  public void testTimerCoderWithConsistentWithEqualsPayloadCoder() throws Exception {
-    Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of());
-    CoderProperties.coderDecodeEncodeEqual(coder, Timer.of(INSTANT, "ABC"));
+  public void testTimerCoderWithConsistentWithEqualsComponentCoders() throws Exception {
+    Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
+    CoderProperties.coderDecodeEncodeEqual(
+        coder,
+        Timer.of(
+            "key",
+            "tag",
+            INSTANT,
+            INSTANT,
+            Collections.singletonList(GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING));
     CoderProperties.coderConsistentWithEquals(
-        coder, Timer.of(INSTANT, "ABC"), Timer.of(INSTANT, "ABC"));
-    CoderProperties.coderDeterministic(coder, Timer.of(INSTANT, "ABC"), Timer.of(INSTANT, "ABC"));
+        coder,
+        Timer.of(
+            "key",
+            "tag",
+            INSTANT,
+            INSTANT,
+            Collections.singletonList(GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING),
+        Timer.of(
+            "key",
+            "tag",
+            INSTANT,
+            INSTANT,
+            Collections.singletonList(GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING));
+    CoderProperties.coderDeterministic(
+        coder,
+        Timer.of(
+            "key",
+            "tag",
+            INSTANT,
+            INSTANT,
+            Collections.singletonList(GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING),
+        Timer.of(
+            "key",
+            "tag",
+            INSTANT,
+            INSTANT,
+            Collections.singletonList(GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING));
   }
 
   @Test
   public void testTimerCoderWireFormat() throws Exception {
-    Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of());
+    Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
     CoderProperties.coderEncodesBase64(
-        coder, Timer.of(new Instant(255L), "ABC"), "gAAAAAAAAP8DQUJD");
+        coder,
+        Timer.of(
+            "key",
+            "tag",
+            new Instant(225L),
+            new Instant(225L),
+            Collections.singleton(GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING),
+        "A2tleQN0YWcAgAAAAAAAAOGAAAAAAAAA4QAAAAEP");
   }
 
 Review comment:
   We can get rid of this test since standard_coders.yaml covers it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] boyuanzz commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399740242
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 ##########
 @@ -197,7 +199,7 @@ public boolean canTranslate(PTransform<?, ?> pTransform) {
                     ((KvCoder) mainInput.getCoder()).getKeyCoder(),
                     // TODO: Add support for timer payloads to the SDK
                     // We currently assume that all payloads are unspecified.
-                    Timer.Coder.of(VoidCoder.of())));
+                    Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)));
 
 Review comment:
   I chose `StringUtf8Coder` for convenience since there is no place actually parsing this key. I plan to plump through actual coders when migrating to use data channel. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] boyuanzz merged pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
boyuanzz merged pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#issuecomment-608229553
 
 
   Run Java PreCommit

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402425415
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +50,197 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given userKey, dynamicTimerTag, fireTimestamp,
+   * holdTimestamp, windows and pane.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, false, fireTimestamp, holdTimestamp, windows, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag. */
+  public static <T> Timer<T> cleared(T userKey, String dynamicTimerTag) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, true, null, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is empty string only when the timer is set
+   * from TimerSpec.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns a boolean which indicate whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the clearBit is true.
    *
    * <p>The time is relative to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
 
 Review comment:
   CC: @robertwb @reuvenlax 
   
   Originally I was thinking that timers should always be relative but now that I think about it more I think we should make this field absolute. When timers are created during normal element processing within an SDK then watermark timer should be computed based upon the input elements timestamp and processing time timers can be computed using the system's clock. For timers created during timer callbacks, the timers should always be relative to the fire timestamp of that timer.
   
   Hold timestamps by default should be the input elements timestamp plus an optional positive offset during normal element processing and they should be the hold timestamp of the fired timer plus an optional positive offset during timer callbacks.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399589315
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -73,59 +107,81 @@
    */
   public static class Coder<T> extends StructuredCoder<Timer<T>> {
 
-    public static <T> Coder of(org.apache.beam.sdk.coders.Coder<T> payloadCoder) {
-      return new Coder(payloadCoder);
+    public static <T> Coder<T> of(
+        org.apache.beam.sdk.coders.Coder<T> keyCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+      return new Coder<>(keyCoder, windowCoder);
     }
 
-    private final org.apache.beam.sdk.coders.Coder<T> payloadCoder;
+    private final org.apache.beam.sdk.coders.Coder<T> keyCoder;
+    private final org.apache.beam.sdk.coders.Coder<Collection<? extends BoundedWindow>>
+        windowsCoder;
 
-    private Coder(org.apache.beam.sdk.coders.Coder<T> payloadCoder) {
-      this.payloadCoder = payloadCoder;
+    private Coder(
+        org.apache.beam.sdk.coders.Coder<T> keyCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+      this.keyCoder = keyCoder;
+      this.windowsCoder = (org.apache.beam.sdk.coders.Coder) CollectionCoder.of(windowCoder);
+      ;
     }
 
     @Override
     public void encode(Timer<T> timer, OutputStream outStream) throws CoderException, IOException {
-      InstantCoder.of().encode(timer.getTimestamp(), outStream);
-      payloadCoder.encode(timer.getPayload(), outStream);
+      keyCoder.encode(timer.getUserKey(), outStream);
+      StringUtf8Coder.of().encode(timer.getDynamicTimerTag(), outStream);
+      BooleanCoder.of().encode(timer.getClearBit(), outStream);
+      if (!timer.getClearBit()) {
+        InstantCoder.of().encode(timer.getFireTimestamp(), outStream);
+        InstantCoder.of().encode(timer.getHoldTimestamp(), outStream);
+        windowsCoder.encode(timer.getWindows(), outStream);
+        PaneInfoCoder.INSTANCE.encode(timer.getPane(), outStream);
+      }
     }
 
     @Override
     public Timer<T> decode(InputStream inStream) throws CoderException, IOException {
-      Instant instant = InstantCoder.of().decode(inStream);
-      T value = payloadCoder.decode(inStream);
-      return Timer.of(instant, value);
+      T userKey = keyCoder.decode(inStream);
+      String dynamicTimerTag = StringUtf8Coder.of().decode(inStream);
+      Boolean clearBit = BooleanCoder.of().decode(inStream);
+      if (clearBit) {
+        return Timer.of(userKey, dynamicTimerTag, clearBit);
+      }
+      Instant fireTimestamp = InstantCoder.of().decode(inStream);
+      Instant holeTimestamp = InstantCoder.of().decode(inStream);
 
 Review comment:
   ```suggestion
         Instant holdTimestamp = InstantCoder.of().decode(inStream);
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399587568
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 ##########
 @@ -197,7 +199,7 @@ public boolean canTranslate(PTransform<?, ?> pTransform) {
                     ((KvCoder) mainInput.getCoder()).getKeyCoder(),
                     // TODO: Add support for timer payloads to the SDK
                     // We currently assume that all payloads are unspecified.
-                    Timer.Coder.of(VoidCoder.of())));
+                    Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)));
 
 Review comment:
   ```suggestion
                       Timer.Coder.of(((KvCoder) mainInput.getCoder()).getKeyCoder(), GlobalWindow.Coder.INSTANCE)));
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402429043
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +50,197 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given userKey, dynamicTimerTag, fireTimestamp,
+   * holdTimestamp, windows and pane.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, false, fireTimestamp, holdTimestamp, windows, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag. */
+  public static <T> Timer<T> cleared(T userKey, String dynamicTimerTag) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, true, null, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is empty string only when the timer is set
+   * from TimerSpec.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns a boolean which indicate whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the clearBit is true.
    *
    * <p>The time is relative to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field is nullable only when
+   * the clearBit is true.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the windows which is associated with the timer. This field is nullable only when the
+   * clearBit is true.
+   */
+  @Nullable
+  public abstract Collection<? extends BoundedWindow> getWindows();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable only when the
+   * clearBit is true.
 
 Review comment:
   ```suggestion
      * Returns the paneinfo that is related to the timer. This field is nullable only when the
      * timer is being cleared.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402411530
 
 

 ##########
 File path: model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
 ##########
 @@ -28,6 +28,23 @@
 #             one of a few standard JSON types such as numbers, strings, dicts that map naturally
 #             to the type encoded by the coder.
 #
+# Java code snippet to generate example bytes:
+#   Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
+#   Instant now = new Instant(1000L);
+#   Timer<String> timer = Timer.of(
+#     "key",
+#     "tag",
+#     now,
+#     now,
+#     Collections.singletonList(GlobalWindow.INSTANCE),
+#     PaneInfo.NO_FIRING);
+#   byte[] byets = CoderUtils.encodeToByteArray(coder, timer);
+#   String str = new String(byets, java.nio.charset.StandardCharsets.ISO_8859_1);
 
 Review comment:
   You sure the charset needs to be ISO_8859_1?
   
   ```suggestion
   #   byte[] bytes = CoderUtils.encodeToByteArray(coder, timer);
   #   String str = new String(bytes, java.nio.charset.StandardCharsets.ISO_8859_1);
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402685419
 
 

 ##########
 File path: model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
 ##########
 @@ -28,6 +28,23 @@
 #             one of a few standard JSON types such as numbers, strings, dicts that map naturally
 #             to the type encoded by the coder.
 #
+# Java code snippet to generate example bytes:
+#   Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
+#   Instant now = new Instant(1000L);
+#   Timer<String> timer = Timer.of(
+#     "key",
+#     "tag",
+#     now,
+#     now,
+#     Collections.singletonList(GlobalWindow.INSTANCE),
+#     PaneInfo.NO_FIRING);
+#   byte[] byets = CoderUtils.encodeToByteArray(coder, timer);
+#   String str = new String(byets, java.nio.charset.StandardCharsets.ISO_8859_1);
 
 Review comment:
   sg

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402687607
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Collection<? extends BoundedWindow> windows,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */
 
 Review comment:
   ```suggestion
     /** Returns a cleared timer for the given {@code userKey}, {@code dynamicTimerTag} and {@code windows}. */
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on issue #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#issuecomment-607919209
 
 
   Run JavaPortabilityApi PreCommit

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding to V2

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding to V2
URL: https://github.com/apache/beam/pull/11199#discussion_r397214242
 
 

 ##########
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##########
 @@ -609,16 +609,20 @@ message Elements {
     // represents the producer of these timers.
     string transform_id = 2;
 
+    // (Optional) The local timer family name used to identify the associated
+    // timer family specification
+    string timer_family_tag = 3;
 
 Review comment:
   `timer_id` == `timer_family_tag` since the `timer_id` represents the `key` from `timer_family_specs`.
   
   Note that we will either need to remove `timer_specs` from ParDoPayload or ensure that `timer_specs` and `timer_family_specs` don't share any keys.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#issuecomment-608539461
 
 
   All tests passed. i'm going to merge it. Thanks, everyone!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402413982
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -21,17 +21,27 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.BooleanCoder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CollectionCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.joda.time.Instant;
 
 /**
- * A timer consists of a timestamp and a corresponding user supplied payload.
+ * A timer consists of a userKey, a dynamicTimerTag, a clearBit, a fireTimestamp, a holdTimestamp a
+ * list of window and pane.
 
 Review comment:
   ```suggestion
    * A timer consists of a user key, a dynamic timer tag and either a bit that says that this timer should be cleared or data representing the firing timestamp, hold timestamp and a list of windows and pane information that should be used when producing output.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399589633
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -73,59 +107,81 @@
    */
   public static class Coder<T> extends StructuredCoder<Timer<T>> {
 
-    public static <T> Coder of(org.apache.beam.sdk.coders.Coder<T> payloadCoder) {
-      return new Coder(payloadCoder);
+    public static <T> Coder<T> of(
+        org.apache.beam.sdk.coders.Coder<T> keyCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+      return new Coder<>(keyCoder, windowCoder);
     }
 
-    private final org.apache.beam.sdk.coders.Coder<T> payloadCoder;
+    private final org.apache.beam.sdk.coders.Coder<T> keyCoder;
+    private final org.apache.beam.sdk.coders.Coder<Collection<? extends BoundedWindow>>
+        windowsCoder;
 
-    private Coder(org.apache.beam.sdk.coders.Coder<T> payloadCoder) {
-      this.payloadCoder = payloadCoder;
+    private Coder(
+        org.apache.beam.sdk.coders.Coder<T> keyCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+      this.keyCoder = keyCoder;
+      this.windowsCoder = (org.apache.beam.sdk.coders.Coder) CollectionCoder.of(windowCoder);
+      ;
     }
 
     @Override
     public void encode(Timer<T> timer, OutputStream outStream) throws CoderException, IOException {
-      InstantCoder.of().encode(timer.getTimestamp(), outStream);
-      payloadCoder.encode(timer.getPayload(), outStream);
+      keyCoder.encode(timer.getUserKey(), outStream);
+      StringUtf8Coder.of().encode(timer.getDynamicTimerTag(), outStream);
+      BooleanCoder.of().encode(timer.getClearBit(), outStream);
+      if (!timer.getClearBit()) {
+        InstantCoder.of().encode(timer.getFireTimestamp(), outStream);
+        InstantCoder.of().encode(timer.getHoldTimestamp(), outStream);
+        windowsCoder.encode(timer.getWindows(), outStream);
+        PaneInfoCoder.INSTANCE.encode(timer.getPane(), outStream);
+      }
     }
 
     @Override
     public Timer<T> decode(InputStream inStream) throws CoderException, IOException {
-      Instant instant = InstantCoder.of().decode(inStream);
-      T value = payloadCoder.decode(inStream);
-      return Timer.of(instant, value);
+      T userKey = keyCoder.decode(inStream);
+      String dynamicTimerTag = StringUtf8Coder.of().decode(inStream);
+      Boolean clearBit = BooleanCoder.of().decode(inStream);
+      if (clearBit) {
+        return Timer.of(userKey, dynamicTimerTag, clearBit);
+      }
+      Instant fireTimestamp = InstantCoder.of().decode(inStream);
+      Instant holeTimestamp = InstantCoder.of().decode(inStream);
+      Collection<? extends BoundedWindow> windows = windowsCoder.decode(inStream);
+      PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream);
+      return Timer.of(
+          userKey, dynamicTimerTag, clearBit, fireTimestamp, holeTimestamp, windows, pane);
     }
 
     @Override
     public List<? extends org.apache.beam.sdk.coders.Coder<?>> getCoderArguments() {
-      return Collections.singletonList(payloadCoder);
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic(this, "Payload coder must be deterministic", payloadCoder);
+      return Collections.singletonList(keyCoder);
     }
 
     @Override
-    public boolean consistentWithEquals() {
-      return payloadCoder.consistentWithEquals();
+    public List<? extends org.apache.beam.sdk.coders.Coder<?>> getComponents() {
+      return Arrays.asList(keyCoder, windowsCoder);
     }
 
     @Override
-    public Object structuralValue(Timer<T> value) {
-      return Timer.of(value.getTimestamp(), payloadCoder.structuralValue(value.getPayload()));
-    }
-
-    @Override
-    public boolean isRegisterByteSizeObserverCheap(Timer<T> value) {
-      return payloadCoder.isRegisterByteSizeObserverCheap(value.getPayload());
+    public void verifyDeterministic() throws NonDeterministicException {
+      verifyDeterministic(this, "UserKey coder must be deterministic", keyCoder);
+      verifyDeterministic(this, "Windows coder must be deterministic", windowsCoder);
 
 Review comment:
   ```suggestion
         verifyDeterministic(this, "Window coder must be deterministic", windowsCoder);
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402428722
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +50,197 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given userKey, dynamicTimerTag, fireTimestamp,
+   * holdTimestamp, windows and pane.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, false, fireTimestamp, holdTimestamp, windows, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag. */
+  public static <T> Timer<T> cleared(T userKey, String dynamicTimerTag) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, true, null, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is empty string only when the timer is set
+   * from TimerSpec.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns a boolean which indicate whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the clearBit is true.
    *
    * <p>The time is relative to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field is nullable only when
+   * the clearBit is true.
 
 Review comment:
   ```suggestion
      * Returns the watermark that the timer is supposed to be held. This field is nullable only when
      * the timer is being cleared.
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] robertwb commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding to V2

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding to V2
URL: https://github.com/apache/beam/pull/11199#discussion_r397324906
 
 

 ##########
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##########
 @@ -609,16 +609,20 @@ message Elements {
     // represents the producer of these timers.
     string transform_id = 2;
 
+    // (Optional) The local timer family name used to identify the associated
+    // timer family specification
+    string timer_family_tag = 3;
+
     // (Optional) The local timer name used to identify the associated timer specification.
-    string timer_id = 3;
+    string timer_id = 4;
 
     // (Optional) Represents a logical byte stream of a timer. Encoded according
     // to the coder in the timer spec.
-    bytes timer = 4;
+    bytes timer = 5;
 
 Review comment:
   This should be `timers` to represent a full stream of timers.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402437731
 
 

 ##########
 File path: sdks/python/apache_beam/coders/coders_test_common.py
 ##########
 @@ -80,8 +81,9 @@ def tearDownClass(cls):
         coders.RunnerAPICoderHolder,
         coders.ToBytesCoder
     ])
-    assert not standard - cls.seen, standard - cls.seen
-    assert not standard - cls.seen_nested, standard - cls.seen_nested
+    cls.seen_nested -= set([coders.ProtoCoder, CustomCoder])
+    assert not standard - cls.seen
+    assert not cls.seen_nested - standard
 
 Review comment:
   Why is this change necessary?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399591742
 
 

 ##########
 File path: sdks/python/apache_beam/coders/coder_impl.py
 ##########
 @@ -629,22 +629,56 @@ def estimate_size(self, unused_value, nested=False):
 
 class TimerCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees."""
-  def __init__(self, payload_coder_impl):
+  def __init__(self, key_coder_impl, window_coder_impl, tag_coder_impl):
     self._timestamp_coder_impl = TimestampCoderImpl()
-    self._payload_coder_impl = payload_coder_impl
+    self._boolean_coder_impl = BooleanCoderImpl()
+    self._pane_info_coder_impl = PaneInfoCoderImpl()
+    self._key_coder_impl = key_coder_impl
+    self._windows_coder_impl = TupleSequenceCoderImpl(window_coder_impl)
+    self._tag_coder_impl = tag_coder_impl
 
   def encode_to_stream(self, value, out, nested):
     # type: (dict, create_OutputStream, bool) -> None
-    self._timestamp_coder_impl.encode_to_stream(value['timestamp'], out, True)
-    self._payload_coder_impl.encode_to_stream(value.get('payload'), out, True)
+    self._key_coder_impl.encode_to_stream(value.user_key, out, nested)
+    self._tag_coder_impl.encode_to_stream(
+        value.dynamic_timer_tag, out, True)
+    self._boolean_coder_impl.encode_to_stream(value.clear_bit, out, True)
+    if not value.clear_bit:
+      self._timestamp_coder_impl.encode_to_stream(
+          value.fire_timestamp, out, True)
+      self._timestamp_coder_impl.encode_to_stream(
+          value.hold_timestamp, out, True)
+      self._windows_coder_impl.encode_to_stream(value.windows, out, True)
+      self._pane_info_coder_impl.encode_to_stream(value.paneinfo, out, True)
 
   def decode_from_stream(self, in_stream, nested):
     # type: (create_InputStream, bool) -> dict
     # TODO(robertwb): Consider using a concrete class rather than a dict here.
-    return dict(
-        timestamp=self._timestamp_coder_impl.decode_from_stream(
+    from apache_beam.transforms import userstate
+    user_key = self._key_coder_impl.decode_from_stream(in_stream, nested)
 
 Review comment:
   ```suggestion
       user_key = self._key_coder_impl.decode_from_stream(in_stream, True)
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402688977
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Collection<? extends BoundedWindow> windows,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */
+  public static <T> Timer<T> cleared(
+      T userKey, String dynamicTimerTag, Collection<? extends BoundedWindow> windows) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection<? extends BoundedWindow> getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the timer is being cleared.
    *
-   * <p>The time is relative to the time domain defined in the {@link
+   * <p>The time is absolute to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable only when the
+   * timer is being cleared.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Timer)) {
+      return false;
+    }
+    Timer<?> that = (Timer<?>) other;
+    if (this.getClearBit()) {
+      return Objects.equals(this.getUserKey(), that.getUserKey())
+          && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+          && (this.getClearBit() == that.getClearBit())
+          && Objects.equals(this.getWindows(), that.getWindows());
+    }
+    return Objects.equals(this.getUserKey(), that.getUserKey())
+        && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+        && (this.getClearBit() == that.getClearBit())
+        && this.getFireTimestamp().equals(that.getFireTimestamp())
+        && this.getHoldTimestamp().equals(that.getHoldTimestamp())
+        && Objects.equals(this.getWindows(), that.getWindows())
+        && Objects.equals(this.getPane(), that.getPane());
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash only the millis of the timestamp to be consistent with equals
+    if (getClearBit()) {
+      return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), getWindows());
+    }
 
 Review comment:
   The hash is still stable will null objects.
   ```suggestion
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402433137
 
 

 ##########
 File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TimerTest.java
 ##########
 @@ -36,39 +38,121 @@
   private static final Instant INSTANT = Instant.now();
 
   @Test
-  public void testTimer() {
-    Timer<Void> timerA = Timer.of(INSTANT);
-    assertEquals(INSTANT, timerA.getTimestamp());
-    assertNull(timerA.getPayload());
+  public void testClearTimer() {
+    Timer<String> clearedTimer = Timer.cleared("timer", "tag");
+    assertTrue(clearedTimer.getClearBit());
+    assertEquals("timer", clearedTimer.getUserKey());
+    assertEquals("tag", clearedTimer.getDynamicTimerTag());
+  }
 
-    Timer<String> timerB = Timer.of(INSTANT, "ABC");
-    assertEquals(INSTANT, timerB.getTimestamp());
-    assertEquals("ABC", timerB.getPayload());
+  @Test
+  public void testTimer() {
+    Timer<String> timer =
+        Timer.of(
+            "key",
+            "tag",
+            INSTANT,
+            INSTANT,
+            Collections.singleton(GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING);
+    assertEquals("key", timer.getUserKey());
+    assertEquals("tag", timer.getDynamicTimerTag());
+    assertEquals(INSTANT, timer.getFireTimestamp());
+    assertEquals(INSTANT, timer.getHoldTimestamp());
+    assertEquals(Collections.singleton(GlobalWindow.INSTANCE), timer.getWindows());
+    assertEquals(PaneInfo.NO_FIRING, timer.getPane());
+    assertFalse(timer.getClearBit());
   }
 
   @Test
-  public void testTimerCoderWithInconsistentWithEqualsPayloadCoder() throws Exception {
-    Coder<Timer<byte[]>> coder = Timer.Coder.of(ByteArrayCoder.of());
+  public void testTimerCoderWithInconsistentWithEqualsComponentCoders() throws Exception {
+    Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
     CoderProperties.coderSerializable(coder);
     CoderProperties.structuralValueDecodeEncodeEqual(
-        coder, Timer.of(INSTANT, "ABC".getBytes(UTF_8)));
+        coder,
+        Timer.of(
+            "key",
+            "tag",
+            INSTANT,
+            INSTANT,
+            Collections.singleton(GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING));
     CoderProperties.structuralValueConsistentWithEquals(
-        coder, Timer.of(INSTANT, "ABC".getBytes(UTF_8)), Timer.of(INSTANT, "ABC".getBytes(UTF_8)));
+        coder,
+        Timer.of(
+            "key",
+            "tag",
+            INSTANT,
+            INSTANT,
+            Collections.singleton(GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING),
+        Timer.of(
+            "key",
+            "tag",
+            INSTANT,
+            INSTANT,
+            Collections.singleton(GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING));
   }
 
   @Test
-  public void testTimerCoderWithConsistentWithEqualsPayloadCoder() throws Exception {
-    Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of());
-    CoderProperties.coderDecodeEncodeEqual(coder, Timer.of(INSTANT, "ABC"));
+  public void testTimerCoderWithConsistentWithEqualsComponentCoders() throws Exception {
+    Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
 
 Review comment:
   Add a cleared timer for this case.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399591425
 
 

 ##########
 File path: sdks/python/apache_beam/coders/coder_impl.py
 ##########
 @@ -629,22 +629,56 @@ def estimate_size(self, unused_value, nested=False):
 
 class TimerCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees."""
-  def __init__(self, payload_coder_impl):
+  def __init__(self, key_coder_impl, window_coder_impl, tag_coder_impl):
 
 Review comment:
   why do you have to specify the tag_coder_impl, shouldn't it always be Python's string utf8 coder?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399588782
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##########
 @@ -73,59 +107,81 @@
    */
   public static class Coder<T> extends StructuredCoder<Timer<T>> {
 
 Review comment:
   please update class comment

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services