You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/02 19:00:47 UTC
[1/2] beam git commit: Use processing time for synchronized
processing time in Flink runner
Repository: beam
Updated Branches:
refs/heads/master b40b26501 -> c2c89eda9
Use processing time for synchronized processing time in Flink runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/05722acc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/05722acc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/05722acc
Branch: refs/heads/master
Commit: 05722accd3da1a92af9f73e85c7ae78742cd9db1
Parents: ae72456
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 2 09:03:03 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 2 09:03:05 2017 -0700
----------------------------------------------------------------------
.../wrappers/streaming/DoFnOperator.java | 36 ++++++++++++--------
1 file changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/05722acc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 54eb770..c130200 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -720,13 +720,17 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
@Override
public void setTimer(TimerData timerKey) {
long time = timerKey.getTimestamp().getMillis();
- if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
- timerService.registerEventTimeTimer(timerKey, time);
- } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
- timerService.registerProcessingTimeTimer(timerKey, time);
- } else {
- throw new UnsupportedOperationException(
- "Unsupported time domain: " + timerKey.getDomain());
+ switch (timerKey.getDomain()) {
+ case EVENT_TIME:
+ timerService.registerEventTimeTimer(timerKey, time);
+ break;
+ case PROCESSING_TIME:
+ case SYNCHRONIZED_PROCESSING_TIME:
+ timerService.registerProcessingTimeTimer(timerKey, time);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported time domain: " + timerKey.getDomain());
}
}
@@ -747,13 +751,17 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
@Override
public void deleteTimer(TimerData timerKey) {
long time = timerKey.getTimestamp().getMillis();
- if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
- timerService.deleteEventTimeTimer(timerKey, time);
- } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
- timerService.deleteProcessingTimeTimer(timerKey, time);
- } else {
- throw new UnsupportedOperationException(
- "Unsupported time domain: " + timerKey.getDomain());
+ switch (timerKey.getDomain()) {
+ case EVENT_TIME:
+ timerService.deleteEventTimeTimer(timerKey, time);
+ break;
+ case PROCESSING_TIME:
+ case SYNCHRONIZED_PROCESSING_TIME:
+ timerService.deleteProcessingTimeTimer(timerKey, time);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported time domain: " + timerKey.getDomain());
}
}
[2/2] beam git commit: This closes #2826: Use processing time for
synchronized processing time in Flink runner
Posted by ke...@apache.org.
This closes #2826: Use processing time for synchronized processing time in Flink runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2c89eda
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2c89eda
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2c89eda
Branch: refs/heads/master
Commit: c2c89eda9d6110414548d7131214cfd9946b3a13
Parents: b40b265 05722ac
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 2 12:00:25 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 2 12:00:25 2017 -0700
----------------------------------------------------------------------
.../wrappers/streaming/DoFnOperator.java | 36 ++++++++++++--------
1 file changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c2c89eda/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------