You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/15 13:53:02 UTC
flink git commit: [FLINK-4174] Add accessor for current watermark in Evictor Context
Repository: flink
Updated Branches:
refs/heads/master 1adefee2e -> bd2fce6e1
[FLINK-4174] Add accessor for current watermark in Evictor Context
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd2fce6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd2fce6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd2fce6e
Branch: refs/heads/master
Commit: bd2fce6e1aa8d1f568092488946e5cda44c0cb81
Parents: 1adefee
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Nov 15 14:52:16 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Nov 15 14:52:16 2016 +0100
----------------------------------------------------------------------
.../flink/streaming/api/windowing/evictors/Evictor.java | 7 +++++--
.../runtime/operators/windowing/EvictingWindowOperator.java | 9 +++++++--
.../runtime/operators/windowing/WindowOperator.java | 2 +-
.../operators/windowing/EvictingWindowOperatorTest.java | 2 --
4 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
index 02e93eb..7557766 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
@@ -70,8 +70,7 @@ public interface Evictor<T, W extends Window> extends Serializable {
interface EvictorContext {
/**
- * Returns the current processing time, as returned by
- * the {@link ProcessingTimeService#getCurrentProcessingTime}.
+ * Returns the current processing time.
*/
long getCurrentProcessingTime();
@@ -86,6 +85,10 @@ public interface Evictor<T, W extends Window> extends Serializable {
*/
MetricGroup getMetricGroup();
+ /**
+ * Returns the current watermark time.
+ */
+ long getCurrentWatermark();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 3be3f5a..150f46e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -63,7 +63,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
private final Evictor<? super IN, ? super W> evictor;
- protected transient EvictorContext evictorContext = new EvictorContext(null, null);
+ private transient EvictorContext evictorContext;
private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor;
@@ -348,7 +348,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
@Override
public long getCurrentProcessingTime() {
- return EvictingWindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
+ return internalTimerService.currentProcessingTime();
+ }
+
+ @Override
+ public long getCurrentWatermark() {
+ return internalTimerService.currentWatermark();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 6ff3999..0ead14a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -139,7 +139,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// State that needs to be checkpointed
// ------------------------------------------------------------------------
- private transient InternalTimerService<W> internalTimerService;
+ protected transient InternalTimerService<W> internalTimerService;
/**
* Creates a new {@code WindowOperator} based on the given policies and user functions.
http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 46495b0..8da1d7c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -59,8 +59,6 @@ import java.util.concurrent.atomic.AtomicInteger;
public class EvictingWindowOperatorTest {
- // For counting if close() is called the correct number of times on the SumReducer
-
/**
* Tests CountEvictor evictAfter behavior
* @throws Exception