You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/20 03:50:56 UTC

[GitHub] [beam] boyuanzz commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

boyuanzz commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r427720451



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -962,16 +971,25 @@ private Progress getProgress() {
             .build());
   }
 
-  private <K> void processTimer(String timerId, TimeDomain timeDomain, Timer<K> timer) {
+  private <K> void processTimer(
+      String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer<K> timer) {
     currentTimer = timer;
     currentTimeDomain = timeDomain;
     onTimerContext = new OnTimerContext<>(timer.getUserKey());
+    String timerId =
+        timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)

Review comment:
       If the `timerIdOrTimerFamilyId ` is for a timer family, should the timerId be the `timer.dynamicTimerTag`?

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap<K> implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =

Review comment:
       Similar to `FnApiTimer` above, we should have `timeDomain` from proto, right?

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -460,14 +461,22 @@ public void accept(WindowedValue input) throws Exception {
       // Extract out relevant TimerFamilySpec information in preparation for execution.
       for (Map.Entry<String, TimerFamilySpec> entry :
           parDoPayload.getTimerFamilySpecsMap().entrySet()) {
-        String timerFamilyId = entry.getKey();
-        TimeDomain timeDomain =
-            DoFnSignatures.getTimerSpecOrThrow(
-                    doFnSignature.timerDeclarations().get(timerFamilyId), doFn)
-                .getTimeDomain();
+        String timerIdOrTimerFamilyId = entry.getKey();
+        TimeDomain timeDomain;
+        if (timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)) {
+          timeDomain =

Review comment:
       The `TTimerFamilySpec` should have `time_domain ` field. Maybe we could  do something similar to https://github.com/apache/beam/blob/1de50c348706ed25af2bab9c9477d7d4f36ef8bf/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java#L657-L668

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -460,14 +461,22 @@ public void accept(WindowedValue input) throws Exception {
       // Extract out relevant TimerFamilySpec information in preparation for execution.
       for (Map.Entry<String, TimerFamilySpec> entry :
           parDoPayload.getTimerFamilySpecsMap().entrySet()) {
-        String timerFamilyId = entry.getKey();
-        TimeDomain timeDomain =
-            DoFnSignatures.getTimerSpecOrThrow(
-                    doFnSignature.timerDeclarations().get(timerFamilyId), doFn)
-                .getTimeDomain();
+        String timerIdOrTimerFamilyId = entry.getKey();
+        TimeDomain timeDomain;
+        if (timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)) {
+          timeDomain =
+              DoFnSignatures.getTimerFamilySpecOrThrow(
+                      doFnSignature.timerFamilyDeclarations().get(timerIdOrTimerFamilyId), doFn)
+                  .getTimeDomain();
+        } else {
+          timeDomain =
+              DoFnSignatures.getTimerSpecOrThrow(
+                      doFnSignature.timerDeclarations().get(timerIdOrTimerFamilyId), doFn)
+                  .getTimeDomain();
+        }
         Coder<Timer<Object>> timerCoder =
             (Coder) rehydratedComponents.getCoder(entry.getValue().getTimerFamilyCoderId());
-        timerFamilyInfosBuilder.put(timerFamilyId, KV.of(timeDomain, timerCoder));
+        timerFamilyInfosBuilder.put(timerIdOrTimerFamilyId, KV.of(timeDomain, timerCoder));

Review comment:
       Could you please add more javadoc/comments about why it's `timerId` or `timerFamilyId`?

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap<K> implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain();
+    }
 
     @Override
-    public void set(String timerId, Instant absoluteTime) {}
+    public void set(String dynamicTimerTag, Instant absoluteTime) {

Review comment:
       We should consider exposing more APIs here like `FnApiTimer`. Maybe a TODO here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org