You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/19 21:10:06 UTC

[5/5] flink git commit: [FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor

[FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor

This closes #3950.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ccaffe3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ccaffe3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ccaffe3

Branch: refs/heads/master
Commit: 8ccaffe3d3f2472fc12fa138c45c0b67458ad2a2
Parents: 4b48530
Author: zentol <ch...@apache.org>
Authored: Fri May 19 14:39:20 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 19 21:08:34 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/runtime/io/StreamTwoInputProcessor.java | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ccaffe3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index d34686d..367b773 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
@@ -114,6 +116,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 	private long lastEmittedWatermark1;
 	private long lastEmittedWatermark2;
 
+	private Counter numRecordsIn;
+
 	private boolean isFinished;
 
 	@SuppressWarnings("unchecked")
@@ -195,6 +199,9 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		if (isFinished) {
 			return false;
 		}
+		if (numRecordsIn == null) {
+			numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+		}
 
 		while (true) {
 			if (currentRecordDeserializer != null) {
@@ -230,6 +237,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 						else {
 							StreamRecord<IN1> record = recordOrWatermark.asRecord();
 							synchronized (lock) {
+								numRecordsIn.inc();
 								streamOperator.setKeyContextElement1(record);
 								streamOperator.processElement1(record);
 							}
@@ -256,6 +264,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 						else {
 							StreamRecord<IN2> record = recordOrWatermark.asRecord();
 							synchronized (lock) {
+								numRecordsIn.inc();
 								streamOperator.setKeyContextElement2(record);
 								streamOperator.processElement2(record);
 							}