You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/06/18 21:40:41 UTC

[2/3] flink git commit: [FLINK-3949] [metrics] Add numSplitsProcessed counter metric.

[FLINK-3949] [metrics] Add numSplitsProcessed counter metric.

This closes #2119


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a0c268d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a0c268d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a0c268d

Branch: refs/heads/master
Commit: 5a0c268dbd4abdf39c7b9d8f25ea629dfd4681b1
Parents: 18744b2
Author: zentol <ch...@apache.org>
Authored: Fri Jun 17 09:40:01 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Jun 18 23:40:23 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/operators/DataSourceTask.java    | 4 ++--
 .../api/functions/source/ContinuousFileReaderOperator.java    | 3 +++
 .../api/functions/source/InputFormatSourceFunction.java       | 3 +++
 .../api/functions/source/InputFormatSourceFunctionTest.java   | 7 +++++++
 4 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index c57f133..68e29b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -101,7 +101,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 		LOG.debug(getLogString("Starting data source operator"));
 
 		RuntimeContext ctx = createRuntimeContext();
-		Counter splitCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
+		Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
 		Counter numRecordsOut = ctx.getMetricGroup().counter("numRecordsOut");
 
 		if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
@@ -172,7 +172,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 					// close. We close here such that a regular close throwing an exception marks a task as failed.
 					format.close();
 				}
-				splitCounter.inc();
+				completedSplitsCounter.inc();
 			} // end for all input splits
 
 			// close the collector. if it is a chaining task collector, it will close its chained tasks

http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 9319338..455c753 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -235,6 +236,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		public void run() {
 			try {
 
+				Counter completedSplitsCounter = getMetricGroup().counter("numSplitsProcessed");
 				this.format.openInputFormat();
 
 				while (this.isRunning) {
@@ -290,6 +292,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 								}
 							}
 						}
+						completedSplitsCounter.inc();
 
 					} finally {
 						// close and prepare for the next iteration

http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
index bce1ec5..f35cbba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
@@ -70,6 +71,7 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
 	public void run(SourceContext<OUT> ctx) throws Exception {
 		try {
 
+			Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
 			if (isRunning && format instanceof RichInputFormat) {
 				((RichInputFormat) format).openInputFormat();
 			}
@@ -86,6 +88,7 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
 					ctx.collect(nextElement);
 				}
 				format.close();
+				completedSplitsCounter.inc();
 
 				if (isRunning) {
 					isRunning = splitIterator.hasNext();

http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index 32776e1..a41c7db 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -253,6 +255,11 @@ public class InputFormatSourceFunctionTest {
 		}
 
 		@Override
+		public MetricGroup getMetricGroup() {
+			return new UnregisteredMetricsGroup();
+		}
+
+		@Override
 		public InputSplitProvider getInputSplitProvider() {
 			try {
 				this.inputSplits = format.createInputSplits(noOfSplits);