Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2017/02/21 14:39:04 UTC

[GitHub] flink pull request #3377: [FLINK-5645] Store accumulators/metrics for cancel...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/3377

    [FLINK-5645] Store accumulators/metrics for canceled/failed tasks

    This PR modifies Execution/ExecutionGraph to store the transmitted IO metrics and accumulators for canceled and failed tasks. Previously these were stored only for finished tasks and simply discarded for all other tasks.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 5645_metrics_failed

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3377.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3377
    
----
commit 1a68ee62a86de283a1a0210dc637ac65d3860bef
Author: zentol <ch...@apache.org>
Date:   2017-02-21T11:36:17Z

    [FLINK-5645] Store accumulators/metrics for canceled/failed tasks

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3377: [FLINK-5645] Store accumulators/metrics for cancel...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3377#discussion_r104394092
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -1311,6 +1311,19 @@ public boolean updateState(TaskExecutionState state) {
     		}
     	}
     
    +	private Map<String, Accumulator<?, ?>> deserializeAccumulators(TaskExecutionState state) {
    --- End diff --
    
    I've checked that returning `null` matches the behaviour of `accumulators.deserializeUserAccumulators(userClassLoader)`. In general it would be good to annotate nullable return types with `@Nullable` or simply return empty collections, but that's an orthogonal change.
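
To illustrate the trade-off mentioned in this comment, here is a minimal, self-contained sketch of the empty-collection alternative. It is not the code from this PR: `Snapshot` and `deserializeOrEmpty` are hypothetical stand-ins, and plain `Long` values replace Flink's `Accumulator<?, ?>` so that the example compiles without any dependencies.

```java
import java.util.Collections;
import java.util.Map;

public class EmptyResultSketch {

    /** Hypothetical stand-in for Flink's AccumulatorSnapshot (assumption, not the real class). */
    public interface Snapshot {
        Map<String, Long> deserializeUserAccumulators(ClassLoader loader) throws Exception;
    }

    /**
     * Never returns null: a missing snapshot, a null result, or a failed
     * deserialization all yield an empty map, so callers need no null check.
     */
    public static Map<String, Long> deserializeOrEmpty(Snapshot snapshot, ClassLoader loader) {
        if (snapshot == null) {
            return Collections.<String, Long>emptyMap();
        }
        try {
            Map<String, Long> result = snapshot.deserializeUserAccumulators(loader);
            return result != null ? result : Collections.<String, Long>emptyMap();
        } catch (Exception e) {
            // A real implementation would log the failure here.
            return Collections.<String, Long>emptyMap();
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = EmptyResultSketch.class.getClassLoader();
        System.out.println(deserializeOrEmpty(null, loader).isEmpty());
    }
}
```

One consequence of this design is that callers can no longer tell "no accumulators were reported" apart from "deserialization failed", which is one reason to treat it as a separate change.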



[GitHub] flink issue #3377: [FLINK-5645] Store accumulators/metrics for canceled/fail...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/3377
  
    Thanks for addressing Till's comments. I think this is good to merge now. Could you go ahead and do it?
    




[GitHub] flink pull request #3377: [FLINK-5645] Store accumulators/metrics for cancel...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3377#discussion_r102464984
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -1292,10 +1292,28 @@ public boolean updateState(TaskExecutionState state) {
     					}
     					return true;
     				case CANCELED:
    -					attempt.cancelingComplete();
    +					AccumulatorSnapshot acc1 = state.getAccumulators();
    +					Map<String, Accumulator<?, ?>> userAcc1 = null;
    +					if (acc1 != null) {
    +						try {
    +							userAcc1 = acc1.deserializeUserAccumulators(userClassLoader);
    +						} catch (Exception e) {
    +							LOG.error("Failed to deserialize final accumulator results.", e);
    +						}
    +					}
    +					attempt.cancelingComplete(userAcc1, state.getIOMetrics());
     					return true;
     				case FAILED:
    -					attempt.markFailed(state.getError(userClassLoader));
    +					AccumulatorSnapshot acc2 = state.getAccumulators();
    +					Map<String, Accumulator<?, ?>> userAcc2 = null;
    +					if (acc2 != null) {
    +						try {
    +							userAcc2 = acc2.deserializeUserAccumulators(userClassLoader);
    +						} catch (Exception e) {
    +							LOG.error("Failed to deserialize final accumulator results.", e);
    +						}
    +					}
    --- End diff --
    
    Can this deserialization logic be factored out into a method?
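
The refactoring requested here can be sketched as follows. This is a self-contained illustration under stated assumptions, not the code that was merged: `Snapshot` is a hypothetical stand-in for `AccumulatorSnapshot`, `Long` values replace `Accumulator<?, ?>`, and `java.util.logging` replaces the project's logger. The point is that the duplicated `try`/`catch` from the `CANCELED` and `FAILED` branches collapses into one helper that returns `null` on failure, as the diff does.

```java
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class AccumulatorHelperSketch {

    private static final Logger LOG = Logger.getLogger(AccumulatorHelperSketch.class.getName());

    /** Hypothetical stand-in for Flink's AccumulatorSnapshot (assumption, not the real class). */
    public interface Snapshot {
        Map<String, Long> deserializeUserAccumulators(ClassLoader loader) throws Exception;
    }

    /**
     * Shared helper for both branches. Returns null when there is no snapshot
     * or when deserialization fails; the failure is logged, not rethrown.
     */
    public static Map<String, Long> deserializeAccumulators(Snapshot snapshot, ClassLoader loader) {
        if (snapshot == null) {
            return null;
        }
        try {
            return snapshot.deserializeUserAccumulators(loader);
        } catch (Exception e) {
            LOG.log(Level.SEVERE, "Failed to deserialize final accumulator results.", e);
            return null;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = AccumulatorHelperSketch.class.getClassLoader();
        // Both call sites would now reduce to a single line each.
        System.out.println(deserializeAccumulators(null, loader) == null);
    }
}
```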



[GitHub] flink pull request #3377: [FLINK-5645] Store accumulators/metrics for cancel...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3377



[GitHub] flink pull request #3377: [FLINK-5645] Store accumulators/metrics for cancel...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3377#discussion_r102465162
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java ---
    @@ -53,6 +53,22 @@ public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter by
     		this.numBytesOutPerSecond = bytesOut.getRate();
     	}
     
    +	public IOMetrics(
    +			int numBytesInLocal, int numBytesInRemote, int numBytesOut, int numRecordsIn, int numRecordsOut,
    +			double numBytesInLocalPerSecond, double numBytesInRemotePerSecond, double numBytesOutPerSecond,
    +			double numRecordsInPerSecond, double numRecordsOutPerSecond) {
    --- End diff --
    
    I'm not sure whether this line breaking pattern is consistent.



[GitHub] flink pull request #3377: [FLINK-5645] Store accumulators/metrics for cancel...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3377#discussion_r102467283
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -855,14 +877,23 @@ private boolean processFail(Throwable t, boolean isCallback) {
     			}
     
     			if (current == CANCELING) {
    -				cancelingComplete();
    +				cancelingComplete(userAccumulators, metrics);
     				return false;
     			}
     
     			if (transitionState(current, FAILED, t)) {
     				// success (in a manner of speaking)
     				this.failureCause = t;
     
    +				if (userAccumulators != null) {
    +					synchronized (accumulatorLock) {
    +						this.userAccumulators = userAccumulators;
    +					}
    +				}
    +				if (metrics != null) {
    +					this.ioMetrics = metrics;
    +				}
    --- End diff --
    
    Can this also be a separate method?
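
A separate method for this block might look like the sketch below. It is an illustration only: the class name, the method name `updateAccumulatorsAndMetrics`, the `MetricsStub` type, and the `volatile` modifier on `ioMetrics` are assumptions made for the example, not taken from the PR. The logic mirrors the diff: each value is overwritten only when a non-null replacement arrives, and the accumulator map is written under `accumulatorLock`.

```java
import java.util.Map;

public class ExecutionSketch {

    /** Hypothetical stand-in for Flink's IOMetrics (assumption, not the real class). */
    public static final class MetricsStub {
        public final long numRecordsIn;

        public MetricsStub(long numRecordsIn) {
            this.numRecordsIn = numRecordsIn;
        }
    }

    private final Object accumulatorLock = new Object();
    private Map<String, Long> userAccumulators; // guarded by accumulatorLock
    private volatile MetricsStub ioMetrics;     // volatile is an assumption of this sketch

    /** Stores the final values; null arguments leave the existing state untouched. */
    public void updateAccumulatorsAndMetrics(Map<String, Long> userAccumulators, MetricsStub metrics) {
        if (userAccumulators != null) {
            synchronized (accumulatorLock) {
                this.userAccumulators = userAccumulators;
            }
        }
        if (metrics != null) {
            this.ioMetrics = metrics;
        }
    }

    public Map<String, Long> getUserAccumulators() {
        synchronized (accumulatorLock) {
            return userAccumulators;
        }
    }

    public MetricsStub getIOMetrics() {
        return ioMetrics;
    }
}
```

With such a method, the canceled and failed paths could each call it once instead of repeating the null checks inline.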


---

[GitHub] flink issue #3377: [FLINK-5645] Store accumulators/metrics for canceled/fail...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3377
  
    @tillrohrmann I've addressed your comments.



[GitHub] flink pull request #3377: [FLINK-5645] Store accumulators/metrics for cancel...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3377#discussion_r102471279
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java ---
    @@ -53,6 +53,22 @@ public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter by
     		this.numBytesOutPerSecond = bytesOut.getRate();
     	}
     
    +	public IOMetrics(
    +			int numBytesInLocal, int numBytesInRemote, int numBytesOut, int numRecordsIn, int numRecordsOut,
    +			double numBytesInLocalPerSecond, double numBytesInRemotePerSecond, double numBytesOutPerSecond,
    +			double numRecordsInPerSecond, double numRecordsOutPerSecond) {
    --- End diff --
    
    Well, there *is* a pattern: the first parameter line holds the counts, the second the byte rates, and the third the record rates.
    
    I can change it though.

