You are viewing a plain text version of this content. The canonical link to the original was not preserved in this text version.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/15 13:53:02 UTC

flink git commit: [FLINK-4174] Add accessor for current watermark in Evictor Context

Repository: flink
Updated Branches:
  refs/heads/master 1adefee2e -> bd2fce6e1


[FLINK-4174] Add accessor for current watermark in Evictor Context


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd2fce6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd2fce6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd2fce6e

Branch: refs/heads/master
Commit: bd2fce6e1aa8d1f568092488946e5cda44c0cb81
Parents: 1adefee
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Nov 15 14:52:16 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Nov 15 14:52:16 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/windowing/evictors/Evictor.java     | 7 +++++--
 .../runtime/operators/windowing/EvictingWindowOperator.java | 9 +++++++--
 .../runtime/operators/windowing/WindowOperator.java         | 2 +-
 .../operators/windowing/EvictingWindowOperatorTest.java     | 2 --
 4 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
index 02e93eb..7557766 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
@@ -70,8 +70,7 @@ public interface Evictor<T, W extends Window> extends Serializable {
 	interface EvictorContext {
 
 		/**
-		 * Returns the current processing time, as returned by
-		 * the {@link ProcessingTimeService#getCurrentProcessingTime}.
+		 * Returns the current processing time.
 		 */
 		long getCurrentProcessingTime();
 
@@ -86,6 +85,10 @@ public interface Evictor<T, W extends Window> extends Serializable {
 		 */
 		MetricGroup getMetricGroup();
 
+		/**
+		 * Returns the current watermark time.
+		 */
+		long getCurrentWatermark();
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 3be3f5a..150f46e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -63,7 +63,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
 	private final Evictor<? super IN, ? super W> evictor;
 
-	protected transient EvictorContext evictorContext = new EvictorContext(null, null);
+	private transient EvictorContext evictorContext;
 
 	private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor;
 
@@ -348,7 +348,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
 		@Override
 		public long getCurrentProcessingTime() {
-			return EvictingWindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
+			return internalTimerService.currentProcessingTime();
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			return internalTimerService.currentWatermark();
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 6ff3999..0ead14a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -139,7 +139,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	// State that needs to be checkpointed
 	// ------------------------------------------------------------------------
 
-	private transient InternalTimerService<W> internalTimerService;
+	protected transient InternalTimerService<W> internalTimerService;
 
 	/**
 	 * Creates a new {@code WindowOperator} based on the given policies and user functions.

http://git-wip-us.apache.org/repos/asf/flink/blob/bd2fce6e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 46495b0..8da1d7c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -59,8 +59,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class EvictingWindowOperatorTest {
 
-	// For counting if close() is called the correct number of times on the SumReducer
-
 	/**
 	 * Tests CountEvictor evictAfter behavior
 	 * @throws Exception