Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2016/06/10 12:52:40 UTC

[GitHub] flink pull request #2090: Expose several new Metrics

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/2090

    Expose several new Metrics

    This PR exposes several new metrics in Flink.
    
    The names, scopes and respective performance impact (outside of the actual metric itself) are listed below:
    
    Metric | Scope | Performance Impact
    -------|-------|-------------------
    currentLowWatermark | Task | None
    lastCheckpointSize | Task | StreamTaskStateList.getStateSize() for every checkpoint
    numBytes(In/Out)(Local/Remote) | Task |  Buffer.getSize() for every buffer
    numRecords(In/Out) | Operator | Streaming: None / Batch: some additional null checks
    numSplitsProcessed | Operator | None
    
    The currentLowWatermark metric simply exposes the last emitted Watermark in the StreamInputProcessor using a Gauge.
    
    The lastCheckpointSize metric is gathered in the StreamTask.performCheckpoint method. For every checkpoint made the size is computed and stored in a field, which is exposed with a Gauge.
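
    As a rough illustration of the field-plus-Gauge pattern described above, the sketch below uses a hypothetical `Gauge` interface and `CheckpointingTask` class. Neither is Flink's actual API, and the per-state sizes are passed in as a plain array purely for the sake of the example.

    ```java
    // Hypothetical stand-in for a metric gauge (not Flink's Gauge interface).
    interface Gauge<T> {
        T getValue();
    }

    // Hypothetical task that remembers the size of its most recent checkpoint.
    class CheckpointingTask {
        // volatile because a reporter thread may read while the task thread writes.
        private volatile long lastCheckpointSize = -1L;

        // The gauge holds no state of its own; it only reads the field on demand.
        Gauge<Long> lastCheckpointSizeGauge() {
            return () -> lastCheckpointSize;
        }

        // Sums the (assumed) per-state sizes and stores the total in the field.
        void performCheckpoint(long[] stateSizes) {
            long total = 0L;
            for (long size : stateSizes) {
                total += size;
            }
            lastCheckpointSize = total;
        }
    }

    class GaugeExample {
        public static void main(String[] args) {
            CheckpointingTask task = new CheckpointingTask();
            Gauge<Long> gauge = task.lastCheckpointSizeGauge();
            task.performCheckpoint(new long[] {10L, 20L, 12L});
            System.out.println(gauge.getValue()); // prints 42
        }
    }
    ```

    The only per-checkpoint cost in this sketch is computing the total, which matches the impact listed in the table.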
    
    The numSplitsProcessed metric is a simple counter in the DataSourceTask/FileSourceFunction.
    
    The numBytes(In/Out)(Local/Remote) metric touches the second-largest number of files. It replaces the previous numBytes(In/Out) metric, which was exposed in the Deserializers. It is now gathered directly in the InputChannels, which add the size of each read buffer to a Counter. It is thus also more accurate than the previous version, since that one actually measured how many bytes were deserialized.
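
    A minimal sketch of counting bytes at the channel level, assuming hypothetical `ByteCounter`, `NetworkBuffer` and `CountingInputChannel` classes (illustrative names only, not the classes touched by this PR). Each channel gets its own counter, so local and remote input can be reported separately.

    ```java
    import java.util.ArrayDeque;
    import java.util.Queue;

    // Hypothetical minimal counter (not Flink's Counter class).
    class ByteCounter {
        private long count;

        void inc(long n) {
            count += n;
        }

        long getCount() {
            return count;
        }
    }

    // Hypothetical buffer that only knows its size in bytes.
    class NetworkBuffer {
        private final int size;

        NetworkBuffer(int size) {
            this.size = size;
        }

        int getSize() {
            return size;
        }
    }

    // Hypothetical input channel: adds the size of every buffer it hands out
    // to the counter it was constructed with.
    class CountingInputChannel {
        private final Queue<NetworkBuffer> queue = new ArrayDeque<>();
        private final ByteCounter numBytesIn;

        CountingInputChannel(ByteCounter numBytesIn) {
            this.numBytesIn = numBytesIn;
        }

        void enqueue(NetworkBuffer buffer) {
            queue.add(buffer);
        }

        // Returns the next buffer, or null if none is available.
        NetworkBuffer getNextBuffer() {
            NetworkBuffer next = queue.poll();
            if (next != null) {
                numBytesIn.inc(next.getSize());
            }
            return next;
        }
    }
    ```

    Because the count is taken from the buffer size rather than from the deserializer, it reflects bytes read and not bytes deserialized.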
    
    The numRecords(In/Out) metric was extended to measure per operator instead of per task. This includes chained operators. The information is no longer gathered in the deserializers but in the operator itself. This was generally accomplished by wrapping in- and output in Counting(Output/Collector/Iterator/...) classes. Below are more specific details for both Streaming and Batch as to how these were implemented.
    
    ### Streaming
    * numRecordsIn is measured in the StreamInputProcessor or the ChainingOutput class.
    * numRecordsOut is measured by wrapping the output in a CountingOutput, set in the AbstractStreamOperator.
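
    The output-wrapping approach can be sketched roughly as follows. The `Output` interface, `RecordCounter` and this `CountingOutput` are simplified, hypothetical versions written for illustration; the real classes carry more methods (watermarks, close, etc.).

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical, reduced output interface with a single method.
    interface Output<T> {
        void collect(T record);
    }

    // Hypothetical minimal counter (not Flink's Counter class).
    class RecordCounter {
        private long count;

        void inc() {
            count++;
        }

        long getCount() {
            return count;
        }
    }

    // Decorator: counts each record, then forwards it unchanged.
    class CountingOutput<T> implements Output<T> {
        private final Output<T> delegate;
        private final RecordCounter numRecordsOut;

        CountingOutput(Output<T> delegate, RecordCounter numRecordsOut) {
            this.delegate = delegate;
            this.numRecordsOut = numRecordsOut;
        }

        @Override
        public void collect(T record) {
            numRecordsOut.inc();
            delegate.collect(record);
        }
    }

    class CountingOutputExample {
        public static void main(String[] args) {
            List<String> sink = new ArrayList<>();
            RecordCounter counter = new RecordCounter();
            Output<String> out = new CountingOutput<>(sink::add, counter);
            out.collect("a");
            out.collect("b");
            System.out.println(counter.getCount()); // prints 2
        }
    }
    ```

    The operator itself is unaware of the counting; it simply emits to whatever output it was given.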
    
    The StreamSource class was slightly modified, adding a second run() method without a collector argument. Using this method the StreamSource now uses the same output that is set in the AbstractStreamOperator. The old method was kept in place so as not to change tests.
    
    The StreamIterationTail class needed some extra love as well, since the contained RecordPusher did not follow the StreamOperator lifecycle. On this operator, setup() was never called, leading to an NPE since the MetricGroup was never set. In order to be able to pass an Output object into setup(), the RecordPusher's internal logic was moved into a new IterationTailOutput class. This slightly modified the exception behaviour: InterruptedExceptions are no longer simply forwarded, but wrapped in a RuntimeException instead.
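
    The changed exception behaviour can be illustrated with a small sketch. `QueueOutput` is a hypothetical, simplified class and not the PR's IterationTailOutput; restoring the interrupt flag before rethrowing is an addition made here and is not described in the PR.

    ```java
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Hypothetical output that pushes records into a blocking queue.
    class QueueOutput<T> {
        private final BlockingQueue<T> dataChannel;
        private final long waitTimeMillis;
        private final boolean shouldWait;

        QueueOutput(BlockingQueue<T> dataChannel, long waitTimeMillis) {
            this.dataChannel = dataChannel;
            this.waitTimeMillis = waitTimeMillis;
            this.shouldWait = waitTimeMillis > 0;
        }

        // collect() declares no checked exception, so an InterruptedException
        // cannot be forwarded as-is and is wrapped in a RuntimeException.
        void collect(T record) {
            try {
                if (shouldWait) {
                    dataChannel.offer(record, waitTimeMillis, TimeUnit.MILLISECONDS);
                } else {
                    dataChannel.put(record);
                }
            } catch (InterruptedException e) {
                // Assumption of this sketch: keep the interrupt visible to callers.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    class QueueOutputExample {
        public static void main(String[] args) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            QueueOutput<String> out = new QueueOutput<>(queue, 0L);
            out.collect("record");
            System.out.println(queue.size()); // prints 1
        }
    }
    ```

    Callers that previously caught InterruptedException would, under this scheme, need to inspect the cause of the RuntimeException instead.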
    
    ### Batch
    * numRecordsIn is measured in each Driver class separately. In cases where an iterator is passed into the UDF the underlying MutableObjectIterator was wrapped in a CountingMutableObjectIterator. This should ensure that no input record is counted multiple times.
    * numRecordsOut is measured by wrapping the collector in a CountingCollector, set in each Driver's prepare() or run() method.
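
    For the input side, a counting iterator wrapper might look roughly like this. `RecordIterator` is a hypothetical, reduced interface that returns null once exhausted, and `InputCounter` and `CountingRecordIterator` are likewise illustrative names rather than the classes added by this PR.

    ```java
    import java.util.Arrays;
    import java.util.Iterator;

    // Hypothetical iterator contract: next() returns null when exhausted.
    interface RecordIterator<E> {
        E next();
    }

    // Hypothetical minimal counter (not Flink's Counter class).
    class InputCounter {
        private long count;

        void inc() {
            count++;
        }

        long getCount() {
            return count;
        }
    }

    // Counts a record only when the wrapped iterator actually returns one,
    // so the end-of-input null is never counted.
    class CountingRecordIterator<E> implements RecordIterator<E> {
        private final RecordIterator<E> delegate;
        private final InputCounter numRecordsIn;

        CountingRecordIterator(RecordIterator<E> delegate, InputCounter numRecordsIn) {
            this.delegate = delegate;
            this.numRecordsIn = numRecordsIn;
        }

        @Override
        public E next() {
            E record = delegate.next();
            if (record != null) {
                numRecordsIn.inc();
            }
            return record;
        }
    }

    class CountingIteratorExample {
        public static void main(String[] args) {
            Iterator<Integer> source = Arrays.asList(1, 2, 3).iterator();
            InputCounter counter = new InputCounter();
            RecordIterator<Integer> in = new CountingRecordIterator<>(
                    () -> source.hasNext() ? source.next() : null, counter);
            while (in.next() != null) {
                // a UDF would consume the record here
            }
            System.out.println(counter.getCount()); // prints 3
        }
    }
    ```

    Since counting happens where the record leaves the underlying iterator, each record is counted once regardless of how the UDF consumes it.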
    
    Note that the following drivers were not changed: (Join/CoGroup)WithSolutionSet(First/Second)Driver.
    I couldn't figure out a way to not count records multiple times, specifically those residing in the HashTable, so I ignored them.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink metrics

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2090.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2090
    
----
commit e585c47c303b0c4feee097f36315be464da19a8e
Author: zentol <ch...@apache.org>
Date:   2016-06-01T09:24:40Z

    task: lowWatermark

commit 2c7b18b1476d548382bf6e5a49d496508a4fc9ae
Author: zentol <ch...@apache.org>
Date:   2016-06-03T09:32:28Z

    task: checkpointSize

commit b04e1dde2ecc101e6a45bcbc83921f8a4d9c3c1b
Author: zentol <ch...@apache.org>
Date:   2016-06-08T09:20:14Z

    task: numBytes(In/Out)(Local/Remote)

commit d131bfa3a1f95d22900bba131b6c300ee881e5c7
Author: zentol <ch...@apache.org>
Date:   2016-06-07T18:20:10Z

    operator: numSplitsProcessed

commit ffbba601fab4542b30c299c3650a18b4b2612df2
Author: zentol <ch...@apache.org>
Date:   2016-06-10T11:24:49Z

    operator: numRecords(In/Out) Basics

commit 9428ba9291380f56c0486c124163e3ad58ccfae8
Author: zentol <ch...@apache.org>
Date:   2016-06-08T09:25:30Z

    operator: streaming: numRecords(In/Out)

commit 85e81e42147930af255c1991726c290281b9d82e
Author: zentol <ch...@apache.org>
Date:   2016-06-10T11:25:18Z

    operator: batch: unchained: numRecords(In/Out)

commit ae0ba804cc664d8044749257ab6303aadaeb853e
Author: zentol <ch...@apache.org>
Date:   2016-06-08T12:03:19Z

    operator: batch: chaining: numRecords(In/Out)

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2090: Expose several new Metrics

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2090#discussion_r67107345
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java ---
    @@ -123,9 +127,11 @@ public void run(SourceContext<OUT> ctx) throws Exception {
     			OUT nextElement = serializer.createInstance();
     			nextElement =  format.nextRecord(nextElement);
     			if (nextElement == null && splitIterator.hasNext()) {
    +				splitCounter.inc();
    --- End diff --
    
    I'll remove the splitCounter metric for now, and add it back in once #2100 is merged.



[GitHub] flink pull request #2090: Expose several new Metrics

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2090#discussion_r67009404
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java ---
    @@ -56,39 +57,60 @@ public void init() throws Exception {
     		
     		LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
     		
    -		this.headOperator = new RecordPusher<>(dataChannel, iterationWaitTime);
    +		this.headOperator = new RecordPusher<>();
    +		this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime));
     	}
     
     	private static class RecordPusher<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
     		
     		private static final long serialVersionUID = 1L;
     
    +		@Override
    +		public void processElement(StreamRecord<IN> record) throws Exception {
    +			output.collect(record);
    +		}
    +
    +		@Override
    +		public void processWatermark(Watermark mark) {
    +			// ignore
    +		}
    +	}
    +
    +	private static class IterationTailOutput<IN> implements Output<StreamRecord<IN>> {
    +
     		@SuppressWarnings("NonSerializableFieldInSerializableClass")
     		private final BlockingQueue<StreamRecord<IN>> dataChannel;
     		
     		private final long iterationWaitTime;
     		
     		private final boolean shouldWait;
     
    -		RecordPusher(BlockingQueue<StreamRecord<IN>> dataChannel, long iterationWaitTime) {
    +		IterationTailOutput(BlockingQueue<StreamRecord<IN>> dataChannel, long iterationWaitTime) {
     			this.dataChannel = dataChannel;
     			this.iterationWaitTime = iterationWaitTime;
     			this.shouldWait =  iterationWaitTime > 0;
     		}
     
     		@Override
    -		public void processElement(StreamRecord<IN> record) throws Exception {
    -			if (shouldWait) {
    -				dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
    -			}
    -			else {
    -				dataChannel.put(record);
    +		public void emitWatermark(Watermark mark) {
    +		}
    +
    +		@Override
    +		public void collect(StreamRecord<IN> record) {
    +			try {
    +				if (shouldWait) {
    +						dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
    --- End diff --
    
    Indentation



[GitHub] flink pull request #2090: Expose several new Metrics

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2090#discussion_r67011583
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java ---
    @@ -123,9 +127,11 @@ public void run(SourceContext<OUT> ctx) throws Exception {
     			OUT nextElement = serializer.createInstance();
     			nextElement =  format.nextRecord(nextElement);
     			if (nextElement == null && splitIterator.hasNext()) {
    +				splitCounter.inc();
    --- End diff --
    
    In `DataSourceTask` the counter is incremented when a split is closed. We should sync both classes.



[GitHub] flink issue #2090: Expose several new Metrics

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2090
  
    @fhueske I've addressed your comments. I also removed the setMetricGroup method in the ReaderBase class (all implementations were empty), and added the watermark metric to the StreamTwoInputProcessor as well.



[GitHub] flink issue #2090: Expose several new Metrics

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2090
  
    Thanks for the update @zentol. Good to merge, IMO



[GitHub] flink issue #2090: Expose several new Metrics

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2090
  
    Hi @zentol, didn't find anything crucial. Just a few minor comments.



[GitHub] flink pull request #2090: Expose several new Metrics

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2090#discussion_r67010432
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java ---
    @@ -26,33 +26,27 @@
      */
     public class IOMetricGroup extends AbstractMetricGroup {
     
    -	private final Counter numBytesIn;
     	private final Counter numBytesOut;
    -	private final Counter numRecordsIn;
    -	private final Counter numRecordsOut;
    +	private final Counter numBytesInLocal;
    +	private final Counter numBytesInRemote;
     
     	public IOMetricGroup(MetricRegistry registry, TaskMetricGroup parent) {
     		super(registry, parent.getScopeComponents());
    -
    -		this.numBytesIn = parent.counter("numBytesIn");
     		this.numBytesOut = parent.counter("numBytesOut");
    -		this.numRecordsIn = parent.counter("numRecordsIn");
    -		this.numRecordsOut = parent.counter("numRecordsOut");
    -	}
     
    -	public Counter getBytesInCounter() {
    -		return numBytesIn;
    +		this.numBytesInLocal = parent.counter("numBytesInLocal");
    +		this.numBytesInRemote = parent.counter("numBytesInRemote");
     	}
     
     	public Counter getBytesOutCounter() {
     		return numBytesOut;
     	}
     
    -	public Counter getRecordsInCounter() {
    -		return numRecordsIn;
    +	public Counter getNumBytesInLocal() {
    +		return numBytesInLocal;
     	}
     
    -	public Counter getRecordsOutCounter() {
    -		return numRecordsOut;
    +	public Counter getNumBytesInRemote() {
    --- End diff --
    
    rename to `getNumBytesInRemoteCounter`?



[GitHub] flink pull request #2090: Expose several new Metrics

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2090#discussion_r67011166
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java ---
    @@ -165,6 +172,7 @@ public void invoke() throws Exception {
     					// close. We close here such that a regular close throwing an exception marks a task as failed.
     					format.close();
     				}
    +				splitCounter.inc();
    --- End diff --
    
    Should we make the semantics of this counter more explicit and call it `completedSplitCounter`?



[GitHub] flink pull request #2090: Expose several new Metrics

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2090#discussion_r67010629
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java ---
    @@ -181,8 +168,6 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) {
     
     	@Override
     	public void instantiateMetrics(IOMetricGroup metrics) {
    --- End diff --
    
    Do we need this method in `RecordDeserializer`? Both implementations of `RecordDeserializer` implement it as an empty method.



[GitHub] flink pull request #2090: Expose several new Metrics

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2090#discussion_r67107168
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java ---
    @@ -123,9 +127,11 @@ public void run(SourceContext<OUT> ctx) throws Exception {
     			OUT nextElement = serializer.createInstance();
     			nextElement =  format.nextRecord(nextElement);
     			if (nextElement == null && splitIterator.hasNext()) {
    +				splitCounter.inc();
    --- End diff --
    
    The behaviour in both classes is the same. If the format returns null it is considered empty (==completed), hence we increase the counter.
    
    Well, the FileSourceFunction doesn't properly call close() once the split was processed.



[GitHub] flink pull request #2090: Expose several new Metrics

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2090



[GitHub] flink pull request #2090: Expose several new Metrics

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2090#discussion_r67010378
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java ---
    @@ -26,33 +26,27 @@
      */
     public class IOMetricGroup extends AbstractMetricGroup {
     
    -	private final Counter numBytesIn;
     	private final Counter numBytesOut;
    -	private final Counter numRecordsIn;
    -	private final Counter numRecordsOut;
    +	private final Counter numBytesInLocal;
    +	private final Counter numBytesInRemote;
     
     	public IOMetricGroup(MetricRegistry registry, TaskMetricGroup parent) {
     		super(registry, parent.getScopeComponents());
    -
    -		this.numBytesIn = parent.counter("numBytesIn");
     		this.numBytesOut = parent.counter("numBytesOut");
    -		this.numRecordsIn = parent.counter("numRecordsIn");
    -		this.numRecordsOut = parent.counter("numRecordsOut");
    -	}
     
    -	public Counter getBytesInCounter() {
    -		return numBytesIn;
    +		this.numBytesInLocal = parent.counter("numBytesInLocal");
    +		this.numBytesInRemote = parent.counter("numBytesInRemote");
     	}
     
     	public Counter getBytesOutCounter() {
     		return numBytesOut;
     	}
     
    -	public Counter getRecordsInCounter() {
    -		return numRecordsIn;
    +	public Counter getNumBytesInLocal() {
    --- End diff --
    
    rename to `getNumBytesInLocalCounter` to be consistent with previous methods?

