You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/16 10:13:36 UTC
flink git commit: [FLINK-4063] Add Metrics Support for Triggers
Repository: flink
Updated Branches:
refs/heads/master fe0eb602d -> 104958523
[FLINK-4063] Add Metrics Support for Triggers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10495852
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10495852
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10495852
Branch: refs/heads/master
Commit: 10495852370fed3b7683d428f6c2cc7470b3d8ed
Parents: fe0eb60
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Jun 15 14:30:06 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jun 16 12:13:27 2016 +0200
----------------------------------------------------------------------
.../streaming/api/windowing/triggers/Trigger.java | 13 +++++++++++++
.../runtime/operators/windowing/WindowOperator.java | 7 ++++++-
2 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/10495852/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index 452a5b1..c9b9ff1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -19,11 +19,13 @@
package org.apache.flink.streaming.api.windowing.triggers;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.windowing.windows.Window;
import java.io.Serializable;
@@ -125,6 +127,17 @@ public abstract class Trigger<T, W extends Window> implements Serializable {
public interface TriggerContext {
/**
+ * Returns the metric group for this {@link Trigger}. This is the same metric
+ * group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user
+ * function.
+ *
+ * <p>You must not call methods that create metric objects
+ * (such as {@link MetricGroup#counter(int)} multiple times but instead call once
+ * and store the metric object in a field.
+ */
+ MetricGroup getMetricGroup();
+
+ /**
* Returns the current watermark time.
*/
long getCurrentWatermark();
http://git-wip-us.apache.org/repos/asf/flink/blob/10495852/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 7b49e0b..18020b3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -42,6 +42,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
@@ -234,7 +235,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
currentWatermark = Long.MIN_VALUE;
-
}
@Override
@@ -479,6 +479,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
this.window = window;
}
+ @Override
+ public MetricGroup getMetricGroup() {
+ return WindowOperator.this.getMetricGroup();
+ }
+
public long getCurrentWatermark() {
return currentWatermark;
}