You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/19 21:10:06 UTC
[5/5] flink git commit: [FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor
[FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor
This closes #3950.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ccaffe3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ccaffe3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ccaffe3
Branch: refs/heads/master
Commit: 8ccaffe3d3f2472fc12fa138c45c0b67458ad2a2
Parents: 4b48530
Author: zentol <ch...@apache.org>
Authored: Fri May 19 14:39:20 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 19 21:08:34 2017 +0200
----------------------------------------------------------------------
.../flink/streaming/runtime/io/StreamTwoInputProcessor.java | 9 +++++++++
1 file changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8ccaffe3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index d34686d..367b773 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
@@ -114,6 +116,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
private long lastEmittedWatermark1;
private long lastEmittedWatermark2;
+ private Counter numRecordsIn;
+
private boolean isFinished;
@SuppressWarnings("unchecked")
@@ -195,6 +199,9 @@ public class StreamTwoInputProcessor<IN1, IN2> {
if (isFinished) {
return false;
}
+ if (numRecordsIn == null) {
+ numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+ }
while (true) {
if (currentRecordDeserializer != null) {
@@ -230,6 +237,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
else {
StreamRecord<IN1> record = recordOrWatermark.asRecord();
synchronized (lock) {
+ numRecordsIn.inc();
streamOperator.setKeyContextElement1(record);
streamOperator.processElement1(record);
}
@@ -256,6 +264,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
else {
StreamRecord<IN2> record = recordOrWatermark.asRecord();
synchronized (lock) {
+ numRecordsIn.inc();
streamOperator.setKeyContextElement2(record);
streamOperator.processElement2(record);
}