You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/06/18 21:40:41 UTC
[2/3] flink git commit: [FLINK-3949] [metrics] Add numSplitsProcessed counter metric.
[FLINK-3949] [metrics] Add numSplitsProcessed counter metric.
This closes #2119
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a0c268d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a0c268d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a0c268d
Branch: refs/heads/master
Commit: 5a0c268dbd4abdf39c7b9d8f25ea629dfd4681b1
Parents: 18744b2
Author: zentol <ch...@apache.org>
Authored: Fri Jun 17 09:40:01 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Jun 18 23:40:23 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/runtime/operators/DataSourceTask.java | 4 ++--
.../api/functions/source/ContinuousFileReaderOperator.java | 3 +++
.../api/functions/source/InputFormatSourceFunction.java | 3 +++
.../api/functions/source/InputFormatSourceFunctionTest.java | 7 +++++++
4 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index c57f133..68e29b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -101,7 +101,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
LOG.debug(getLogString("Starting data source operator"));
RuntimeContext ctx = createRuntimeContext();
- Counter splitCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
+ Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
Counter numRecordsOut = ctx.getMetricGroup().counter("numRecordsOut");
if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
@@ -172,7 +172,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
// close. We close here such that a regular close throwing an exception marks a task as failed.
format.close();
}
- splitCounter.inc();
+ completedSplitsCounter.inc();
} // end for all input splits
// close the collector. if it is a chaining task collector, it will close its chained tasks
http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 9319338..455c753 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -235,6 +236,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
public void run() {
try {
+ Counter completedSplitsCounter = getMetricGroup().counter("numSplitsProcessed");
this.format.openInputFormat();
while (this.isRunning) {
@@ -290,6 +292,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
}
}
}
+ completedSplitsCounter.inc();
} finally {
// close and prepare for the next iteration
http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
index bce1ec5..f35cbba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -70,6 +71,7 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
public void run(SourceContext<OUT> ctx) throws Exception {
try {
+ Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
if (isRunning && format instanceof RichInputFormat) {
((RichInputFormat) format).openInputFormat();
}
@@ -86,6 +88,7 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
ctx.collect(nextElement);
}
format.close();
+ completedSplitsCounter.inc();
if (isRunning) {
isRunning = splitIterator.hasNext();
http://git-wip-us.apache.org/repos/asf/flink/blob/5a0c268d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index 32776e1..a41c7db 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -253,6 +255,11 @@ public class InputFormatSourceFunctionTest {
}
@Override
+ public MetricGroup getMetricGroup() {
+ return new UnregisteredMetricsGroup();
+ }
+
+ @Override
public InputSplitProvider getInputSplitProvider() {
try {
this.inputSplits = format.createInputSplits(noOfSplits);