You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/16 10:13:36 UTC

flink git commit: [FLINK-4063] Add Metrics Support for Triggers

Repository: flink
Updated Branches:
  refs/heads/master fe0eb602d -> 104958523


[FLINK-4063] Add Metrics Support for Triggers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10495852
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10495852
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10495852

Branch: refs/heads/master
Commit: 10495852370fed3b7683d428f6c2cc7470b3d8ed
Parents: fe0eb60
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Jun 15 14:30:06 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jun 16 12:13:27 2016 +0200

----------------------------------------------------------------------
 .../streaming/api/windowing/triggers/Trigger.java      | 13 +++++++++++++
 .../runtime/operators/windowing/WindowOperator.java    |  7 ++++++-
 2 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10495852/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index 452a5b1..c9b9ff1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -19,11 +19,13 @@
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 import java.io.Serializable;
@@ -125,6 +127,17 @@ public abstract class Trigger<T, W extends Window> implements Serializable {
 	public interface TriggerContext {
 
 		/**
+		 * Returns the metric group for this {@link Trigger}. This is the same metric
+		 * group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user
+		 * function.
+		 *
+		 * <p>You must not call methods that create metric objects
+		 * (such as {@link MetricGroup#counter(int)}) multiple times; instead, call them once
+		 * and store the metric object in a field.
+		 */
+		MetricGroup getMetricGroup();
+
+		/**
 		 * Returns the current watermark time.
 		 */
 		long getCurrentWatermark();

http://git-wip-us.apache.org/repos/asf/flink/blob/10495852/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 7b49e0b..18020b3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -42,6 +42,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
@@ -234,7 +235,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 
 		currentWatermark = Long.MIN_VALUE;
-
 	}
 
 	@Override
@@ -479,6 +479,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			this.window = window;
 		}
 
+		@Override
+		public MetricGroup getMetricGroup() {
+			return WindowOperator.this.getMetricGroup();
+		}
+
 		public long getCurrentWatermark() {
 			return currentWatermark;
 		}