You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/26 13:07:19 UTC
flink git commit: [hotfix] Reorder ProcessOperator methods so they
conform to the order in the interface
Repository: flink
Updated Branches:
refs/heads/master e7a060947 -> 68f446c9f
[hotfix] Reorder ProcessOperator methods so they conform to the order in the interface
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68f446c9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68f446c9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68f446c9
Branch: refs/heads/master
Commit: 68f446c9f38f3faf70413fb19aab742871bb4a33
Parents: e7a0609
Author: Bowen Li <bo...@gmail.com>
Authored: Wed Oct 25 23:01:22 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Oct 26 15:05:34 2017 +0200
----------------------------------------------------------------------
.../api/operators/KeyedProcessOperator.java | 27 ++++++++++----------
.../operators/co/KeyedCoProcessOperator.java | 12 ++++-----
2 files changed, 20 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/68f446c9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
index 0f4b4f5..6501a9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
@@ -118,16 +118,17 @@ public class KeyedProcessOperator<K, IN, OUT>
}
@Override
+ public TimerService timerService() {
+ return timerService;
+ }
+
+ @Override
public <X> void output(OutputTag<X> outputTag, X value) {
if (outputTag == null) {
throw new IllegalArgumentException("OutputTag must not be null.");
}
- output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
- }
- @Override
- public TimerService timerService() {
- return timerService;
+ output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
}
}
@@ -145,18 +146,17 @@ public class KeyedProcessOperator<K, IN, OUT>
}
@Override
- public TimeDomain timeDomain() {
- checkState(timeDomain != null);
- return timeDomain;
- }
-
- @Override
public Long timestamp() {
checkState(timer != null);
return timer.getTimestamp();
}
@Override
+ public TimerService timerService() {
+ return timerService;
+ }
+
+ @Override
public <X> void output(OutputTag<X> outputTag, X value) {
if (outputTag == null) {
throw new IllegalArgumentException("OutputTag must not be null.");
@@ -166,8 +166,9 @@ public class KeyedProcessOperator<K, IN, OUT>
}
@Override
- public TimerService timerService() {
- return timerService;
+ public TimeDomain timeDomain() {
+ checkState(timeDomain != null);
+ return timeDomain;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/68f446c9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
index e9402cf..4e2bfc7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
@@ -162,12 +162,6 @@ public class KeyedCoProcessOperator<K, IN1, IN2, OUT>
}
@Override
- public TimeDomain timeDomain() {
- checkState(timeDomain != null);
- return timeDomain;
- }
-
- @Override
public Long timestamp() {
checkState(timer != null);
return timer.getTimestamp();
@@ -186,5 +180,11 @@ public class KeyedCoProcessOperator<K, IN1, IN2, OUT>
output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp()));
}
+
+ @Override
+ public TimeDomain timeDomain() {
+ checkState(timeDomain != null);
+ return timeDomain;
+ }
}
}