You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/02 19:00:47 UTC

[1/2] beam git commit: Use processing time for synchronized processing time in Flink runner

Repository: beam
Updated Branches:
  refs/heads/master b40b26501 -> c2c89eda9


Use processing time for synchronized processing time in Flink runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/05722acc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/05722acc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/05722acc

Branch: refs/heads/master
Commit: 05722accd3da1a92af9f73e85c7ae78742cd9db1
Parents: ae72456
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 2 09:03:03 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 2 09:03:05 2017 -0700

----------------------------------------------------------------------
 .../wrappers/streaming/DoFnOperator.java        | 36 ++++++++++++--------
 1 file changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/05722acc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 54eb770..c130200 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -720,13 +720,17 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     @Override
     public void setTimer(TimerData timerKey) {
       long time = timerKey.getTimestamp().getMillis();
-      if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
-        timerService.registerEventTimeTimer(timerKey, time);
-      } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
-        timerService.registerProcessingTimeTimer(timerKey, time);
-      } else {
-        throw new UnsupportedOperationException(
-            "Unsupported time domain: " + timerKey.getDomain());
+      switch (timerKey.getDomain()) {
+        case EVENT_TIME:
+          timerService.registerEventTimeTimer(timerKey, time);
+          break;
+        case PROCESSING_TIME:
+        case SYNCHRONIZED_PROCESSING_TIME:
+          timerService.registerProcessingTimeTimer(timerKey, time);
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported time domain: " + timerKey.getDomain());
       }
     }
 
@@ -747,13 +751,17 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     @Override
     public void deleteTimer(TimerData timerKey) {
       long time = timerKey.getTimestamp().getMillis();
-      if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
-        timerService.deleteEventTimeTimer(timerKey, time);
-      } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
-        timerService.deleteProcessingTimeTimer(timerKey, time);
-      } else {
-        throw new UnsupportedOperationException(
-            "Unsupported time domain: " + timerKey.getDomain());
+      switch (timerKey.getDomain()) {
+        case EVENT_TIME:
+          timerService.deleteEventTimeTimer(timerKey, time);
+          break;
+        case PROCESSING_TIME:
+        case SYNCHRONIZED_PROCESSING_TIME:
+          timerService.deleteProcessingTimeTimer(timerKey, time);
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported time domain: " + timerKey.getDomain());
       }
     }
 


[2/2] beam git commit: This closes #2826: Use processing time for synchronized processing time in Flink runner

Posted by ke...@apache.org.
This closes #2826: Use processing time for synchronized processing time in Flink runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2c89eda
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2c89eda
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2c89eda

Branch: refs/heads/master
Commit: c2c89eda9d6110414548d7131214cfd9946b3a13
Parents: b40b265 05722ac
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 2 12:00:25 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 2 12:00:25 2017 -0700

----------------------------------------------------------------------
 .../wrappers/streaming/DoFnOperator.java        | 36 ++++++++++++--------
 1 file changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c2c89eda/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------