Posted to reviews@spark.apache.org by rezasafi <gi...@git.apache.org> on 2017/11/29 17:20:28 UTC

[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

GitHub user rezasafi opened a pull request:

    https://github.com/apache/spark/pull/19848

    [SPARK-22162] Executors and the driver should use consistent JobIDs in the RDD commit protocol

    I have modified SparkHadoopWriter so that executors and the driver always use consistent JobIds during the hadoop commit. Before SPARK-18191, spark always used the rddId, it just incorrectly named the variable stageId. After SPARK-18191, it used the rddId in the driver, and the stageId in the executors. With this change, FileCommitProtocol now has a commitTask method that will receive rddId as the JobId in addition to the stageId. Then during the hadoop commit protocol, the jobId will be used by hadoop while spark can still use stageId like before. This way executors and the driver will consistently use stageId.
    In addition to the existing unit tests, a test has been added to check whether executors and the driver are using the same JobId. The test failed before this change and passed after applying this fix.
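
As a rough illustration of the mismatch described above, the sketch below models how the driver and an executor each pick the ID handed to Hadoop. It is not Spark's code: `CommitIds`, `jobIdBeforeFix`, and `jobIdAfterFix` are hypothetical names, and only the rddId/stageId distinction is taken from the description.

```scala
// Hypothetical model of the ID choice; none of these names exist in Spark.
object CommitIds {
  sealed trait Side
  case object Driver extends Side
  case object Executor extends Side

  // After SPARK-18191 (per the PR description): the driver used the rddId,
  // while the executors used the stageId.
  def jobIdBeforeFix(side: Side, rddId: Int, stageId: Int): Int = side match {
    case Driver   => rddId
    case Executor => stageId
  }

  // With this change: both sides hand the rddId to Hadoop as the job ID.
  // The stageId is still passed along for Spark's own use, so it is ignored here.
  def jobIdAfterFix(side: Side, rddId: Int, stageId: Int): Int = rddId
}
```

Whenever the rddId and stageId differ, the first function returns different IDs for the two sides, while the second always agrees.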

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rezasafi/spark stagerddsimple

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19848.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19848
    
----
commit 4dbdbe77435630e3b35581c59189ec75c9c2484d
Author: Reza Safi <re...@cloudera.com>
Date:   2017-11-28T23:03:37Z

    [SPARK-22162] Executors and the driver should use consistent JobIDs in the RDD commit protocol

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    I have one concern about this -- there is a case where you are not giving a unique id to the hadoop committers.  You could save one rdd twice, and even have both of those operations running concurrently.  I suppose it's weird enough that we don't need to worry about it?
    
    I don't think there are any problems w/ stage retry -- that only applies to shuffle map stages, and the hadoop writer is only for result stages.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Done. 


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Job ID is only used in the normal FileOutputCommitter to generate unique paths, using `s" _temporary/$jobid_$job-attempt"` for the file (i.e. job-attempt-ID, which is jobID+attempt).
    When trying to recover a job (v1 algorithm only), it works out its current job ID and attempt, and looks for committed task directories in the dir $jobId-${attemptId-1}; it moves them into the current attempt as completed, then sets off to do the remainder. It relies on rename of task attempt directories being atomic, and assumes that they are O(1).
    
    Stocator (ask @gilv) uses the job attempt ID for naming the final files created; I don't know the implications there, but given it's written for Spark, you can assume the current numbering scheme works.
    
    I don't know of anything which assumes that jobIDs (and transitively jobAttemptIDs, taskAttemptIDs) are UUIDs. Might be worth specifying that there is no such guarantee in whatever docs get written up.
    
    The main issue with reusing the job ID will be if there is any execution where >1 job is attempting to write/overwrite data in the same directory tree (i.e. adding new partitions to an existing dataset, in situ). That's a pretty dangerous thing to be doing, and given the current FileOutputCommitter does a `rm $dest/_temporary` at the end of a commit, currently doomed. Whichever job commits first blocks the other from committing (I don't know if that's intentional, or just a side effect of the cleanup logic). Similarly the new S3A committers cancel all pending multipart uploads to a dir: killing outstanding jobs.
    
    If Spark SQL plans to support simultaneous writes to the same dest dir, well, more than just the job ID needs to change. So don't worry about it until then.
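
The shared-destination hazard described above can be sketched with a small in-memory model. This is a simplification under stated assumptions, not Hadoop's FileOutputCommitter: the `SharedDestModel` object, its `Fs` type, and the `dest/_temporary/<jobAttempt>/` layout are all hypothetical. The only behaviour borrowed from the comment is that job commit removes everything under `$dest/_temporary`.

```scala
// Simplified in-memory model; path layout and names are illustrative only.
object SharedDestModel {
  final case class Fs(paths: Set[String]) {
    def write(p: String): Fs = Fs(paths + p)
    def deleteTree(prefix: String): Fs = Fs(paths.filterNot(_.startsWith(prefix)))
    def exists(prefix: String): Boolean = paths.exists(_.startsWith(prefix))
  }

  def pendingDir(dest: String, jobAttempt: String): String =
    s"$dest/_temporary/$jobAttempt"

  // A task writes its pending output under its job attempt directory.
  def writeTask(fs: Fs, dest: String, jobAttempt: String, file: String): Fs =
    fs.write(s"${pendingDir(dest, jobAttempt)}/$file")

  // Job commit: promote this job's pending files into dest, then remove the
  // whole $dest/_temporary tree, including any other job's pending work.
  def commitJob(fs: Fs, dest: String, jobAttempt: String): Fs = {
    val prefix = pendingDir(dest, jobAttempt) + "/"
    val promoted = fs.paths
      .filter(_.startsWith(prefix))
      .map(p => s"$dest/${p.stripPrefix(prefix)}")
    Fs(fs.paths ++ promoted).deleteTree(s"$dest/_temporary/")
  }
}
```

In this model, once the first job commits, the second job's pending directory is gone, which matches the "whichever job commits first blocks the other" observation.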


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    (There was a conflict in 2.2, open a new PR if you want it there.)


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    **[Test build #84380 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84380/testReport)** for PR 19848 at commit [`8e68683`](https://github.com/apache/spark/commit/8e6868395e9291ad8ddf2a0251f7e6163618b132).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r154156234
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
    @@ -106,6 +106,12 @@ abstract class FileCommitProtocol {
        */
       def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
     
    +  /**
    +   * Commits a task which blongs to a specific stage after the writes succeed.
    --- End diff --
    
    belongs


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    > I actually feel like this is something hadoop should be documenting ... we are talking about how committers we happen to know work, rather than talking about the general contract of committers. But even if its not in the hadoop docs, in our jira or mailing list would be better.
    
    I concur. The best there is right now is in [the s3a docs|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md], which is based on stepping through with a debugger while taking notes...not the best thing people should do. I kept it in the hadoop-aws site because there wasn't an equivalent bit of source tree for MR, funnily enough, and doing it in the -aws module made it easier to get in.
    
    Really that could be pulled up somewhere and perhaps accompanied by a list of requirements of committers, and for v1 and v2, their FS requirements. 
    
    At the same time, Spark needs its counterpoint of what it expects, which is equally tricky to work out. The big risk is the mismatch in spark's expectations and that delivered by committers and their stores. In particular, I think Spark assumes that a timeout on a failure to respond to a commit-task-granted message can be handled by granting another speculative task the right. However, that requires taskCommit to be atomic, which only holds for v1.
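
To make the atomicity concern above concrete, here is a minimal sketch of a commit arbiter that grants one attempt per partition the right to commit and can hand that right to another attempt after a timeout. It is a hypothetical model, not Spark's OutputCommitCoordinator: `CommitArbiter`, `canCommit`, and `revoke` are names invented for this example.

```scala
// Hypothetical coordinator model; not Spark's OutputCommitCoordinator.
object CommitArbiter {
  // partition -> attempt number currently allowed to commit
  type State = Map[Int, Int]

  // The first attempt to ask for a partition wins; other attempts are refused.
  def canCommit(state: State, partition: Int, attempt: Int): (State, Boolean) =
    state.get(partition) match {
      case None         => (state + (partition -> attempt), true)
      case Some(holder) => (state, holder == attempt)
    }

  // On timeout or failure of the holder, the grant is revoked so that another
  // attempt can be authorized. This is only safe if the revoked attempt's
  // task commit either fully happened or left nothing visible.
  def revoke(state: State, partition: Int, attempt: Int): State =
    if (state.get(partition).contains(attempt)) state - partition else state
}
```

The model itself cannot tell whether the revoked attempt had already started moving files. That gap is exactly where a non-atomic task commit could leave output from two attempts mixed together.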


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r154156477
  
    --- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
    @@ -524,6 +525,13 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
         pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
       }
     
    +  test("The JobId on driver and executor should be the same during the commit") {
    +    // Create more than one rdd to mimic stageId not equal to rddId
    +    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2).
    --- End diff --
    
    `.` goes on next line


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    **[Test build #84315 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84315/testReport)** for PR 19848 at commit [`9df75e4`](https://github.com/apache/spark/commit/9df75e4de51d514af20369606d9324a6963e7955).


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r154403383
  
    --- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
    @@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
           if (shouldCoordinateWithDriver) {
             val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
             val taskAttemptNumber = TaskContext.get().attemptNumber()
    -        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
    +        val stageId = TaskContext.get().stageId()
    +        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
    --- End diff --
    
    Ah, great, another public class that has no good reason to be public...


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r154236366
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala ---
    @@ -102,14 +103,15 @@ object SparkHadoopWriter extends Logging {
           context: TaskContext,
           config: HadoopWriteConfigUtil[K, V],
           jobTrackerId: String,
    +      commitJobId: Int,
           sparkStageId: Int,
           sparkPartitionId: Int,
           sparkAttemptNumber: Int,
           committer: FileCommitProtocol,
           iterator: Iterator[(K, V)]): TaskCommitMessage = {
         // Set up a task.
         val taskContext = config.createTaskAttemptContext(
    -      jobTrackerId, sparkStageId, sparkPartitionId, sparkAttemptNumber)
    +      jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
    --- End diff --
    
    `sparkStageId` is now unused in this method.


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r153957791
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
    @@ -106,6 +106,13 @@ abstract class FileCommitProtocol {
        */
       def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
     
    +  /**
    +   * Commits a task which blongs to a specific stage after the writes succeed.
    +   * Must be called on the executors when running tasks.
    +   */
    +  private[spark]
    +  def commitTask(taskContext: TaskAttemptContext, stageId: Int): TaskCommitMessage
    --- End diff --
    
    Modifiers should be in the same line as method declarations, all over your patch. Just look at existing code and follow the style.



---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84321/
    Test PASSed.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    ok to test


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r154238528
  
    --- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
    @@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
           if (shouldCoordinateWithDriver) {
             val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
             val taskAttemptNumber = TaskContext.get().attemptNumber()
    -        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
    +        val stageId = TaskContext.get().stageId()
    +        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
    --- End diff --
    
    It makes sense to use stageId there, since before jobId was used instead of stageId. I will test that.


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r157111526
  
    --- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
    @@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
           if (shouldCoordinateWithDriver) {
             val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
             val taskAttemptNumber = TaskContext.get().attemptNumber()
    -        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
    --- End diff --
    
    @cloud-fan yeah, but as I mentioned above, removing jobId from the signature of commitTask will cause a binary incompatibility error, since commitTask here is a public method. So although we no longer use it, we cannot remove it.
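
The compatibility constraint above can be sketched as follows: the public method keeps its old parameter list while the body takes the stage ID from the running task instead. The types here are stand-ins and not Spark's classes; `CompatSketch`, `Coordinator`, and `CommitUtil` are hypothetical, and the two function parameters only model the `TaskContext.get()` lookups shown in the diff.

```scala
// Stand-in types for illustration; these are not Spark's classes.
object CompatSketch {
  trait Coordinator {
    def canCommit(stageId: Int, splitId: Int, attempt: Int): Boolean
  }

  final class CommitUtil(
      coordinator: Coordinator,
      currentStageId: () => Int,   // models TaskContext.get().stageId()
      currentAttempt: () => Int) { // models TaskContext.get().attemptNumber()

    // `jobId` stays in the public parameter list so that callers compiled
    // against the old signature still link, even though the body ignores it.
    def commitTask(jobId: Int, splitId: Int): Boolean =
      coordinator.canCommit(currentStageId(), splitId, currentAttempt())
  }
}
```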


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84315/
    Test PASSed.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    @mridulm what I meant by same rdd was to run the same job two times on the same cluster but in different spark contexts. So it is not the same rdd, but since each sparkContext starts rdd ids from zero, we may have the same rdd ids in different executions. The jobTrackerId will be different, but I actually didn't check whether hadoop will produce a different file path based on the jobTrackerId. If that is the case then there will not be a problem. But if not then the commit will fail I guess. I think this can only happen when spark.hadoop.validateOutputSpec is true.
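
A small sketch of the scenario above, assuming a stand-in context class rather than a real SparkContext. `ContextIds`, `FakeContext`, and `commitKey` are hypothetical names, and the jobTrackerId strings used with them are made-up values. Whether Hadoop's output paths actually depend on the jobTrackerId is the open question in the comment and is not answered here.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a context that numbers its RDDs from zero.
object ContextIds {
  final class FakeContext(val jobTrackerId: String) {
    private val nextRddId = new AtomicInteger(0)
    def newRddId(): Int = nextRddId.getAndIncrement()
  }

  // The pair (jobTrackerId, rddId) distinguishes two runs; rddId alone does not.
  def commitKey(ctx: FakeContext, rddId: Int): (String, Int) =
    (ctx.jobTrackerId, rddId)
}
```

Two contexts created at different times both hand out rddId 0 first, so any uniqueness has to come from the jobTrackerId part of the key.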


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    **[Test build #84315 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84315/testReport)** for PR 19848 at commit [`9df75e4`](https://github.com/apache/spark/commit/9df75e4de51d514af20369606d9324a6963e7955).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r157064132
  
    --- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
    @@ -908,6 +918,40 @@ class NewFakeFormatWithCallback() extends NewFakeFormat {
       }
     }
     
    +class YetAnotherFakeCommitter extends NewOutputCommitter with Assertions {
    +  def setupJob(j: NewJobContext): Unit = {
    +    JobID.jobid = j.getJobID().getId
    +  }
    +
    +  def needsTaskCommit(t: NewTaskAttempContext): Boolean = false
    +
    +  def setupTask(t: NewTaskAttempContext): Unit = {
    +    val jobId = t.getTaskAttemptID().getJobID().getId
    +    assert(jobId === JobID.jobid)
    +  }
    +
    +  def commitTask(t: NewTaskAttempContext): Unit = {}
    +
    +  def abortTask(t: NewTaskAttempContext): Unit = {}
    +}
    +
    +class YetAnotherFakeFormat() extends NewOutputFormat[Integer, Integer]() {
    +
    +  def checkOutputSpecs(j: NewJobContext): Unit = {}
    +
    +  def getRecordWriter(t: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
    +    new NewFakeWriter()
    +  }
    +
    +  def getOutputCommitter(t: NewTaskAttempContext): NewOutputCommitter = {
    +    new YetAnotherFakeCommitter()
    +  }
    +}
    +
    +object JobID {
    +  var jobid = -1
    --- End diff --
    
    committers may not like a -ve job ID; they do tend to assume, and for v1 commit rely on, things starting at 0
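
One way to avoid a negative placeholder in the test helper is to record the job ID as an `Option`. The following is only a sketch of that alternative, not what the patch does: `RecordedJobId`, `record`, and `matches` are hypothetical names.

```scala
// Hypothetical test helper; an alternative to a mutable Int seeded with -1.
object RecordedJobId {
  @volatile private var recorded: Option[Int] = None

  // Called from setupJob: remember the ID that the committer was given.
  def record(id: Int): Unit = {
    require(id >= 0, s"job ID must be non-negative, got $id")
    recorded = Some(id)
  }

  // Called from setupTask: true only if a job ID was recorded and it matches.
  def matches(id: Int): Boolean = recorded.contains(id)
}
```

With this shape, a task that runs before any job was set up fails the check, instead of being compared against a sentinel value.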


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    **[Test build #84313 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84313/testReport)** for PR 19848 at commit [`4dbdbe7`](https://github.com/apache/spark/commit/4dbdbe77435630e3b35581c59189ec75c9c2484d).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    The branch 2.2 PR for this fix is here: https://github.com/apache/spark/pull/19886


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    @rezasafi What do you mean by "different copies of an rdd at different times"? If they are two different jobs to save, even if of the same rdd, they are two different jobs (save rdd with different input formats, for example).
    If it is the same job, the attempt would be different.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    **[Test build #84313 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84313/testReport)** for PR 19848 at commit [`4dbdbe7`](https://github.com/apache/spark/commit/4dbdbe77435630e3b35581c59189ec75c9c2484d).


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84355/
    Test PASSed.


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r154237986
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala ---
    @@ -102,14 +103,15 @@ object SparkHadoopWriter extends Logging {
           context: TaskContext,
           config: HadoopWriteConfigUtil[K, V],
           jobTrackerId: String,
    +      commitJobId: Int,
           sparkStageId: Int,
           sparkPartitionId: Int,
           sparkAttemptNumber: Int,
           committer: FileCommitProtocol,
           iterator: Iterator[(K, V)]): TaskCommitMessage = {
         // Set up a task.
         val taskContext = config.createTaskAttemptContext(
    -      jobTrackerId, sparkStageId, sparkPartitionId, sparkAttemptNumber)
    +      jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
    --- End diff --
    
    I removed it a few min ago. 


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    **[Test build #84355 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84355/testReport)** for PR 19848 at commit [`f4ef351`](https://github.com/apache/spark/commit/f4ef351a05394ada5449e51a64606f0e5e7647c3).


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    > Check if the same jobId already is committed and then remove existing files and commit again.
    
    if your job doesn't allow overwrite, that's mostly implicit; it's only in concurrent/sequences of RDD writes where the SaveMode policy != ErrorIfExists. Which is the default. AFAIK
    
    If someone explicitly sets off two independent spark contexts writing RDDs to the same dest with some policy other than that, well: it's pretty ambiguous what's going to happen anyway, so saying "don't do that, then" is probably defensible.
    
    What if you have >1 RDD write of a failed and restarted spark Context? That's my sole concern.
    
    But here, at least in YARN: fail of the AM will trigger release of all containers of workers round the cluster; except in the failure mode "Node manager isolated along with worker and HA YARN turned on so NMs don't panic if RM is unreachable for a while". 
    
    This is all pretty convoluted. I think the only thing I care about is "can things be confident that if there are partitioned workers from a previous job, they won't interfere with the work being done by a successor, or, if they do interfere, it's manifest as a failure, rather than some silent corruption of output"



---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    **[Test build #84355 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84355/testReport)** for PR 19848 at commit [`f4ef351`](https://github.com/apache/spark/commit/f4ef351a05394ada5449e51a64606f0e5e7647c3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    @squito Is there a requirement that it should be globally unique ? I am not sure whether (some?) committers make this assumption : and the few I did take a look at did not seem to care.
    If there is such a requirement (a store which keeps track of commit transactions by id ?), we can fall back to using uuid and propagating that to executors via conf (as opposed to using rdd id).
    The current situation of driver and executors having split brain was bad - anything consistent (unique or this approach) would be preferable.
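
The UUID fallback mentioned above could look roughly like the sketch below, with the configuration modelled as a plain `Map`. This is an assumption-laden illustration: `UuidJobId`, `onDriver`, `onExecutor`, and the key `example.commit.job.uuid` are hypothetical and do not correspond to any real Spark or Hadoop setting.

```scala
import java.util.UUID

object UuidJobId {
  // Hypothetical configuration key; not a real Spark or Hadoop setting.
  val Key = "example.commit.job.uuid"

  // Driver side: mint one ID per write job and stash it in the job conf.
  def onDriver(conf: Map[String, String]): Map[String, String] =
    conf + (Key -> UUID.randomUUID().toString)

  // Executor side: read back the driver's ID instead of deriving one locally.
  def onExecutor(conf: Map[String, String]): String =
    conf.getOrElse(Key, sys.error(s"$Key not set by the driver"))
}
```

Because executors only read the value, the driver and executors cannot disagree, and two separate write jobs get distinct IDs even when they save the same rdd.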


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    @steveloughran can you bring this up on dev@?  we should move this discussion off of this PR.
    
    (sorry haven't had a chance to look yet, but I appreciate you doing this)


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Side note: this would be a great conversation to have recorded in our dev mailing list or in JIRA, instead of lost in PR comments on github...


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r157063770
  
    --- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
    @@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
           if (shouldCoordinateWithDriver) {
             val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
             val taskAttemptNumber = TaskContext.get().attemptNumber()
    -        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
    +        val stageId = TaskContext.get().stageId()
    +        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
    --- End diff --
    
    Ever thought of returning why the commit was refused, e.g. unknown stage ID vs. another task attempt committed vs. you are considered failed?
    Not that the task committer should behave differently, but it might be nice to pass that info back for logging alone.
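
One way the suggestion above could look, as a sketch only: the `canCommit` call in the diff returns a Boolean, and the `CommitDecision` type and its cases below are hypothetical names invented for illustration.

```scala
// Hypothetical result type; the real canCommit call in the diff above returns a Boolean.
sealed trait CommitDecision { def allowed: Boolean }
case object Granted extends CommitDecision { val allowed = true }
sealed abstract class Denied(val reason: String) extends CommitDecision { val allowed = false }
case object UnknownStage extends Denied("unknown stage ID")
case object OtherAttemptCommitted extends Denied("another task attempt already committed")
case object AttemptConsideredFailed extends Denied("this attempt is considered failed")

object CommitLogging {
  /** The committer still branches only on `allowed`; the reason is used for the log line. */
  def describe(decision: CommitDecision): String = decision match {
    case Granted        => "commit granted"
    case denied: Denied => s"commit denied: ${denied.reason}"
  }
}
```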


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    **[Test build #84321 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84321/testReport)** for PR 19848 at commit [`a6109e4`](https://github.com/apache/spark/commit/a6109e416916178a2612eeb70cf2482244c146ed).


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    @rezasafi That is equivalent to two different executions of the same/similar app (concurrently or sequentially) right ?
    If yes, that is something @steveloughran already covered above and does not require special handling imo.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Merging to master / 2.2.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    **[Test build #84380 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84380/testReport)** for PR 19848 at commit [`8e68683`](https://github.com/apache/spark/commit/8e6868395e9291ad8ddf2a0251f7e6163618b132).


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    > I was hoping you would know the hadoop committer semantics better than me
    
    I might, but that's only because I spent time with a debugger and asking people the history of things, which is essentially an oral folklore of "how things failed". Suffice to say: the google paper left some important details out. 
    
    Best public documentation, [Task Committers](https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md). I'm doing a proper paper on it, but it's not ready yet.
    
    * Every Job attempt must have the same JobID. That's for both cleanup and for `FileOutputCommitter` recovery. Less relevant for Spark as it doesn't do driver restart.
    * Job ID must be the same on driver and executor, as that's how the driver knows where to look for output.
    * All attempts to work on the same data part must have the same JobID and TaskID, and different TaskAttemptIDs. That's critical for task commit concurrency resolution, specifically the failure "task attempt 1 given permission to commit, does the commit, doesn't return, TA 2 kicked off". Having the same task ID and some guarantee in the commit protocol to overwrite any existing attempt guarantees that only one attempt's output is committed by the job, even if two task attempts actually commit their work.
    * V1 job recovery expects task & job attempt IDs to be sequential on a task; they are used to find out what the working dir of previous attempts would be, and recovery assumes that if attemptID==0, there's no previous attempt to recover.
    * No expectation of taskID uniqueness across jobs.
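
The ID relationships in the list above can be sketched with plain case classes. These are hypothetical stand-ins, not Hadoop's `JobID`, `TaskID` or `TaskAttemptID` classes, and the helper names are assumptions made for illustration.

```scala
// Hypothetical stand-ins for Hadoop's JobID / TaskID / TaskAttemptID; not the real classes.
final case class JobId(tracker: String, id: Int)
final case class TaskId(job: JobId, partition: Int)
final case class TaskAttemptId(task: TaskId, attempt: Int)

object CommitIds {
  /** All attempts on one partition share JobId and TaskId; only the attempt number differs. */
  def attemptsFor(job: JobId, partition: Int, attempts: Int): Seq[TaskAttemptId] = {
    val task = TaskId(job, partition)
    (0 until attempts).map(n => TaskAttemptId(task, n))
  }

  /** The driver can only find executor output if both sides derived the same JobId. */
  def driverCanFindOutput(driverJob: JobId, executorAttempt: TaskAttemptId): Boolean =
    executorAttempt.task.job == driverJob
}
```

If the driver builds its `JobId` from one number and the executors from another, `driverCanFindOutput` is false for every attempt, which is the split this PR addresses.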
    
    I don't know about broader uniqueness, such as across a full filesystem. It could matter if people were playing with temporary paths, but the convention of putting everything under `$dest/_temporary/$jobAttemptId` means that you only need uniqueness amongst all jobs writing to the same destination path.
    
    * S3A committers want unique paths to put things, staging committer: in HDFS, local FS; Magic, 
    * Stocator expects the job ID to be unique through the job, again, doesn't care about global uniqueness
    * I don't know about committers to other destinations than HDFS or S3
    
    One use case to consider, and @rdblue will have opinions there, is >1 job doing an append write to the same destination path. Every jobID must be unique enough to guarantee that two independent jobs (even in different spark clusters) can keep their intermediate datasets from conflicting, even when created under the same parent dir.
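
A small sketch of why two appending jobs need distinct IDs, assuming only the `$dest/_temporary/$jobAttemptId` convention quoted above. The helper names are hypothetical, and the UUID variant is one possible choice of ID rather than what any committer does.

```scala
import java.util.UUID

object TempPaths {
  // Follows the `$dest/_temporary/$jobAttemptId` convention quoted above.
  def jobAttemptDir(dest: String, jobAttemptId: String): String =
    s"$dest/_temporary/$jobAttemptId"

  // Hypothetical alternative: a per-job UUID instead of a small recycled integer.
  def uniqueJobAttemptDir(dest: String): String =
    jobAttemptDir(dest, UUID.randomUUID().toString)
}
```

Two independent jobs that both use ID `0` against the same destination resolve to the same directory, while UUID-based IDs keep their intermediate data apart.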
    
    w.r.t. Spark: StageID => Job ID everywhere is needed for both sides of the committer to be consistent.
    * Having a unique stage ID will potentially line you up for interesting things later. Put differently: if it's easy enough to be unique, why wouldn't you?
    * Side issue: the hadoop code to parse task, job and attempt IDs is pretty brittle. Tread carefully, never call toString() on them in a log statement.


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r157111981
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala ---
    @@ -60,17 +60,17 @@ object SparkHadoopWriter extends Logging {
           config: HadoopWriteConfigUtil[K, V]): Unit = {
         // Extract context and configuration from RDD.
         val sparkContext = rdd.context
    -    val stageId = rdd.id
    +    val commitJobId = rdd.id
     
         // Set up a job.
         val jobTrackerId = createJobTrackerID(new Date())
    --- End diff --
    
    `jobTrackerId` is also not unique, is that OK?
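
To illustrate the concern, here is a sketch assuming the tracker ID is a timestamp formatted with one-second resolution. The pattern `yyyyMMddHHmmss` is an assumption, since the body of `createJobTrackerID` is not shown in this thread, and `trackerId` is a hypothetical helper.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

object TrackerIdSketch {
  // Assumed format: a timestamp with one-second resolution. The actual pattern used by
  // createJobTrackerID is not shown in this thread.
  def trackerId(time: Date): String =
    new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
}
```

Under that assumption, two jobs set up within the same second would get the same tracker ID, so uniqueness would have to come from the job ID part.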


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    i dunno what the requirements are -- I was hoping you would know the hadoop committer semantics better than me!  I suppose a uuid is really the only way to get something globally unique, as you could even have multiple independent spark contexts.  I have seen a committer create a temp directory based on the ID, so you could end up with a collision with them both writing to the same dir.
    
    anyway, I'm willing to set this aside as a rare case; the fix here is still a huge improvement.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    **[Test build #84362 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84362/testReport)** for PR 19848 at commit [`92f9180`](https://github.com/apache/spark/commit/92f9180b0fea71ff3ae4aa3049e04d6f1e3167be).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84313/


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    **[Test build #84321 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84321/testReport)** for PR 19848 at commit [`a6109e4`](https://github.com/apache/spark/commit/a6109e416916178a2612eeb70cf2482244c146ed).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    @vanzin , @mridulm , @jiangxb1987 let me know if you have any comment here. Thank you in advance. I appreciate it. 


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r154256205
  
    --- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
    @@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
           if (shouldCoordinateWithDriver) {
             val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
             val taskAttemptNumber = TaskContext.get().attemptNumber()
    -        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
    +        val stageId = TaskContext.get().stageId()
    +        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
    --- End diff --
    
    Removing jobId from the signature of commitTask would cause a binary incompatibility error, since commitTask here is a public method. So we will end up with a parameter that stays unused.
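
A sketch of that trade-off, with simplified hypothetical names: `commitKey` and `currentStageId` are illustrations only and do not reproduce the signature of the real `commitTask`.

```scala
object CommitTaskCompat {
  /** Hypothetical stand-in for looking up the running task's stage, e.g. from a task context. */
  def currentStageId(): Int = 3

  /**
   * `jobId` remains in the parameter list so existing callers keep linking,
   * but the coordinator lookup is keyed by the stage ID instead.
   */
  def commitKey(jobId: Int, splitId: Int, attemptNumber: Int): (Int, Int, Int) =
    (currentStageId(), splitId, attemptNumber)
}
```

Whatever value a caller passes as `jobId`, the key sent to the coordinator is built from the stage ID.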


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r154235818
  
    --- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
    @@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
           if (shouldCoordinateWithDriver) {
             val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
             val taskAttemptNumber = TaskContext.get().attemptNumber()
    -        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
    +        val stageId = TaskContext.get().stageId()
    +        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
    --- End diff --
    
    Shouldn't `CommitDeniedException` (below) be updated to use the stage ID also? Otherwise the exception might have incomplete information.
    
    With that change it's possible that `jobId` might become unused in this method.


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r154157813
  
    --- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
    @@ -70,7 +79,14 @@ object SparkHadoopMapRedUtil extends Logging {
           if (shouldCoordinateWithDriver) {
             val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
             val taskAttemptNumber = TaskContext.get().attemptNumber()
    -        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
    +        var canCommit: Boolean = true
    +        // This checks whether the commitTask provided by stageId, which if not the canCommit
    +        // will use jobId as stageId to decide whether the commit should be possible
    +        if (stageId != -1) {
    --- End diff --
    
    In which case would this happen? Would it be hard to change the API so that the stage id is always provided to `commitTask`?
    
    Mridul suggested in the previous PR to use the MR job configuration to propagate this (which you can access in the `mrTaskContext` parameter above). Any reason why you didn't go that route?


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    **[Test build #84362 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84362/testReport)** for PR 19848 at commit [`92f9180`](https://github.com/apache/spark/commit/92f9180b0fea71ff3ae4aa3049e04d6f1e3167be).


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    @steveloughran Any thoughts on @squito's comment ? It might be a valid corner case some committer might be leveraging ? (in context of a single user session for example)


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r154158671
  
    --- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
    @@ -524,6 +525,13 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
         pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
       }
     
    +  test("The JobId on driver and executor should be the same during the commit") {
    +    // Create more than one rdd to mimic stageId not equal to rddId
    +    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2).
    +      map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) }.filter { p => p._1 > 0 }
    +    pairs.saveAsNewAPIHadoopFile[YetAnotherFakeFormat]("ignored")
    +  }
    --- End diff --
    
    Add `assert(JobID.jobid != -1)` to make sure the test code is actually running.
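
The reason for the extra assertion can be sketched in isolation: if the only checks live inside committer callbacks, a run in which the callbacks never fire passes vacuously. The `Recorder` class and the value `7` below are hypothetical; `Recorder` plays the role of the `JobID.jobid` holder in the test under review.

```scala
object SentinelSketch {
  // Hypothetical holder playing the role of `JobID.jobid` in the test under review.
  final class Recorder { var jobId: Int = -1 }

  /** Stand-in for the save call: the fake committer records an id only if it is invoked. */
  def fakeSave(recorder: Recorder, committerInvoked: Boolean): Unit =
    if (committerInvoked) recorder.jobId = 7

  /** The extra check requested above: the sentinel must have been overwritten. */
  def committerRan(recorder: Recorder): Boolean = recorder.jobId != -1
}
```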


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Thank you very much, @vanzin. I changed the code per your comment and pushed the changes. 


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r154137939
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
    @@ -106,6 +106,12 @@ abstract class FileCommitProtocol {
        */
       def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
     
    +  /**
    +   * Commits a task which blongs to a specific stage after the writes succeed.
    +   * Must be called on the executors when running tasks.
    +   */
    +  private[spark] def commitTask(taskContext: TaskAttemptContext, stageId: Int): TaskCommitMessage
    --- End diff --
    
    These classes are in an `.internal` package so there's no need for `private[spark]`. Or at least that's what I see in the sbt build scripts.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Thought some more on this.
    
    Here's a possible workflow for failures which can arise from job attempt recycling
    
    1. Stage 1, Job ID 0, attempt 1, kicks off task 0 attempt 1, task attempt ID = 0_1_0_1.
    2. Task attempt ID = 0_1_0_1 drops off the network, pauses for a long time for GC, etc.
    3. Job A gives up on attempt 0, kicks off second attempt, task attempt ID 0_1_0_2.
    4. Task attempt ID 0_1_0_2 finishes, asks for commit permission, commits.
    5. Stage 1 completes, stage 2 kicks off.
    6. Stage 2, Job ID 0, attempt 1, kicks off task 0 attempt 1, task attempt ID = 0_1_0_1.
    7. Original task attempt 0_1_0_1 finishes, asks driver for permission to commit.
    8. `OutputCommitCoordinator.handleAskPermissionToCommit()` gets that request.
    9. Which it (fortunately) rejects as, ignoring the task attempt ID, the stage number is different.
    10. But it will send back false to the task attempt.
    11. Which will clean up,
    12. including deleting _temporary/job_0_1/task_0_1_0_1,
    13. which the stage 2 task attempt with ID 0_1_0_1 was using to write stuff.
    14. As that no longer exists, `needsTaskCommit()` will return false,
    15. so that task attempt will silently lose its output.
    
    It's a convoluted failure mode as it depends on the same dest dir being used, and timings of things, but it does imply that by recycling job IDs, a partitioned task from a stage can contaminate the next one.
    
    That's across stages. Between jobs, you could have the same workflow. Indeed, if the entire spark cluster went away the partitioned task attempt would see a failure and react by deleting its attempt data.
    
    Conclusion: you really need job attempts which are unique across executions, even sequential ones, unless you can guarantee that all task attempts from the previous executions are terminated.
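
The workflow above can be reduced to a toy simulation. This is a sketch only: the filesystem is modelled as a mutable set of paths, the directory layout is simplified from the path in the workflow, and `liveAttemptSurvives` is a hypothetical helper rather than anything in Spark or Hadoop.

```scala
import scala.collection.mutable

object RecycledIdDemo {
  // Simplified working-directory layout modelled on the path in the workflow above.
  def attemptDir(jobId: String, taskAttemptId: String): String =
    s"_temporary/job_$jobId/task_$taskAttemptId"

  /**
   * Returns whether the live (stage 2) attempt still has output to commit after a
   * stale attempt from an earlier stage is denied and cleans up its own directory.
   */
  def liveAttemptSurvives(staleJobId: String, liveJobId: String): Boolean = {
    val fs = mutable.Set.empty[String]          // stand-in for the filesystem
    val taskAttempt = "0_1_0_1"                 // both attempts carry the same task attempt ID
    val liveDir = attemptDir(liveJobId, taskAttempt)
    fs += liveDir                               // stage 2 attempt starts writing
    fs -= attemptDir(staleJobId, taskAttempt)   // stale attempt is denied, deletes "its" dir
    fs.contains(liveDir)                        // models needsTaskCommit() for the live attempt
  }
}
```

With a recycled job ID the stale attempt's cleanup removes the live attempt's directory; with distinct job IDs the two paths differ and the live attempt keeps its output.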
    
    
    
    



---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19848


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    I found that Spark SQL always uses 0 as the job id...  How do hadoop committers work with the job id? Is it only for recovery?


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r157067651
  
    --- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
    @@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
           if (shouldCoordinateWithDriver) {
             val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
             val taskAttemptNumber = TaskContext.get().attemptNumber()
    -        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
    +        val stageId = TaskContext.get().stageId()
    +        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
    --- End diff --
    
    Thanks @steveloughran for the comment. The reason will be logged in the method handleAskPermissionToCommit in org.apache.spark.scheduler.OutputCommitCoordinator.scala


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    WiP: [a_zero_rename_committer.pdf](https://github.com/steveloughran/zero-rename-committer/files/1604894/a_zero_rename_committer.pdf)
    
    I would really like some early review of the spark side of that commit algorithm, in the overall coverage & UML, and the deeper bits. I'm currently unsure how the OutputCommitCoordinator gets told of failures of Executors/task attempts, and I worry it has an expectation that "you can reattempt to commit any task where the committed attempt fails during task commit". I don't think that's guaranteed with FileOutputCommitter v2, which is a fact which the committer itself doesn't declare to the callers. Could fix that.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    LGTM but I'll leave it here a bit for others to take a look.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84380/


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r154156530
  
    --- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
    @@ -524,6 +525,13 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
         pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
       }
     
    +  test("The JobId on driver and executor should be the same during the commit") {
    +    // Create more than one rdd to mimic stageId not equal to rddId
    +    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2).
    +      map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) }.filter { p => p._1 > 0 }
    --- End diff --
    
    nit: move `.filter` to next line for readability.


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r157068238
  
    --- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
    @@ -908,6 +918,40 @@ class NewFakeFormatWithCallback() extends NewFakeFormat {
       }
     }
     
    +class YetAnotherFakeCommitter extends NewOutputCommitter with Assertions {
    +  def setupJob(j: NewJobContext): Unit = {
    +    JobID.jobid = j.getJobID().getId
    +  }
    +
    +  def needsTaskCommit(t: NewTaskAttempContext): Boolean = false
    +
    +  def setupTask(t: NewTaskAttempContext): Unit = {
    +    val jobId = t.getTaskAttemptID().getJobID().getId
    +    assert(jobId === JobID.jobid)
    +  }
    +
    +  def commitTask(t: NewTaskAttempContext): Unit = {}
    +
    +  def abortTask(t: NewTaskAttempContext): Unit = {}
    +}
    +
    +class YetAnotherFakeFormat() extends NewOutputFormat[Integer, Integer]() {
    +
    +  def checkOutputSpecs(j: NewJobContext): Unit = {}
    +
    +  def getRecordWriter(t: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
    +    new NewFakeWriter()
    +  }
    +
    +  def getOutputCommitter(t: NewTaskAttempContext): NewOutputCommitter = {
    +    new YetAnotherFakeCommitter()
    +  }
    +}
    +
    +object JobID {
    +  var jobid = -1
    --- End diff --
    
    @steveloughran, this isn't used by committers; it only stores the jobId. For the test I initialized it to -1 to mark the variable as not yet set.
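
The check performed by the fake committer in the diff can be sketched without Hadoop as follows. `RecordedJobId` and `CheckingCommitter` are hypothetical stand-ins, not the classes from the PR, and the callbacks take plain `Int` ids instead of Hadoop contexts. The sketch assumes the driver and the tasks share one JVM, as they would in a local-mode test; a singleton like this would not be shared with executors on a real cluster.

```scala
// Hypothetical, Hadoop-free stand-ins for the committer callbacks in the diff.
object RecordedJobId {
  // -1 is a sentinel meaning "setupJob has not run yet".
  @volatile var jobId: Int = -1
}

final class CheckingCommitter {
  // Driver side: remember the job id the job was set up with.
  def setupJob(driverJobId: Int): Unit = {
    RecordedJobId.jobId = driverJobId
  }

  // Task side: fail if the task reports a different job id than the driver.
  def setupTask(taskJobId: Int): Unit = {
    require(RecordedJobId.jobId != -1, "setupJob must run before setupTask")
    require(
      taskJobId == RecordedJobId.jobId,
      s"task job id $taskJobId differs from driver job id ${RecordedJobId.jobId}")
  }
}
```

With this shape, a mismatch between the id seen at job setup and the id seen at task setup fails the task instead of passing silently.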


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    +1 to Marcelo's comment about archiving this conversation somewhere.
    
    I actually feel like this is something Hadoop should be documenting: we are talking about how the committers we happen to know work, rather than about the general contract of committers. But even if it's not in the Hadoop docs, our JIRA or mailing list would be a better place for it.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Thanks for fixing this @rezasafi !
    This looks cleaner than my suggestion to generate a unique jobId. LGTM @vanzin 


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Thank you very much @vanzin, @mridulm and @jiangxb1987. I really appreciate it. I will create a PR for branch 2.2 ASAP.


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r157110808
  
    --- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
    @@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
           if (shouldCoordinateWithDriver) {
             val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
             val taskAttemptNumber = TaskContext.get().attemptNumber()
    -        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
    --- End diff --
    
    The `jobId` parameter is no longer used.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    @steveloughran Thank you very much for your detailed comment. I really appreciate it. I think that when you reach step 6 in the list above, Stage2 will have a different JobId, and with the current fix it cannot be zero. That is because the JobId is rdd.id, and the SparkContext assigns a new rddId to each new RDD (nextRddId.getAndIncrement()).
    Across different executions (with different SparkContexts), this fix can still produce the same jobId. From your detailed analysis, I understand we have two options to resolve that:
    1) Check whether the same jobId has already been committed, and if so remove the existing files and commit again.
    2) Use a UUID to create a new jobId each time, unique even across different executions.
    Option 2 can be problematic since we may not want to have different copies of an RDD at different times. We probably just want the latest one. So maybe the first option is better.
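
The id behaviour described in this comment can be sketched in a few lines. `FakeContext` is a hypothetical stand-in that models only the RDD id counter of a SparkContext, and `uniqueJobId` illustrates option 2; neither name comes from the PR.

```scala
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a SparkContext: only the RDD id counter is modeled.
final class FakeContext {
  private val nextRddId = new AtomicInteger(0)
  def newRddId(): Int = nextRddId.getAndIncrement()
}

object JobIdSketch {
  // Option 2 from the comment: a job id that does not repeat across executions.
  def uniqueJobId(): String = UUID.randomUUID().toString

  def main(args: Array[String]): Unit = {
    val first = new FakeContext
    val a = first.newRddId()
    val b = first.newRddId()
    // Within one context, every new RDD gets a fresh id.
    assert(a != b)

    // A second execution starts counting again, so an id can repeat.
    val second = new FakeContext
    assert(second.newRddId() == a)

    // UUID-based ids differ between calls.
    assert(uniqueJobId() != uniqueJobId())
  }
}
```

The counter-based id is stable and small but only unique within one context, which is why a later run can collide with output committed by an earlier one.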


---



[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19848#discussion_r154168397
  
    --- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
    @@ -70,7 +79,14 @@ object SparkHadoopMapRedUtil extends Logging {
           if (shouldCoordinateWithDriver) {
             val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
             val taskAttemptNumber = TaskContext.get().attemptNumber()
    -        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
    +        var canCommit: Boolean = true
    +        // This checks whether the commitTask provided by stageId, which if not the canCommit
    +        // will use jobId as stageId to decide whether the commit should be possible
    +        if (stageId != -1) {
    --- End diff --
    
    Thank you very much, Marcelo. I had misunderstood Mridul's comment. We can get the stageId from TaskContext.get, as he suggested there. That makes everything much easier. I will update this PR as soon as I finish testing. Thank you again.


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84362/


---



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19848
  
    Merged build finished. Test PASSed.


---
