Posted to reviews@spark.apache.org by zsxwing <gi...@git.apache.org> on 2018/08/10 00:12:12 UTC

[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

GitHub user zsxwing opened a pull request:

    https://github.com/apache/spark/pull/22062

    [SPARK-25081][Core]Nested spill in ShuffleExternalSorter should not access released memory page

    ## What changes were proposed in this pull request?
    
    This issue is pretty similar to [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907). 
    
    `allocateArray` in [ShuffleInMemorySorter.reset](https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99) may trigger a spill and cause ShuffleInMemorySorter to access the released `array`. Another task may then get the same memory page from the pool, so two tasks end up accessing the same memory page. When a task reads memory written by another task, many kinds of failures can happen. Here are some examples I have seen:
    
    - JVM crash. (This is easy to reproduce in a unit test because we fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes respectively, and a value read from such memory usually points to an invalid memory address.)
    - java.lang.IllegalArgumentException: Comparison method violates its general contract!
    - java.lang.NullPointerException at org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
    - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size -536870912 because the size after growing exceeds size limitation 2147483632
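
To make the JVM-crash case above more concrete, here is a minimal, Spark-independent sketch of what a 64-bit word looks like when it is read from memory filled with the two debug bytes mentioned in the description. The class and method names are hypothetical; only the 0xa5 and 0x5a values come from the text above.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Illustration only: shows the word pattern produced by debug-filled memory. */
final class DebugFillDemo {
  // Fill bytes quoted in the PR description (allocated vs. deallocated memory).
  static final byte FILL_ALLOCATED = (byte) 0xa5;
  static final byte FILL_FREED = (byte) 0x5a;

  /** Fills a small buffer with `fill` and reads its first 8 bytes as one long. */
  static long wordFilledWith(byte fill) {
    byte[] page = new byte[64];
    Arrays.fill(page, fill);
    return ByteBuffer.wrap(page).getLong();
  }

  public static void main(String[] args) {
    // A sorter that treats such a word as a record pointer would be
    // dereferencing one of these values.
    System.out.println(Long.toHexString(wordFilledWith(FILL_ALLOCATED))); // a5a5a5a5a5a5a5a5
    System.out.println(Long.toHexString(wordFilledWith(FILL_FREED)));     // 5a5a5a5a5a5a5a5a
  }
}
```

Neither value is likely to be a valid address or a sensible encoded page number and offset, which is consistent with the crash and the odd exceptions listed above.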
    
    This PR resets the sorter's state in `ShuffleInMemorySorter.reset` before calling `allocateArray` to fix the issue.
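
The ordering problem can be sketched with a toy model. This is not Spark code: `NestedSpillDemo`, its fields, and the always-under-pressure allocator are assumptions made for illustration. It only shows why clearing the state before allocating turns the nested spill into a no-op.

```java
import java.util.Arrays;

/** Toy model of a reset() whose allocation triggers a nested spill. */
final class NestedSpillDemo {
  // Stands in for the fill pattern written into released memory.
  static final long POISON = 0x5a5a5a5a5a5a5a5aL;

  long[] array = new long[4];
  int pos = 0;
  int poisonedReads = 0; // entries a spill read from an already released array

  void insert(long value) { array[pos++] = value; }

  /** Reads every live entry in [0, pos); does nothing when pos == 0. */
  void spill() {
    for (int i = 0; i < pos; i++) {
      if (array[i] == POISON) poisonedReads++;
    }
  }

  /** Assumed allocator that is always under pressure and spills first. */
  long[] allocateArray(int size) {
    spill();
    return new long[size];
  }

  void free(long[] released) { Arrays.fill(released, POISON); }

  /** Old ordering: `pos` and `array` still describe the released memory. */
  void resetBuggy() {
    free(array);
    array = allocateArray(4); // nested spill reads the released array
    pos = 0;
  }

  /** New ordering: clear the state first, then allocate. */
  void resetFixed() {
    free(array);
    pos = 0;
    array = null;
    array = allocateArray(4); // nested spill sees pos == 0 and reads nothing
  }

  /** Inserts three records, resets, and reports how many stale reads happened. */
  static int poisonedReadsAfterReset(boolean fixed) {
    NestedSpillDemo sorter = new NestedSpillDemo();
    sorter.insert(1L);
    sorter.insert(2L);
    sorter.insert(3L);
    if (fixed) sorter.resetFixed(); else sorter.resetBuggy();
    return sorter.poisonedReads;
  }

  public static void main(String[] args) {
    System.out.println(poisonedReadsAfterReset(false)); // 3
    System.out.println(poisonedReadsAfterReset(true));  // 0
  }
}
```

In the toy model the stale reads are merely counted; in the real sorter the released page may already belong to another task, which is where the failures listed above come from.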
    
    ## How was this patch tested?
    
    The new unit test crashes the JVM without the fix.
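
The test in this PR sets `spark.testing.memory` to 1600, notes that an `initialSize` of 100 needs at least 800 bytes, and leaves 400 bytes for the task. The arithmetic sketch below shows where those numbers plausibly come from. It assumes 8 bytes per pointer-array entry and a minimum per-task share of pool size / (2 * active tasks); the share rule is an assumption about `ExecutionMemoryPool` and is not stated in this thread. The class and method names are made up for the example.

```java
/** Back-of-the-envelope numbers for the unit test's memory budget. */
final class TestMemoryBudget {
  static final long BYTES_PER_POINTER = 8L; // one long per record pointer

  /** Bytes needed for a pointer array with `entries` slots. */
  static long pointerArrayBytes(long entries) {
    return entries * BYTES_PER_POINTER;
  }

  /** Assumed minimum share a task waits for: poolSize / (2 * activeTasks). */
  static long minSharePerTask(long poolSize, int activeTasks) {
    return poolSize / (2L * activeTasks);
  }

  public static void main(String[] args) {
    System.out.println(pointerArrayBytes(100));   // 800
    // Two active tasks: attempt 0 runs the sorter, attempt 1 hogs the rest.
    System.out.println(minSharePerTask(1600, 2)); // 400
  }
}
```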


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-25081

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22062.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22062
    
----
commit 54799cae8ef0727988bbb863d326ea61b4d9ae72
Author: Shixiong Zhu <zs...@...>
Date:   2018-08-10T00:02:33Z

    Nested spill in ShuffleExternalSorter should not access released memory page

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22062
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22062#discussion_r209372979
  
    --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala ---
    @@ -0,0 +1,111 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.shuffle.sort
    +
    +import java.lang.{Long => JLong}
    +
    +import org.mockito.Mockito.when
    +import org.scalatest.mockito.MockitoSugar
    +
    +import org.apache.spark._
    +import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
    +import org.apache.spark.memory._
    +import org.apache.spark.unsafe.Platform
    +
    +class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
    +
    +  test("nested spill should be no-op") {
    +    val conf = new SparkConf()
    +      .setMaster("local[1]")
    +      .setAppName("ShuffleExternalSorterSuite")
    +      .set("spark.testing", "true")
    +      .set("spark.testing.memory", "1600")
    +      .set("spark.memory.fraction", "1")
    +    sc = new SparkContext(conf)
    +
    +    val memoryManager = UnifiedMemoryManager(conf, 1)
    +
    +    var shouldAllocate = false
    +
    +    // Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true.
    +    // This will trigger a nested spill and expose issues if we don't handle this case properly.
    +    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
    +      override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
    +        // ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use.
    +        // So we leave 400 bytes for the task.
    +        if (shouldAllocate &&
    +          memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) {
    +          val acquireExecutionMemoryMethod =
    +            memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head
    +          acquireExecutionMemoryMethod.invoke(
    +            memoryManager,
    +            JLong.valueOf(
    +              memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400),
    +            JLong.valueOf(1L), // taskAttemptId
    +            MemoryMode.ON_HEAP
    +          ).asInstanceOf[java.lang.Long]
    +        }
    +        super.acquireExecutionMemory(required, consumer)
    +      }
    +    }
    +    val taskContext = mock[TaskContext]
    --- End diff --
    
    lol


---



[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22062
  
    **[Test build #94529 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94529/testReport)** for PR 22062 at commit [`54799ca`](https://github.com/apache/spark/commit/54799cae8ef0727988bbb863d326ea61b4d9ae72).


---



[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22062#discussion_r209338026
  
    --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala ---
    @@ -0,0 +1,111 @@
    +    val taskMetrics = new TaskMetrics
    +    when(taskContext.taskMetrics()).thenReturn(taskMetrics)
    +    val sorter = new ShuffleExternalSorter(
    +      taskMemoryManager,
    +      sc.env.blockManager,
    +      taskContext,
    +      100, // initialSize - This will require ShuffleInMemorySorter to acquire at least 800 bytes
    +      1, // numPartitions
    +      conf,
    +      new ShuffleWriteMetrics)
    +    val inMemSorter = {
    +      val field = sorter.getClass.getDeclaredField("inMemSorter")
    +      field.setAccessible(true)
    +      field.get(sorter).asInstanceOf[ShuffleInMemorySorter]
    +    }
    +    // Allocate memory to make the next "insertRecord" call trigger a spill.
    +    val bytes = new Array[Byte](1)
    +    while (inMemSorter.hasSpaceForAnotherRecord) {
    --- End diff --
    
    > Access to hasSpaceForAnotherRecord is the only reason we need reflection, right?
    
    Yes.
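
For readers unfamiliar with the pattern being discussed, the test reaches the private `inMemSorter` field with the usual `getDeclaredField` / `setAccessible` / `get` sequence. A minimal Java sketch of the same steps follows; `Outer`, `inner`, and `readPrivateField` are hypothetical names, not Spark classes.

```java
import java.lang.reflect.Field;

/** Illustration of reading a private field through reflection. */
final class PrivateFieldDemo {
  /** Hypothetical class that keeps a collaborator private. */
  static final class Outer {
    private final StringBuilder inner = new StringBuilder("hidden");
  }

  /** Looks up a declared field by name, makes it accessible, and reads it. */
  static Object readPrivateField(Object target, String name) {
    try {
      Field field = target.getClass().getDeclaredField(name);
      field.setAccessible(true);
      return field.get(target);
    } catch (ReflectiveOperationException e) {
      // Wrap checked reflection failures so callers need no throws clause.
      throw new IllegalStateException("cannot read field " + name, e);
    }
  }

  public static void main(String[] args) {
    StringBuilder inner = (StringBuilder) readPrivateField(new Outer(), "inner");
    System.out.println(inner); // hidden
  }
}
```

The trade-off is the one hinted at in the question above: the test no longer needs a visibility change in production code, but it breaks silently at compile time if the field is renamed.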


---



[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22062#discussion_r209292284
  
    --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala ---
    @@ -0,0 +1,111 @@
    +    val taskContext = mock[TaskContext]
    --- End diff --
    
    Do we need Mockito here? We can also create a `TaskContextImpl` by hand, right?


---



[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22062#discussion_r209337484
  
    --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java ---
    @@ -94,12 +94,20 @@ public int numRecords() {
       }
     
       public void reset() {
    +    // Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op.
    +    pos = 0;
    --- End diff --
    
    We also need to set `usableCapacity` to `0`. Otherwise, https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java#L343 will not rethrow SparkOutOfMemoryError. ShuffleExternalSorter will keep running and eventually touch `array`.
    
    Setting `array` to `null` is just for safety, so that any incorrect use fails with an NPE.
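
The role of `usableCapacity` can be sketched as follows. This is a toy model under stated assumptions, not the code at the linked line: it assumes `hasSpaceForAnotherRecord` compares `pos` with `usableCapacity`, and that the caller rethrows an allocation failure only when no space is reported. `GrowGuardDemo`, `ToyOutOfMemoryError`, and `growOrRethrow` are hypothetical names.

```java
/** Toy model of a "rethrow only if there is no space" guard. */
final class GrowGuardDemo {
  /** Hypothetical stand-in for the out-of-memory error named above. */
  static final class ToyOutOfMemoryError extends RuntimeException {
    ToyOutOfMemoryError(String message) { super(message); }
  }

  private final int pos = 0;        // reset() has already cleared the record count
  private final int usableCapacity; // 0 with the fix, stale and positive without it

  GrowGuardDemo(int usableCapacity) { this.usableCapacity = usableCapacity; }

  boolean hasSpaceForAnotherRecord() { return pos < usableCapacity; }

  /** Allocation that always fails, to force the error path. */
  private long[] allocateArray() {
    throw new ToyOutOfMemoryError("unable to grow the pointer array");
  }

  /** Returns normally only when the failure is swallowed because space is reported. */
  void growOrRethrow() {
    try {
      allocateArray();
    } catch (ToyOutOfMemoryError e) {
      if (!hasSpaceForAnotherRecord()) {
        throw e; // no space: surface the failure
      }
      // Space reported: the caller carries on and will write into the sorter.
    }
  }

  /** True when the failure propagates for the given usableCapacity. */
  static boolean rethrows(int usableCapacity) {
    try {
      new GrowGuardDemo(usableCapacity).growOrRethrow();
      return false;
    } catch (ToyOutOfMemoryError e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(rethrows(4)); // false: stale capacity hides the failure
    System.out.println(rethrows(0)); // true: zero capacity makes the guard rethrow
  }
}
```

With a stale positive capacity the guard believes there is room and lets the caller continue, which in the real sorter would mean writing through a released `array`.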


---



[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22062
  
    **[Test build #94529 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94529/testReport)** for PR 22062 at commit [`54799ca`](https://github.com/apache/spark/commit/54799cae8ef0727988bbb863d326ea61b4d9ae72).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/22062
  
    Thanks. Merging to master. I will try to merge to old branches and report back.


---



[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/22062
  
    cc @hvanhovell 


---



[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/22062
  
    I also merged to branch-2.3.


---



[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22062#discussion_r209291439
  
    --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala ---
    @@ -0,0 +1,111 @@
    +    val inMemSorter = {
    +      val field = sorter.getClass.getDeclaredField("inMemSorter")
    +      field.setAccessible(true)
    +      field.get(sorter).asInstanceOf[ShuffleInMemorySorter]
    +    }
    +    // Allocate memory to make the next "insertRecord" call trigger a spill.
    +    val bytes = new Array[Byte](1)
    +    while (inMemSorter.hasSpaceForAnotherRecord) {
    --- End diff --
    
    Access to `hasSpaceForAnotherRecord` is the only reason we need reflection, right?


---



[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22062
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2019/
    Test PASSed.


---



[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22062
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94529/
    Test PASSed.


---



[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22062


---



[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22062#discussion_r209337943
  
    --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala ---
    @@ -0,0 +1,111 @@
    +    val taskContext = mock[TaskContext]
    --- End diff --
    
    > We can also create a TaskContextImpl by hand, right?
    
    I can. Just to save several lines :)


---



[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22062#discussion_r209262151
  
    --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java ---
    @@ -94,12 +94,20 @@ public int numRecords() {
       }
     
       public void reset() {
    +    // Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op.
    +    pos = 0;
    --- End diff --
    
    For my understanding: this is enough to fix the actual issue here, right?


---
