You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by zsxwing <gi...@git.apache.org> on 2018/08/10 00:12:12 UTC
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
GitHub user zsxwing opened a pull request:
https://github.com/apache/spark/pull/22062
[SPARK-25081][Core]Nested spill in ShuffleExternalSorter should not access released memory page
## What changes were proposed in this pull request?
This issue is pretty similar to [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907).
"allocateArray" in [ShuffleInMemorySorter.reset](https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99) may trigger a spill and cause ShuffleInMemorySorter access the released `array`. Another task may get the same memory page from the pool. This will cause two tasks access the same memory page. When a task reads memory written by another task, many types of failures may happen. Here are some examples I have seen:
- JVM crash. (This is easy to reproduce in a unit test as we fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes which usually points to an invalid memory address)
- java.lang.IllegalArgumentException: Comparison method violates its general contract!
- java.lang.NullPointerException at org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
- java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size -536870912 because the size after growing exceeds size limitation 2147483632
This PR resets states in `ShuffleInMemorySorter.reset` before calling `allocateArray` to fix the issue.
## How was this patch tested?
The new unit test will make JVM crash without the fix.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zsxwing/spark SPARK-25081
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/22062.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #22062
----
commit 54799cae8ef0727988bbb863d326ea61b4d9ae72
Author: Shixiong Zhu <zs...@...>
Date: 2018-08-10T00:02:33Z
Nested spill in ShuffleExternalSorter should not access released memory page
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22062
Merged build finished. Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22062
Merged build finished. Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/22062#discussion_r209372979
--- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+ test("nested spill should be no-op") {
+ val conf = new SparkConf()
+ .setMaster("local[1]")
+ .setAppName("ShuffleExternalSorterSuite")
+ .set("spark.testing", "true")
+ .set("spark.testing.memory", "1600")
+ .set("spark.memory.fraction", "1")
+ sc = new SparkContext(conf)
+
+ val memoryManager = UnifiedMemoryManager(conf, 1)
+
+ var shouldAllocate = false
+
+ // Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true.
+ // This will trigger a nested spill and expose issues if we don't handle this case properly.
+ val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+ override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
+ // ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use.
+ // So we leave 400 bytes for the task.
+ if (shouldAllocate &&
+ memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) {
+ val acquireExecutionMemoryMethod =
+ memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head
+ acquireExecutionMemoryMethod.invoke(
+ memoryManager,
+ JLong.valueOf(
+ memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400),
+ JLong.valueOf(1L), // taskAttemptId
+ MemoryMode.ON_HEAP
+ ).asInstanceOf[java.lang.Long]
+ }
+ super.acquireExecutionMemory(required, consumer)
+ }
+ }
+ val taskContext = mock[TaskContext]
--- End diff --
lol
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22062
**[Test build #94529 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94529/testReport)** for PR 22062 at commit [`54799ca`](https://github.com/apache/spark/commit/54799cae8ef0727988bbb863d326ea61b4d9ae72).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22062#discussion_r209338026
--- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+ test("nested spill should be no-op") {
+ val conf = new SparkConf()
+ .setMaster("local[1]")
+ .setAppName("ShuffleExternalSorterSuite")
+ .set("spark.testing", "true")
+ .set("spark.testing.memory", "1600")
+ .set("spark.memory.fraction", "1")
+ sc = new SparkContext(conf)
+
+ val memoryManager = UnifiedMemoryManager(conf, 1)
+
+ var shouldAllocate = false
+
+ // Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true.
+ // This will trigger a nested spill and expose issues if we don't handle this case properly.
+ val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+ override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
+ // ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use.
+ // So we leave 400 bytes for the task.
+ if (shouldAllocate &&
+ memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) {
+ val acquireExecutionMemoryMethod =
+ memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head
+ acquireExecutionMemoryMethod.invoke(
+ memoryManager,
+ JLong.valueOf(
+ memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400),
+ JLong.valueOf(1L), // taskAttemptId
+ MemoryMode.ON_HEAP
+ ).asInstanceOf[java.lang.Long]
+ }
+ super.acquireExecutionMemory(required, consumer)
+ }
+ }
+ val taskContext = mock[TaskContext]
+ val taskMetrics = new TaskMetrics
+ when(taskContext.taskMetrics()).thenReturn(taskMetrics)
+ val sorter = new ShuffleExternalSorter(
+ taskMemoryManager,
+ sc.env.blockManager,
+ taskContext,
+ 100, // initialSize - This will require ShuffleInMemorySorter to acquire at least 800 bytes
+ 1, // numPartitions
+ conf,
+ new ShuffleWriteMetrics)
+ val inMemSorter = {
+ val field = sorter.getClass.getDeclaredField("inMemSorter")
+ field.setAccessible(true)
+ field.get(sorter).asInstanceOf[ShuffleInMemorySorter]
+ }
+ // Allocate memory to make the next "insertRecord" call triggers a spill.
+ val bytes = new Array[Byte](1)
+ while (inMemSorter.hasSpaceForAnotherRecord) {
--- End diff --
> Access to the hasSpaceForAnotherRecord is the only reason why we need reflection right?
Yes.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/22062#discussion_r209292284
--- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+ test("nested spill should be no-op") {
+ val conf = new SparkConf()
+ .setMaster("local[1]")
+ .setAppName("ShuffleExternalSorterSuite")
+ .set("spark.testing", "true")
+ .set("spark.testing.memory", "1600")
+ .set("spark.memory.fraction", "1")
+ sc = new SparkContext(conf)
+
+ val memoryManager = UnifiedMemoryManager(conf, 1)
+
+ var shouldAllocate = false
+
+ // Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true.
+ // This will trigger a nested spill and expose issues if we don't handle this case properly.
+ val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+ override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
+ // ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use.
+ // So we leave 400 bytes for the task.
+ if (shouldAllocate &&
+ memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) {
+ val acquireExecutionMemoryMethod =
+ memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head
+ acquireExecutionMemoryMethod.invoke(
+ memoryManager,
+ JLong.valueOf(
+ memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400),
+ JLong.valueOf(1L), // taskAttemptId
+ MemoryMode.ON_HEAP
+ ).asInstanceOf[java.lang.Long]
+ }
+ super.acquireExecutionMemory(required, consumer)
+ }
+ }
+ val taskContext = mock[TaskContext]
--- End diff --
Do we need mockito here? We can also create a `TaskContextImpl` by hand right?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22062#discussion_r209337484
--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java ---
@@ -94,12 +94,20 @@ public int numRecords() {
}
public void reset() {
+ // Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op.
+ pos = 0;
--- End diff --
We also need to set `usableCapacity` to `0`. Otherwise, https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java#L343 will not rethrow SparkOutOfMemoryError. ShuffleExternalSorter will keep running and finally touch `array`.
Setting `array` to `null` is just for safety so that anything incorrect use will fail with NPE.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22062
**[Test build #94529 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94529/testReport)** for PR 22062 at commit [`54799ca`](https://github.com/apache/spark/commit/54799cae8ef0727988bbb863d326ea61b4d9ae72).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/22062
Thanks. Merging to master. I will try to merge to old branches and report back.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/22062
cc @hvanhovell
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/22062
I also merged to branch-2.3.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/22062#discussion_r209291439
--- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+ test("nested spill should be no-op") {
+ val conf = new SparkConf()
+ .setMaster("local[1]")
+ .setAppName("ShuffleExternalSorterSuite")
+ .set("spark.testing", "true")
+ .set("spark.testing.memory", "1600")
+ .set("spark.memory.fraction", "1")
+ sc = new SparkContext(conf)
+
+ val memoryManager = UnifiedMemoryManager(conf, 1)
+
+ var shouldAllocate = false
+
+ // Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true.
+ // This will trigger a nested spill and expose issues if we don't handle this case properly.
+ val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+ override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
+ // ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use.
+ // So we leave 400 bytes for the task.
+ if (shouldAllocate &&
+ memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) {
+ val acquireExecutionMemoryMethod =
+ memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head
+ acquireExecutionMemoryMethod.invoke(
+ memoryManager,
+ JLong.valueOf(
+ memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400),
+ JLong.valueOf(1L), // taskAttemptId
+ MemoryMode.ON_HEAP
+ ).asInstanceOf[java.lang.Long]
+ }
+ super.acquireExecutionMemory(required, consumer)
+ }
+ }
+ val taskContext = mock[TaskContext]
+ val taskMetrics = new TaskMetrics
+ when(taskContext.taskMetrics()).thenReturn(taskMetrics)
+ val sorter = new ShuffleExternalSorter(
+ taskMemoryManager,
+ sc.env.blockManager,
+ taskContext,
+ 100, // initialSize - This will require ShuffleInMemorySorter to acquire at least 800 bytes
+ 1, // numPartitions
+ conf,
+ new ShuffleWriteMetrics)
+ val inMemSorter = {
+ val field = sorter.getClass.getDeclaredField("inMemSorter")
+ field.setAccessible(true)
+ field.get(sorter).asInstanceOf[ShuffleInMemorySorter]
+ }
+ // Allocate memory to make the next "insertRecord" call triggers a spill.
+ val bytes = new Array[Byte](1)
+ while (inMemSorter.hasSpaceForAnotherRecord) {
--- End diff --
Access to the `hasSpaceForAnotherRecord` is the only reason why we need reflection right?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22062
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2019/
Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #22062: [SPARK-25081][Core]Nested spill in ShuffleExternalSorter...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22062
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94529/
Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/22062
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22062#discussion_r209337943
--- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+ test("nested spill should be no-op") {
+ val conf = new SparkConf()
+ .setMaster("local[1]")
+ .setAppName("ShuffleExternalSorterSuite")
+ .set("spark.testing", "true")
+ .set("spark.testing.memory", "1600")
+ .set("spark.memory.fraction", "1")
+ sc = new SparkContext(conf)
+
+ val memoryManager = UnifiedMemoryManager(conf, 1)
+
+ var shouldAllocate = false
+
+ // Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true.
+ // This will trigger a nested spill and expose issues if we don't handle this case properly.
+ val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+ override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
+ // ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to use.
+ // So we leave 400 bytes for the task.
+ if (shouldAllocate &&
+ memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) {
+ val acquireExecutionMemoryMethod =
+ memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head
+ acquireExecutionMemoryMethod.invoke(
+ memoryManager,
+ JLong.valueOf(
+ memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400),
+ JLong.valueOf(1L), // taskAttemptId
+ MemoryMode.ON_HEAP
+ ).asInstanceOf[java.lang.Long]
+ }
+ super.acquireExecutionMemory(required, consumer)
+ }
+ }
+ val taskContext = mock[TaskContext]
--- End diff --
> We can also create a TaskContextImpl by hand right?
I can. Just to save several lines :)
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/22062#discussion_r209262151
--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java ---
@@ -94,12 +94,20 @@ public int numRecords() {
}
public void reset() {
+ // Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op.
+ pos = 0;
--- End diff --
For my understanding: this is enough to fix the actual issue here right?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org