Posted to reviews@spark.apache.org by ivoson <gi...@git.apache.org> on 2018/01/12 04:51:44 UTC

[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

GitHub user ivoson opened a pull request:

    https://github.com/apache/spark/pull/20244

    [SPARK-23053][CORE] taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

    …d is the same when calculate taskSerialization and task partitions
    
    Change-Id: Ib9839ca552653343d264135c116742effa6feb60
    
    ## What changes were proposed in this pull request?
    
    When concurrent jobs use the same RDD that is marked for checkpointing, a race can occur. Suppose one job has finished running and has started RDD.doCheckpoint while another job is submitted, so that submitStage and submitMissingTasks are called. [submitMissingTasks](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961) serializes taskBinaryBytes and calculates the task partitions, and both depend on the checkpoint status of the RDD. If the former is calculated before doCheckpoint finishes and the latter after it finishes, then rdd.compute, which is called when the task runs, receives a partition whose type does not match the deserialized RDD. RDDs that cast their partitions to a particular type, such as [MapWithStateRDD](https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala), then throw a ClassCastException because the partition passed in is actually a CheckpointRDDPartition.
    
    ## How was this patch tested?
    
    The existing unit tests, plus a new test case in DAGSchedulerSuite that reproduces the exception.
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.
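
The mismatch described above can be modeled without Spark. The following is a minimal sketch, not the code from this pull request: `Part`, `WrappedPart`, `CheckpointPart`, `ToyRdd`, and `RaceModel` are hypothetical stand-ins, and Java serialization stands in for the closure serializer. It only shows how a snapshot serialized before the checkpoint state changes can be handed a partition that was read afterwards.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-ins for Spark's partition types; these are not the real classes.
trait Part extends Serializable { def index: Int }
final case class WrappedPart(index: Int) extends Part
final case class CheckpointPart(index: Int) extends Part

// Toy RDD: both the partition type and the compute path depend on checkpoint state.
class ToyRdd extends Serializable {
  @volatile var checkpointed: Boolean = false

  def partitions: Array[Part] =
    if (checkpointed) Array[Part](CheckpointPart(0)) else Array[Part](WrappedPart(0))

  def compute(p: Part): Int =
    if (checkpointed) p.index                 // checkpointed path accepts any partition
    else p.asInstanceOf[WrappedPart].index    // original path casts to its own type
}

object RaceModel {
  // Serialize and deserialize, which freezes the checkpoint state inside the copy.
  def roundTrip(rdd: ToyRdd): ToyRdd = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(rdd) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[ToyRdd] finally in.close()
  }

  // True if computing with a post-checkpoint partition on a pre-checkpoint snapshot fails.
  def castFails(): Boolean = {
    val rdd = new ToyRdd
    val taskRdd = roundTrip(rdd)   // "task binary" taken before the checkpoint completes
    rdd.checkpointed = true        // the checkpoint completes in between
    val part = rdd.partitions(0)   // partition read after the checkpoint
    try { taskRdd.compute(part); false }
    catch { case _: ClassCastException => true }
  }
}
```

In this model, reading the partition before flipping `checkpointed` makes `compute` succeed, which is the consistency the pull request asks submitMissingTasks to guarantee.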


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ivoson/spark branch-taskpart-mistype

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20244.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20244
    
----
commit 0dea573e9e724d591803b73f678e14f94e0af447
Author: huangtengfei <hu...@...>
Date:   2018-01-12T02:53:29Z

    submitMissingTasks should make sure the checkpoint status of stage.rdd is the same when calculate taskSerialization and task partitions
    
    Change-Id: Ib9839ca552653343d264135c116742effa6feb60

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r165763274
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    +    sc.setCheckpointDir(checkpointDir.toString)
    +
    +    // Semaphores to control the process sequence for the two threads below.
    +    val semaphore1 = new Semaphore(0)
    +    val semaphore2 = new Semaphore(0)
    +
    +    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
    +    rdd.checkpoint()
    +
    +    val checkpointRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate what RDD.doCheckpoint() do here.
    +        rdd.doCheckpointCalled = true
    +        val checkpointData = rdd.checkpointData.get
    +        RDDCheckpointData.synchronized {
    +          if (checkpointData.cpState == CheckpointState.Initialized) {
    +            checkpointData.cpState = CheckpointState.CheckpointingInProgress
    +          }
    +        }
    +
    +        val newRDD = checkpointData.doCheckpoint()
    +
    +        // Release semaphore1 after job triggered in checkpoint finished, so that taskBinary
    +        // serialization can start.
    +        semaphore1.release()
    +        // Wait until taskBinary serialization finished in submitMissingTasksThread.
    +        semaphore2.acquire()
    +
    +        // Update our state and truncate the RDD lineage.
    +        RDDCheckpointData.synchronized {
    +          checkpointData.cpRDD = Some(newRDD)
    +          checkpointData.cpState = CheckpointState.Checkpointed
    +          rdd.markCheckpointed()
    +        }
    +        semaphore1.release()
    --- End diff --
    
    and then this would be another semaphore `checkpointStateUpdated`


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r161141499
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -96,6 +98,22 @@ class MyRDD(
       override def toString: String = "DAGSchedulerSuiteRDD " + id
     }
     
    +/** Wrapped rdd partition. */
    +class WrappedPartition(val partition: Partition) extends Partition {
    +  def index: Int = partition.index
    +}
    +
    +/** Wrapped rdd with WrappedPartition. */
    +class WrappedRDD(parent: RDD[Int]) extends RDD[Int](parent) {
    +  protected def getPartitions: Array[Partition] = {
    +    parent.partitions.map(p => new WrappedPartition(p))
    +  }
    +
    +  def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    +    parent.compute(split.asInstanceOf[WrappedPartition].partition, context)
    --- End diff --
    
    I think this line is the key point of `WrappedPartition` and `WrappedRDD`; please add a comment explaining your intention.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r165763018
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    +    sc.setCheckpointDir(checkpointDir.toString)
    +
    +    // Semaphores to control the process sequence for the two threads below.
    +    val semaphore1 = new Semaphore(0)
    +    val semaphore2 = new Semaphore(0)
    +
    +    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
    +    rdd.checkpoint()
    +
    +    val checkpointRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate what RDD.doCheckpoint() do here.
    +        rdd.doCheckpointCalled = true
    +        val checkpointData = rdd.checkpointData.get
    +        RDDCheckpointData.synchronized {
    +          if (checkpointData.cpState == CheckpointState.Initialized) {
    +            checkpointData.cpState = CheckpointState.CheckpointingInProgress
    +          }
    +        }
    +
    +        val newRDD = checkpointData.doCheckpoint()
    +
    +        // Release semaphore1 after job triggered in checkpoint finished, so that taskBinary
    +        // serialization can start.
    +        semaphore1.release()
    +        // Wait until taskBinary serialization finished in submitMissingTasksThread.
    +        semaphore2.acquire()
    --- End diff --
    
    this would be a bit easier to follow if you rename your semaphores a bit.
    
    `semaphore1` -> `doCheckpointStarted`
    `semaphore2` -> `taskBinaryBytesFinished`
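
A minimal sketch of the handshake with the suggested names, assuming a third semaphore `checkpointStateUpdated` replaces the second release of `semaphore1`. The `Handshake` object and the recorded event strings are illustrative only; no Spark code runs here, just the ordering that the semaphores enforce between the two threads.

```scala
import java.util.concurrent.Semaphore
import scala.collection.mutable.ArrayBuffer

object Handshake {
  // Runs both threads and returns the events in the order they happened.
  def runHandshake(): List[String] = {
    val doCheckpointStarted = new Semaphore(0)
    val taskBinaryBytesFinished = new Semaphore(0)
    val checkpointStateUpdated = new Semaphore(0)

    val events = ArrayBuffer.empty[String]
    def record(event: String): Unit = events.synchronized { events += event }

    val checkpointThread = new Thread(new Runnable {
      override def run(): Unit = {
        record("checkpoint data written, state not yet updated")
        doCheckpointStarted.release()       // let task serialization begin
        taskBinaryBytesFinished.acquire()   // wait for the serialized task binary
        record("checkpoint state updated")
        checkpointStateUpdated.release()    // let the partitions be read
      }
    })

    val submitMissingTasksThread = new Thread(new Runnable {
      override def run(): Unit = {
        doCheckpointStarted.acquire()
        record("task binary serialized")
        taskBinaryBytesFinished.release()
        checkpointStateUpdated.acquire()
        record("partitions read")
      }
    })

    checkpointThread.start()
    submitMissingTasksThread.start()
    checkpointThread.join()
    submitMissingTasksThread.join()
    events.synchronized { events.toList }
  }
}
```

With one semaphore per transition, each `acquire` names the event it waits for, so the interleaving can be read directly from the code.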


---



[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/20244
  
    merged to master / 2.3 / 2.2
    
    I hit a merge conflict trying to merge to 2.1 -- feel free to open another PR for that version.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r161144809
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2417,93 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("task part misType with checkpoint rdd in concurrent execution scenes") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    +    sc.setCheckpointDir(checkpointDir.toString)
    +
    +    val latch = new CountDownLatch(2)
    +    val semaphore1 = new Semaphore(0)
    +    val semaphore2 = new Semaphore(0)
    +
    +    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
    +    rdd.checkpoint()
    +
    +    val checkpointRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate what RDD.doCheckpoint() do here.
    +        rdd.doCheckpointCalled = true
    +        val checkpointData = rdd.checkpointData.get
    +        RDDCheckpointData.synchronized {
    +          if (checkpointData.cpState == CheckpointState.Initialized) {
    +            checkpointData.cpState = CheckpointState.CheckpointingInProgress
    +          }
    +        }
    +
    +        val newRDD = checkpointData.doCheckpoint()
    +
    +        // Release semaphore1 after job triggered in checkpoint finished.
    +        semaphore1.release()
    +        semaphore2.acquire()
    +        // Update our state and truncate the RDD lineage.
    +        RDDCheckpointData.synchronized {
    +          checkpointData.cpRDD = Some(newRDD)
    +          checkpointData.cpState = CheckpointState.Checkpointed
    +          rdd.markCheckpointed()
    +        }
    +        semaphore1.release()
    +
    +        latch.countDown()
    +      }
    +    }
    +
    +    val submitMissingTasksRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate the process of submitMissingTasks.
    +        val ser = SparkEnv.get.closureSerializer.newInstance()
    +        semaphore1.acquire()
    +        // Simulate task serialization while submitMissingTasks.
    +        // Task serialized with rdd checkpoint not finished.
    +        val cleanedFunc = sc.clean(Utils.getIteratorSize _)
    +        val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
    +        val taskBinaryBytes = JavaUtils.bufferToArray(
    +          ser.serialize((rdd, func): AnyRef))
    +        semaphore2.release()
    +        semaphore1.acquire()
    +        // Part calculated with rdd checkpoint already finished.
    +        val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)](
    +          ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader)
    +        val part = rdd.partitions(0)
    +        intercept[ClassCastException] {
    --- End diff --
    
    I don't think this is a "test"; it is just a reproduction of the problem you want to fix. We should prove that the code you added in `DAGScheduler.scala` fixes the problem, and that a `ClassCastException` is raised with the original code base.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r166387840
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1016,15 +1016,23 @@ class DAGScheduler(
         // might modify state of objects referenced in their closures. This is necessary in Hadoop
         // where the JobConf/Configuration object is not thread-safe.
         var taskBinary: Broadcast[Array[Byte]] = null
    +    var partitions: Array[Partition] = null
         try {
           // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
           // For ResultTask, serialize and broadcast (rdd, func).
    -      val taskBinaryBytes: Array[Byte] = stage match {
    -        case stage: ShuffleMapStage =>
    -          JavaUtils.bufferToArray(
    -            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
    -        case stage: ResultStage =>
    -          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    +      var taskBinaryBytes: Array[Byte] = null
    +      // Add synchronized block to avoid rdd deserialized from taskBinaryBytes has diff checkpoint
    +      // status with the rdd when create ShuffleMapTask or ResultTask.
    --- End diff --
    
    Thanks for the advice.
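
The idea behind the synchronized block in the diff above can be sketched outside Spark. This is a hedged illustration under stated assumptions, not the DAGScheduler change itself: `CheckpointLock`, `ToyStageRdd`, and `SubmitSketch` are hypothetical names, and the sketch assumes the checkpoint state update takes the same lock that guards the two reads.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical shared lock; assumed to guard both the state update and the reads.
object CheckpointLock

final class ToyStageRdd {
  private var checkpointed = false
  def markCheckpointed(): Unit = CheckpointLock.synchronized { checkpointed = true }
  def isCheckpointed: Boolean = CheckpointLock.synchronized { checkpointed }
}

object SubmitSketch {
  // Returns (state seen when "serializing", state seen when "reading partitions",
  // state after the checkpoint thread has finished).
  def demo(): (Boolean, Boolean, Boolean) = {
    val rdd = new ToyStageRdd
    val lockHeld = new CountDownLatch(1)

    val checkpointThread = new Thread(new Runnable {
      override def run(): Unit = {
        lockHeld.await()         // start only once the submitter holds the lock
        rdd.markCheckpointed()   // blocks until the submitter releases the lock
      }
    })
    checkpointThread.start()

    // Both reads happen under one lock, so they observe the same checkpoint state.
    val (inBinary, inPartitions) = CheckpointLock.synchronized {
      lockHeld.countDown()
      val first = rdd.isCheckpointed    // stands in for serializing the task binary
      Thread.sleep(50)                  // give the checkpoint thread time to contend
      val second = rdd.isCheckpointed   // stands in for reading rdd.partitions
      (first, second)
    }

    checkpointThread.join()
    (inBinary, inPartitions, rdd.isCheckpointed)
  }
}
```

The checkpoint thread can only flip the flag after the block exits, so the two values captured inside the block always agree, whichever state they happen to see.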


---



[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/20244
  
    @ivoson Tengfei, please post the full stack trace of the `ClassCastException`.


---



[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/20244
  
    reopen this...


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r167145734
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,121 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    --- End diff --
    
    Hi @squito, that's fine. The PR and JIRA have been updated. Thanks for your patience and review.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r165764166
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    +    sc.setCheckpointDir(checkpointDir.toString)
    +
    +    // Semaphores to control the process sequence for the two threads below.
    +    val semaphore1 = new Semaphore(0)
    +    val semaphore2 = new Semaphore(0)
    +
    +    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
    +    rdd.checkpoint()
    +
    +    val checkpointRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate what RDD.doCheckpoint() do here.
    +        rdd.doCheckpointCalled = true
    +        val checkpointData = rdd.checkpointData.get
    +        RDDCheckpointData.synchronized {
    +          if (checkpointData.cpState == CheckpointState.Initialized) {
    +            checkpointData.cpState = CheckpointState.CheckpointingInProgress
    +          }
    +        }
    +
    +        val newRDD = checkpointData.doCheckpoint()
    +
    +        // Release semaphore1 after job triggered in checkpoint finished, so that taskBinary
    +        // serialization can start.
    +        semaphore1.release()
    +        // Wait until taskBinary serialization finished in submitMissingTasksThread.
    +        semaphore2.acquire()
    +
    +        // Update our state and truncate the RDD lineage.
    +        RDDCheckpointData.synchronized {
    +          checkpointData.cpRDD = Some(newRDD)
    +          checkpointData.cpState = CheckpointState.Checkpointed
    +          rdd.markCheckpointed()
    +        }
    +        semaphore1.release()
    +      }
    +    }
    +
    +    val submitMissingTasksRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate the process of submitMissingTasks.
    +        // Wait until doCheckpoint job running finished, but checkpoint status not changed.
    +        semaphore1.acquire()
    +
    +        val ser = SparkEnv.get.closureSerializer.newInstance()
    +
    +        // Simply simulate task serialization while submitMissingTasks.
    +        // Task serialized with rdd checkpoint not finished.
    +        val cleanedFunc = sc.clean(Utils.getIteratorSize _)
    +        val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
    +        val taskBinaryBytes = JavaUtils.bufferToArray(
    +          ser.serialize((rdd, func): AnyRef))
    +        // Because partition calculate is in a synchronized block, so in the fixed code
    +        // partition is calculated here.
    +        val correctPart = rdd.partitions(0)
    +
    +        // Release semaphore2 so changing checkpoint status to Checkpointed will be done in
    +        // checkpointThread.
    +        semaphore2.release()
    +        // Wait until checkpoint status changed to Checkpointed in checkpointThread.
    +        semaphore1.acquire()
    +
    +        // Part calculated with rdd checkpoint already finished.
    +        val errPart = rdd.partitions(0)
    +
    +        // TaskBinary will be deserialized when run task in executor.
    +        val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)](
    +          ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader)
    +
    +        val taskContext = mock(classOf[TaskContext])
    +        doNothing().when(taskContext).killTaskIfInterrupted()
    +
    +        // ClassCastException is expected with errPart.
    --- End diff --
    
    I think this is a bit easier to follow if you say
    
    Make sure our test case is set up correctly -- we expect a ClassCastException here if we use the `rdd.partitions` *after* checkpointing was done, but our binary bytes are from before it finished.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r166385409
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    +    sc.setCheckpointDir(checkpointDir.toString)
    +
    +    // Semaphores to control the process sequence for the two threads below.
    +    val semaphore1 = new Semaphore(0)
    +    val semaphore2 = new Semaphore(0)
    +
    +    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
    +    rdd.checkpoint()
    +
    +    val checkpointRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate what RDD.doCheckpoint() do here.
    +        rdd.doCheckpointCalled = true
    +        val checkpointData = rdd.checkpointData.get
    +        RDDCheckpointData.synchronized {
    +          if (checkpointData.cpState == CheckpointState.Initialized) {
    +            checkpointData.cpState = CheckpointState.CheckpointingInProgress
    +          }
    +        }
    +
    +        val newRDD = checkpointData.doCheckpoint()
    +
    +        // Release semaphore1 after job triggered in checkpoint finished, so that taskBinary
    +        // serialization can start.
    +        semaphore1.release()
    +        // Wait until taskBinary serialization finished in submitMissingTasksThread.
    +        semaphore2.acquire()
    +
    +        // Update our state and truncate the RDD lineage.
    +        RDDCheckpointData.synchronized {
    +          checkpointData.cpRDD = Some(newRDD)
    +          checkpointData.cpState = CheckpointState.Checkpointed
    +          rdd.markCheckpointed()
    +        }
    +        semaphore1.release()
    +      }
    +    }
    +
    +    val submitMissingTasksRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate the process of submitMissingTasks.
    +        // Wait until doCheckpoint job running finished, but checkpoint status not changed.
    +        semaphore1.acquire()
    +
    +        val ser = SparkEnv.get.closureSerializer.newInstance()
    +
    +        // Simply simulate task serialization while submitMissingTasks.
    +        // Task serialized with rdd checkpoint not finished.
    +        val cleanedFunc = sc.clean(Utils.getIteratorSize _)
    +        val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
    +        val taskBinaryBytes = JavaUtils.bufferToArray(
    +          ser.serialize((rdd, func): AnyRef))
    +        // Because partition calculate is in a synchronized block, so in the fixed code
    +        // partition is calculated here.
    +        val correctPart = rdd.partitions(0)
    +
    +        // Release semaphore2 so changing checkpoint status to Checkpointed will be done in
    +        // checkpointThread.
    +        semaphore2.release()
    +        // Wait until checkpoint status changed to Checkpointed in checkpointThread.
    +        semaphore1.acquire()
    +
    +        // Part calculated with rdd checkpoint already finished.
    +        val errPart = rdd.partitions(0)
    +
    +        // TaskBinary will be deserialized when run task in executor.
    +        val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)](
    +          ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader)
    +
    +        val taskContext = mock(classOf[TaskContext])
    +        doNothing().when(taskContext).killTaskIfInterrupted()
    +
    +        // ClassCastException is expected with errPart.
    +        intercept[ClassCastException] {
    +          // Triggered when runTask in executor.
    +          taskRdd.iterator(errPart, taskContext)
    +        }
    +
    +        // Execute successfully with correctPart.
    +        taskRdd.iterator(correctPart, taskContext)
    +      }
    +    }
    +
    +    new Thread(checkpointRunnable).start()
    +    val submitMissingTasksThread = new Thread(submitMissingTasksRunnable)
    +    submitMissingTasksThread.start()
    +    submitMissingTasksThread.join()
    +
    +    Utils.deleteRecursively(tempDir)
    --- End diff --
    
    will fix this.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r167138603
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,121 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    --- End diff --
    
    hi @ivoson -- I haven't come up with a better way to test this, so I think for now you should
    
    (1) change the PR to *only* include the changes to the DAGScheduler (also undo the `protected[spark]` changes elsewhere)
    (2) put this repro on the JIRA, as it's a pretty good way of showing what's going on.
    
    if we come up with a way to test it, we can always do that later on.
    
    thanks and sorry for the back and forth


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r165759800
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1016,15 +1016,23 @@ class DAGScheduler(
         // might modify state of objects referenced in their closures. This is necessary in Hadoop
         // where the JobConf/Configuration object is not thread-safe.
         var taskBinary: Broadcast[Array[Byte]] = null
    +    var partitions: Array[Partition] = null
         try {
           // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
           // For ResultTask, serialize and broadcast (rdd, func).
    -      val taskBinaryBytes: Array[Byte] = stage match {
    -        case stage: ShuffleMapStage =>
    -          JavaUtils.bufferToArray(
    -            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
    -        case stage: ResultStage =>
    -          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    +      var taskBinaryBytes: Array[Byte] = null
    +      // Add synchronized block to avoid rdd deserialized from taskBinaryBytes has diff checkpoint
    +      // status with the rdd when create ShuffleMapTask or ResultTask.
    --- End diff --
    
    I'd reword this a bit:
    
    taskBinaryBytes and partitions are both affected by the checkpoint status.  We need this synchronization in case another concurrent job is checkpointing this RDD, so we get a consistent view of both variables.
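
A minimal sketch of the "consistent view" idea described in this comment, assuming a hypothetical `CheckpointableData` class in place of a real RDD. None of these names come from Spark. The point is only that two values derived from the same mutable state should be read under a single lock, not in two separate steps.

```scala
// Hypothetical stand-in for an RDD whose state flips when a checkpoint completes.
final class CheckpointableData {
  private var checkpointed = false

  // Flips the state, as a concurrent checkpointing job would.
  def markCheckpointed(): Unit = this.synchronized { checkpointed = true }

  // Each accessor is safe on its own, but two separate calls can straddle the flip.
  def serializedForm: String = this.synchronized {
    if (checkpointed) "checkpoint-rdd" else "original-rdd"
  }

  def partitionKind: String = this.synchronized {
    if (checkpointed) "CheckpointRDDPartition" else "OriginalPartition"
  }

  // Reads both values under one lock, so they always describe the same state.
  // JVM monitors are reentrant, so the nested synchronized calls are fine.
  def consistentView(): (String, String) = this.synchronized {
    (serializedForm, partitionKind)
  }
}
```

Calling `serializedForm` and `partitionKind` separately leaves a window in which `markCheckpointed()` can run between them. `consistentView()` closes that window.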


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r166386639
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    +    sc.setCheckpointDir(checkpointDir.toString)
    +
    +    // Semaphores to control the process sequence for the two threads below.
    +    val semaphore1 = new Semaphore(0)
    +    val semaphore2 = new Semaphore(0)
    +
    +    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
    +    rdd.checkpoint()
    +
    +    val checkpointRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate what RDD.doCheckpoint() do here.
    +        rdd.doCheckpointCalled = true
    +        val checkpointData = rdd.checkpointData.get
    +        RDDCheckpointData.synchronized {
    +          if (checkpointData.cpState == CheckpointState.Initialized) {
    +            checkpointData.cpState = CheckpointState.CheckpointingInProgress
    +          }
    +        }
    +
    +        val newRDD = checkpointData.doCheckpoint()
    +
    +        // Release semaphore1 after job triggered in checkpoint finished, so that taskBinary
    +        // serialization can start.
    +        semaphore1.release()
    +        // Wait until taskBinary serialization finished in submitMissingTasksThread.
    +        semaphore2.acquire()
    --- End diff --
    
    Thanks for the advice. Will fix this.


---



[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on the issue:

    https://github.com/apache/spark/pull/20244
  
    This is the stack trace of the Exception.
    
    ```
    java.lang.ClassCastException: org.apache.spark.rdd.CheckpointRDDPartition cannot be cast to org.apache.spark.streaming.rdd.MapWithStateRDDPartition
    at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:152)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    ```
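
The trace above comes from a compute method that casts the partition it receives to its own partition class. Below is a minimal sketch of that failure mode. The types and names (`StatePartition`, `CheckpointPartition`, `CastSketch.compute`) are hypothetical stand-ins, not Spark's real classes.

```scala
object CastSketch {
  // Hypothetical partition types standing in for the two classes named in the trace.
  trait Partition { def index: Int }
  final case class StatePartition(index: Int, state: String) extends Partition
  final case class CheckpointPartition(index: Int) extends Partition

  // Mimics an RDD whose compute method assumes it is handed its own partition type.
  def compute(split: Partition): String = {
    // Throws ClassCastException when split is any other Partition implementation.
    val p = split.asInstanceOf[StatePartition]
    s"partition ${p.index} with state ${p.state}"
  }
}
```

`CastSketch.compute(CastSketch.StatePartition(0, "s"))` succeeds. Passing a `CheckpointPartition` fails at the cast, which mirrors what happens when the task's RDD was serialized before the checkpoint but the partition was computed after it.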


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r161141879
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2417,93 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("task part misType with checkpoint rdd in concurrent execution scenes") {
    --- End diff --
    
    Maybe "SPARK-23053: avoid CastException in concurrent execution with checkpoint" would be better?


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r166387660
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    +    sc.setCheckpointDir(checkpointDir.toString)
    +
    +    // Semaphores to control the process sequence for the two threads below.
    +    val semaphore1 = new Semaphore(0)
    +    val semaphore2 = new Semaphore(0)
    +
    +    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
    +    rdd.checkpoint()
    +
    +    val checkpointRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate what RDD.doCheckpoint() do here.
    +        rdd.doCheckpointCalled = true
    +        val checkpointData = rdd.checkpointData.get
    +        RDDCheckpointData.synchronized {
    +          if (checkpointData.cpState == CheckpointState.Initialized) {
    +            checkpointData.cpState = CheckpointState.CheckpointingInProgress
    +          }
    +        }
    +
    +        val newRDD = checkpointData.doCheckpoint()
    +
    +        // Release semaphore1 after job triggered in checkpoint finished, so that taskBinary
    +        // serialization can start.
    +        semaphore1.release()
    +        // Wait until taskBinary serialization finished in submitMissingTasksThread.
    +        semaphore2.acquire()
    +
    +        // Update our state and truncate the RDD lineage.
    +        RDDCheckpointData.synchronized {
    +          checkpointData.cpRDD = Some(newRDD)
    +          checkpointData.cpState = CheckpointState.Checkpointed
    +          rdd.markCheckpointed()
    +        }
    +        semaphore1.release()
    +      }
    +    }
    +
    +    val submitMissingTasksRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate the process of submitMissingTasks.
    +        // Wait until doCheckpoint job running finished, but checkpoint status not changed.
    +        semaphore1.acquire()
    +
    +        val ser = SparkEnv.get.closureSerializer.newInstance()
    +
    +        // Simply simulate task serialization while submitMissingTasks.
    +        // Task serialized with rdd checkpoint not finished.
    +        val cleanedFunc = sc.clean(Utils.getIteratorSize _)
    +        val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
    +        val taskBinaryBytes = JavaUtils.bufferToArray(
    +          ser.serialize((rdd, func): AnyRef))
    +        // Because partition calculate is in a synchronized block, so in the fixed code
    +        // partition is calculated here.
    +        val correctPart = rdd.partitions(0)
    +
    +        // Release semaphore2 so changing checkpoint status to Checkpointed will be done in
    +        // checkpointThread.
    +        semaphore2.release()
    +        // Wait until checkpoint status changed to Checkpointed in checkpointThread.
    +        semaphore1.acquire()
    +
    +        // Part calculated with rdd checkpoint already finished.
    +        val errPart = rdd.partitions(0)
    +
    +        // TaskBinary will be deserialized when run task in executor.
    +        val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)](
    +          ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader)
    +
    +        val taskContext = mock(classOf[TaskContext])
    +        doNothing().when(taskContext).killTaskIfInterrupted()
    +
    +        // ClassCastException is expected with errPart.
    --- End diff --
    
    Thanks for the advice; it really helps with understanding. Will update this.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r161145542
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2417,93 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("task part misType with checkpoint rdd in concurrent execution scenes") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    +    sc.setCheckpointDir(checkpointDir.toString)
    +
    +    val latch = new CountDownLatch(2)
    +    val semaphore1 = new Semaphore(0)
    +    val semaphore2 = new Semaphore(0)
    +
    +    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
    +    rdd.checkpoint()
    +
    +    val checkpointRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate what RDD.doCheckpoint() do here.
    +        rdd.doCheckpointCalled = true
    +        val checkpointData = rdd.checkpointData.get
    +        RDDCheckpointData.synchronized {
    +          if (checkpointData.cpState == CheckpointState.Initialized) {
    +            checkpointData.cpState = CheckpointState.CheckpointingInProgress
    +          }
    +        }
    +
    +        val newRDD = checkpointData.doCheckpoint()
    +
    +        // Release semaphore1 after job triggered in checkpoint finished.
    +        semaphore1.release()
    +        semaphore2.acquire()
    +        // Update our state and truncate the RDD lineage.
    +        RDDCheckpointData.synchronized {
    +          checkpointData.cpRDD = Some(newRDD)
    +          checkpointData.cpState = CheckpointState.Checkpointed
    +          rdd.markCheckpointed()
    +        }
    +        semaphore1.release()
    +
    +        latch.countDown()
    +      }
    +    }
    +
    +    val submitMissingTasksRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate the process of submitMissingTasks.
    +        val ser = SparkEnv.get.closureSerializer.newInstance()
    +        semaphore1.acquire()
    +        // Simulate task serialization while submitMissingTasks.
    +        // Task serialized with rdd checkpoint not finished.
    +        val cleanedFunc = sc.clean(Utils.getIteratorSize _)
    +        val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
    +        val taskBinaryBytes = JavaUtils.bufferToArray(
    +          ser.serialize((rdd, func): AnyRef))
    +        semaphore2.release()
    +        semaphore1.acquire()
    +        // Part calculated with rdd checkpoint already finished.
    +        val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)](
    +          ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader)
    +        val part = rdd.partitions(0)
    +        intercept[ClassCastException] {
    --- End diff --
    
    It is a reproduction case; I will fix this.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by ivoson <gi...@git.apache.org>.
GitHub user ivoson reopened a pull request:

    https://github.com/apache/spark/pull/20244

    [SPARK-23053][CORE] taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

    …d is the same when calculate taskSerialization and task partitions
    
    Change-Id: Ib9839ca552653343d264135c116742effa6feb60
    
    ## What changes were proposed in this pull request?
    
    When concurrent jobs use the same RDD that is marked for checkpointing, one job may finish and start RDD.doCheckpoint while another job is being submitted, which calls submitStage and submitMissingTasks. [submitMissingTasks](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961) serializes taskBinaryBytes and computes the task partitions, and both are affected by the checkpoint status. If the former is computed before doCheckpoint finishes and the latter after it finishes, rdd.compute is called at task run time with a partition that does not match the serialized RDD. RDDs that cast their partition to a specific type, such as [MapWithStateRDD](https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala), then throw a ClassCastException because the partition passed in is actually a CheckpointRDDPartition.
    
    ## How was this patch tested?
    
    Existing unit tests, plus a new test case in DAGSchedulerSuite that shows the exception case.
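
The test case in this PR forces the problematic interleaving with two semaphores shared between a checkpointing thread and a task-submission thread. A minimal sketch of that handoff pattern follows, with the Spark-specific work replaced by recorded event names. `HandoffSketch`, `interleave`, and the event strings are hypothetical and only illustrate the ordering.

```scala
import java.util.concurrent.Semaphore
import scala.collection.mutable.ArrayBuffer

object HandoffSketch {
  // Returns the order in which the steps ran; the semaphores make it deterministic.
  def interleave(): List[String] = {
    val events = ArrayBuffer.empty[String]
    def record(e: String): Unit = events.synchronized { events += e; () }

    val semaphore1 = new Semaphore(0)
    val semaphore2 = new Semaphore(0)

    val checkpointThread = new Thread(new Runnable {
      override def run(): Unit = {
        record("checkpoint data written")
        semaphore1.release() // let the other thread serialize the task binary
        semaphore2.acquire() // wait until serialization has finished
        record("state set to Checkpointed")
        semaphore1.release() // let the other thread read the partitions
      }
    })

    val submitThread = new Thread(new Runnable {
      override def run(): Unit = {
        semaphore1.acquire() // wait for the checkpoint data to be written
        record("task binary serialized")
        semaphore2.release() // allow the checkpoint state to change
        semaphore1.acquire() // wait for the state change
        record("partitions read")
      }
    })

    checkpointThread.start()
    submitThread.start()
    checkpointThread.join()
    submitThread.join()
    events.synchronized { events.toList }
  }
}
```

The serialization step lands before the state change and the partition read lands after it. That is the mismatch the description above warns about.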
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ivoson/spark branch-taskpart-mistype

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20244.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20244
    
----
commit 0dea573e9e724d591803b73f678e14f94e0af447
Author: huangtengfei <hu...@...>
Date:   2018-01-12T02:53:29Z

    submitMissingTasks should make sure the checkpoint status of stage.rdd is the same when calculate taskSerialization and task partitions
    
    Change-Id: Ib9839ca552653343d264135c116742effa6feb60

----


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r166386259
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    +    sc.setCheckpointDir(checkpointDir.toString)
    +
    +    // Semaphores to control the process sequence for the two threads below.
    +    val semaphore1 = new Semaphore(0)
    +    val semaphore2 = new Semaphore(0)
    +
    +    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
    +    rdd.checkpoint()
    +
    +    val checkpointRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate what RDD.doCheckpoint() do here.
    --- End diff --
    
    will fix this.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r165761754
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    +    sc.setCheckpointDir(checkpointDir.toString)
    +
    +    // Semaphores to control the process sequence for the two threads below.
    +    val semaphore1 = new Semaphore(0)
    +    val semaphore2 = new Semaphore(0)
    +
    +    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
    +    rdd.checkpoint()
    +
    +    val checkpointRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate what RDD.doCheckpoint() do here.
    --- End diff --
    
    I'd remove "simply" here and elsewhere in comments.  Also "do" -> "does"


---



[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/20244
  
    test this please


---



[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20244
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r166385020
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    --- End diff --
    
    I checked the code again, and yes, ```checkpointDir = Utils.createTempDir()``` is enough for this case. Will fix this.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r165761207
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    --- End diff --
    
    why do you make a tempfile for the checkpoint dir and then delete it?  why not just  `checkpointDir = new File(tempDir, "checkpointing")`?  Or even just `checkpointDir = Utils.createTempDir()`?
    
    (CheckpointSuite does this so it can call `sc.setCheckpointDir`, but you're not doing that here)
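
A minimal sketch of the two simpler alternatives suggested in this comment. It uses only JDK calls, so `createTempDir` here is a hypothetical stand-in for Spark's internal `Utils.createTempDir()`, and the `"spark-test"` prefix is an arbitrary choice.

```scala
import java.io.File
import java.nio.file.Files

object CheckpointDirSketch {
  // Stand-in for Spark's internal Utils.createTempDir(); not the real helper.
  def createTempDir(): File = Files.createTempDirectory("spark-test").toFile

  // Option 1: a fresh child path under a temp directory. The path does not exist yet,
  // so there is no need to create a temp file and delete it to get a unique name.
  def childOfTempDir(): File = new File(createTempDir(), "checkpointing")

  // Option 2: use the temp directory itself as the checkpoint directory.
  def tempDirItself(): File = createTempDir()
}
```

Either option gives a unique location without the create-then-delete step in the quoted diff.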


---



[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/20244
  
    ok to test


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r161145538
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2417,93 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("task part misType with checkpoint rdd in concurrent execution scenes") {
    --- End diff --
    
    Thanks for the suggestion.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r166502288
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,121 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    --- End diff --
    
    @squito Thanks for the reply. I understand: technically this may not be a unit test case, since it only simulates the scenario that produces the exception. I also wonder whether there is a good way to test this.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r166387716
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    +    sc.setCheckpointDir(checkpointDir.toString)
    +
    +    // Semaphores to control the process sequence for the two threads below.
    +    val semaphore1 = new Semaphore(0)
    +    val semaphore2 = new Semaphore(0)
    +
    +    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
    +    rdd.checkpoint()
    +
    +    val checkpointRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate what RDD.doCheckpoint() do here.
    +        rdd.doCheckpointCalled = true
    +        val checkpointData = rdd.checkpointData.get
    +        RDDCheckpointData.synchronized {
    +          if (checkpointData.cpState == CheckpointState.Initialized) {
    +            checkpointData.cpState = CheckpointState.CheckpointingInProgress
    +          }
    +        }
    +
    +        val newRDD = checkpointData.doCheckpoint()
    +
    +        // Release semaphore1 after job triggered in checkpoint finished, so that taskBinary
    +        // serialization can start.
    +        semaphore1.release()
    +        // Wait until taskBinary serialization finished in submitMissingTasksThread.
    +        semaphore2.acquire()
    +
    +        // Update our state and truncate the RDD lineage.
    +        RDDCheckpointData.synchronized {
    +          checkpointData.cpRDD = Some(newRDD)
    +          checkpointData.cpState = CheckpointState.Checkpointed
    +          rdd.markCheckpointed()
    +        }
    +        semaphore1.release()
    +      }
    +    }
    +
    +    val submitMissingTasksRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate the process of submitMissingTasks.
    +        // Wait until doCheckpoint job running finished, but checkpoint status not changed.
    +        semaphore1.acquire()
    +
    +        val ser = SparkEnv.get.closureSerializer.newInstance()
    +
    +        // Simply simulate task serialization while submitMissingTasks.
    +        // Task serialized with rdd checkpoint not finished.
    +        val cleanedFunc = sc.clean(Utils.getIteratorSize _)
    +        val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
    +        val taskBinaryBytes = JavaUtils.bufferToArray(
    +          ser.serialize((rdd, func): AnyRef))
    +        // Because partition calculate is in a synchronized block, so in the fixed code
    +        // partition is calculated here.
    +        val correctPart = rdd.partitions(0)
    +
    +        // Release semaphore2 so changing checkpoint status to Checkpointed will be done in
    +        // checkpointThread.
    +        semaphore2.release()
    +        // Wait until checkpoint status changed to Checkpointed in checkpointThread.
    +        semaphore1.acquire()
    +
    +        // Part calculated with rdd checkpoint already finished.
    --- End diff --
    
    Thanks for the advice; it is really helpful for understanding. I will update this.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson closed the pull request at:

    https://github.com/apache/spark/pull/20244


---



[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on the issue:

    https://github.com/apache/spark/pull/20244
  
    Thank you for reviewing this, @squito.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20244


---



[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20244
  
    Can one of the admins verify this patch?


---





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r161145547
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -96,6 +98,22 @@ class MyRDD(
       override def toString: String = "DAGSchedulerSuiteRDD " + id
     }
     
    +/** Wrapped rdd partition. */
    +class WrappedPartition(val partition: Partition) extends Partition {
    +  def index: Int = partition.index
    +}
    +
    +/** Wrapped rdd with WrappedPartition. */
    +class WrappedRDD(parent: RDD[Int]) extends RDD[Int](parent) {
    +  protected def getPartitions: Array[Partition] = {
    +    parent.partitions.map(p => new WrappedPartition(p))
    +  }
    +
    +  def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    +    parent.compute(split.asInstanceOf[WrappedPartition].partition, context)
    --- End diff --
    
    Thanks for the comment; I will work on this.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r166458357
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,121 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    --- End diff --
    
    Hi @ivoson -- I'm really sorry, but I only just realized that this "test" is really just a repro: it passes both before and after the actual code changes, since you've replicated the internal logic we're fixing. As such, I don't think it's actually useful as a test case; perhaps it should be added to the JIRA as a repro.
    
    I appreciate the work that went into writing this, as it helped make the issue clear to me. I am not sure if there is a good way to test this. If we can't come up with anything, we should just commit your actual fix, but give me a day or two to think about it ...
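
The interleaving that the repro simulates can be sketched without Spark. The block below is an illustration under stated assumptions, not the actual patch: `Part`, `WrappedPart`, `CheckpointPart`, `FakeRdd`, and `snapshot` are all hypothetical stand-ins. It shows why reading the "serialized" compute logic and the partitions at different checkpoint states leads to a `ClassCastException`, and why reading both under one lock avoids it:

```scala
object SnapshotSketch {
  // Hypothetical stand-ins; none of these are Spark classes.
  trait Part { def index: Int }
  final case class WrappedPart(index: Int) extends Part
  final case class CheckpointPart(index: Int) extends Part

  final class FakeRdd {
    private val lock = new Object
    private var checkpointed = false

    def markCheckpointed(): Unit = lock.synchronized { checkpointed = true }

    // The partition type depends on the checkpoint state at the time of the call.
    def partitions: Array[Part] = lock.synchronized {
      if (checkpointed) Array[Part](CheckpointPart(0)) else Array[Part](WrappedPart(0))
    }

    // Stand-in for task serialization: freezes the compute logic for the current state.
    def computeFn: Part => Int = lock.synchronized {
      if (checkpointed) (p: Part) => p.asInstanceOf[CheckpointPart].index
      else (p: Part) => p.asInstanceOf[WrappedPart].index
    }

    // Reads both values under one lock so that they describe the same state.
    def snapshot(): (Part => Int, Array[Part]) = lock.synchronized {
      (computeFn, partitions)
    }
  }

  def main(args: Array[String]): Unit = {
    val rdd = new FakeRdd
    val staleFn = rdd.computeFn     // captured before the checkpoint completes
    rdd.markCheckpointed()
    val lateParts = rdd.partitions  // read after the checkpoint completes
    try staleFn(lateParts(0))
    catch { case e: ClassCastException => println(s"mismatch: $e") }

    val (fn, parts) = rdd.snapshot() // both values come from the same state
    println(fn(parts(0)))
  }
}
```

Because the sketch hard-codes the interleaving, it only demonstrates the failure mode; like the repro discussed above, it does not exercise the scheduler itself.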


---



[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on the issue:

    https://github.com/apache/spark/pull/20244
  
    @xuanyuanking could you review this, please?


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r165763669
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    +    sc.setCheckpointDir(checkpointDir.toString)
    +
    +    // Semaphores to control the process sequence for the two threads below.
    +    val semaphore1 = new Semaphore(0)
    +    val semaphore2 = new Semaphore(0)
    +
    +    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
    +    rdd.checkpoint()
    +
    +    val checkpointRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate what RDD.doCheckpoint() do here.
    +        rdd.doCheckpointCalled = true
    +        val checkpointData = rdd.checkpointData.get
    +        RDDCheckpointData.synchronized {
    +          if (checkpointData.cpState == CheckpointState.Initialized) {
    +            checkpointData.cpState = CheckpointState.CheckpointingInProgress
    +          }
    +        }
    +
    +        val newRDD = checkpointData.doCheckpoint()
    +
    +        // Release semaphore1 after job triggered in checkpoint finished, so that taskBinary
    +        // serialization can start.
    +        semaphore1.release()
    +        // Wait until taskBinary serialization finished in submitMissingTasksThread.
    +        semaphore2.acquire()
    +
    +        // Update our state and truncate the RDD lineage.
    +        RDDCheckpointData.synchronized {
    +          checkpointData.cpRDD = Some(newRDD)
    +          checkpointData.cpState = CheckpointState.Checkpointed
    +          rdd.markCheckpointed()
    +        }
    +        semaphore1.release()
    +      }
    +    }
    +
    +    val submitMissingTasksRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate the process of submitMissingTasks.
    +        // Wait until doCheckpoint job running finished, but checkpoint status not changed.
    +        semaphore1.acquire()
    +
    +        val ser = SparkEnv.get.closureSerializer.newInstance()
    +
    +        // Simply simulate task serialization while submitMissingTasks.
    +        // Task serialized with rdd checkpoint not finished.
    +        val cleanedFunc = sc.clean(Utils.getIteratorSize _)
    +        val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
    +        val taskBinaryBytes = JavaUtils.bufferToArray(
    +          ser.serialize((rdd, func): AnyRef))
    +        // Because partition calculate is in a synchronized block, so in the fixed code
    +        // partition is calculated here.
    +        val correctPart = rdd.partitions(0)
    +
    +        // Release semaphore2 so changing checkpoint status to Checkpointed will be done in
    +        // checkpointThread.
    +        semaphore2.release()
    +        // Wait until checkpoint status changed to Checkpointed in checkpointThread.
    +        semaphore1.acquire()
    +
    +        // Part calculated with rdd checkpoint already finished.
    --- End diff --
    
    I'd add a comment above this:
    
    Now we're done simulating the interleaving that might happen within the scheduler -- we'll check to make sure the final state is OK by simulating a couple of steps that normally happen on the executor.


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r165764342
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    +    sc.setCheckpointDir(checkpointDir.toString)
    +
    +    // Semaphores to control the process sequence for the two threads below.
    +    val semaphore1 = new Semaphore(0)
    +    val semaphore2 = new Semaphore(0)
    +
    +    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
    +    rdd.checkpoint()
    +
    +    val checkpointRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate what RDD.doCheckpoint() do here.
    +        rdd.doCheckpointCalled = true
    +        val checkpointData = rdd.checkpointData.get
    +        RDDCheckpointData.synchronized {
    +          if (checkpointData.cpState == CheckpointState.Initialized) {
    +            checkpointData.cpState = CheckpointState.CheckpointingInProgress
    +          }
    +        }
    +
    +        val newRDD = checkpointData.doCheckpoint()
    +
    +        // Release semaphore1 after job triggered in checkpoint finished, so that taskBinary
    +        // serialization can start.
    +        semaphore1.release()
    +        // Wait until taskBinary serialization finished in submitMissingTasksThread.
    +        semaphore2.acquire()
    +
    +        // Update our state and truncate the RDD lineage.
    +        RDDCheckpointData.synchronized {
    +          checkpointData.cpRDD = Some(newRDD)
    +          checkpointData.cpState = CheckpointState.Checkpointed
    +          rdd.markCheckpointed()
    +        }
    +        semaphore1.release()
    +      }
    +    }
    +
    +    val submitMissingTasksRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate the process of submitMissingTasks.
    +        // Wait until doCheckpoint job running finished, but checkpoint status not changed.
    +        semaphore1.acquire()
    +
    +        val ser = SparkEnv.get.closureSerializer.newInstance()
    +
    +        // Simply simulate task serialization while submitMissingTasks.
    +        // Task serialized with rdd checkpoint not finished.
    +        val cleanedFunc = sc.clean(Utils.getIteratorSize _)
    +        val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
    +        val taskBinaryBytes = JavaUtils.bufferToArray(
    +          ser.serialize((rdd, func): AnyRef))
    +        // Because partition calculate is in a synchronized block, so in the fixed code
    +        // partition is calculated here.
    +        val correctPart = rdd.partitions(0)
    +
    +        // Release semaphore2 so changing checkpoint status to Checkpointed will be done in
    +        // checkpointThread.
    +        semaphore2.release()
    +        // Wait until checkpoint status changed to Checkpointed in checkpointThread.
    +        semaphore1.acquire()
    +
    +        // Part calculated with rdd checkpoint already finished.
    +        val errPart = rdd.partitions(0)
    +
    +        // TaskBinary will be deserialized when run task in executor.
    +        val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)](
    +          ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader)
    +
    +        val taskContext = mock(classOf[TaskContext])
    +        doNothing().when(taskContext).killTaskIfInterrupted()
    +
    +        // ClassCastException is expected with errPart.
    +        intercept[ClassCastException] {
    +          // Triggered when runTask in executor.
    +          taskRdd.iterator(errPart, taskContext)
    +        }
    +
    +        // Execute successfully with correctPart.
    +        taskRdd.iterator(correctPart, taskContext)
    +      }
    +    }
    +
    +    new Thread(checkpointRunnable).start()
    +    val submitMissingTasksThread = new Thread(submitMissingTasksRunnable)
    +    submitMissingTasksThread.start()
    +    submitMissingTasksThread.join()
    +
    +    Utils.deleteRecursively(tempDir)
    --- End diff --
    
    This should be done in a `finally` block, so the temp dir is deleted even if the test fails.
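
A minimal sketch of that cleanup pattern, assuming plain JDK file handling instead of Spark's `Utils.createTempDir()` and `Utils.deleteRecursively`. `CleanupSketch`, `withTempDir`, and the local `deleteRecursively` are hypothetical helpers written for this example:

```scala
import java.io.File
import java.nio.file.Files

object CleanupSketch {
  // Hypothetical helper: runs `body` with a fresh temp dir and always
  // deletes the directory afterwards, even if `body` throws.
  def withTempDir[T](body: File => T): T = {
    val dir = Files.createTempDirectory("spark-test").toFile
    try body(dir)
    finally deleteRecursively(dir)
  }

  // Deletes children first; listFiles() returns null for non-directories.
  private def deleteRecursively(f: File): Unit = {
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }

  def main(args: Array[String]): Unit = {
    var seen: File = null
    try {
      withTempDir { dir =>
        seen = dir
        new File(dir, "part-00000").createNewFile()
        throw new RuntimeException("simulated test failure")
      }
    } catch {
      case _: RuntimeException => println(s"dir still exists: ${seen.exists()}")
    }
  }
}
```

In the test under review, the equivalent change would be to wrap the body that starts and joins the threads in `try` and move the `Utils.deleteRecursively(tempDir)` call into `finally`.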


---



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r166386624
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * In this test, we simply simulate the scene in concurrent jobs using the same
    +   * rdd which is marked to do checkpoint:
    +   * Job one has already finished the spark job, and start the process of doCheckpoint;
    +   * Job two is submitted, and submitMissingTasks is called.
    +   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
    +   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
    +   * we may get a ClassCastException when execute the task because of some rdd will do
    +   * Partition cast.
    +   *
    +   * With this test case, just want to indicate that we should do taskSerialization and
    +   * part calculate in submitMissingTasks with the same rdd checkpoint status.
    +   */
    +  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
    +    // set checkpointDir.
    +    val tempDir = Utils.createTempDir()
    +    val checkpointDir = File.createTempFile("temp", "", tempDir)
    +    checkpointDir.delete()
    +    sc.setCheckpointDir(checkpointDir.toString)
    +
    +    // Semaphores to control the process sequence for the two threads below.
    +    val semaphore1 = new Semaphore(0)
    +    val semaphore2 = new Semaphore(0)
    +
    +    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
    +    rdd.checkpoint()
    +
    +    val checkpointRunnable = new Runnable {
    +      override def run() = {
    +        // Simply simulate what RDD.doCheckpoint() do here.
    +        rdd.doCheckpointCalled = true
    +        val checkpointData = rdd.checkpointData.get
    +        RDDCheckpointData.synchronized {
    +          if (checkpointData.cpState == CheckpointState.Initialized) {
    +            checkpointData.cpState = CheckpointState.CheckpointingInProgress
    +          }
    +        }
    +
    +        val newRDD = checkpointData.doCheckpoint()
    +
    +        // Release semaphore1 after job triggered in checkpoint finished, so that taskBinary
    +        // serialization can start.
    +        semaphore1.release()
    +        // Wait until taskBinary serialization finished in submitMissingTasksThread.
    +        semaphore2.acquire()
    +
    +        // Update our state and truncate the RDD lineage.
    +        RDDCheckpointData.synchronized {
    +          checkpointData.cpRDD = Some(newRDD)
    +          checkpointData.cpState = CheckpointState.Checkpointed
    +          rdd.markCheckpointed()
    +        }
    +        semaphore1.release()
    --- End diff --
    
    Thanks for the advice.


---



[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

Posted by ivoson <gi...@git.apache.org>.
Github user ivoson commented on the issue:

    https://github.com/apache/spark/pull/20244
  
    @squito Hi Rashid, thanks for your review and advice. The PR description and the JIRA have been updated, and I have also put the stack trace on the JIRA.
    The last commit addresses the comments you left. Thanks again for the advice.


---
