Posted to reviews@spark.apache.org by rdblue <gi...@git.apache.org> on 2018/06/18 20:31:27 UTC
[GitHub] spark pull request #21577: [SPARK-24552][core] Correctly identify tasks in o...
Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21577#discussion_r196214788
--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -131,16 +139,17 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     reason match {
       case Success =>
       // The task output has been committed successfully
-      case denied: TaskCommitDenied =>
-        logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
-          s"attempt: $attemptNumber")
-      case otherReason =>
+      case _: TaskCommitDenied =>
+        logInfo(s"Task was denied committing, stage: $stage / $stageAttempt, " +
+          s"partition: $partition, attempt: $attemptNumber")
+      case _ =>
         // Mark the attempt as failed to blacklist from future commit protocol
-        stageState.failures.getOrElseUpdate(partition, mutable.Set()) += attemptNumber
-        if (stageState.authorizedCommitters(partition) == attemptNumber) {
+        val taskId = TaskIdentifier(stageAttempt, attemptNumber)
+        stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
+        if (stageState.authorizedCommitters(partition) == taskId) {
           logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
             s"partition=$partition) failed; clearing lock")
-          stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
+          stageState.authorizedCommitters(partition) = null
--- End diff --
Nit: why not use Option[TaskIdentifier] and None here?
---
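To illustrate the nit above, here is a minimal sketch of what the Option-based bookkeeping could look like. This is not the actual patch: `StageState` is simplified to a standalone class, and the field shapes are assumed for illustration only. The point is that `Option[TaskIdentifier]` with `None` makes the "no authorized committer" state explicit instead of relying on a `null` sentinel.

```scala
import scala.collection.mutable

// Simplified stand-in for the coordinator's per-stage state (hypothetical).
case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)

class StageState(numPartitions: Int) {
  // None replaces the null sentinel: no committer is authorized yet.
  val authorizedCommitters: Array[Option[TaskIdentifier]] =
    Array.fill(numPartitions)(None)

  // Failed attempts per partition, blacklisted from future commits.
  val failures = mutable.Map[Int, mutable.Set[TaskIdentifier]]()

  def taskFailed(partition: Int, taskId: TaskIdentifier): Unit = {
    failures.getOrElseUpdate(partition, mutable.Set()) += taskId
    // Option.contains avoids both a null check and an unwrapped comparison.
    if (authorizedCommitters(partition).contains(taskId)) {
      authorizedCommitters(partition) = None // clear the commit lock
    }
  }
}
```

With this shape, callers can pattern-match on `Some`/`None` rather than guarding against `null`, and a forgotten check fails to compile instead of throwing a `NullPointerException` at runtime.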
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org