Posted to commits@spark.apache.org by hv...@apache.org on 2020/09/29 11:11:49 UTC
[spark] branch branch-3.0 updated: [SPARK-32901][CORE] Do not allocate memory while spilling UnsafeExternalSorter
This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d3cc564 [SPARK-32901][CORE] Do not allocate memory while spilling UnsafeExternalSorter
d3cc564 is described below
commit d3cc564de2e27d6f40d360a35ac86d17b39f1498
Author: Tom van Bussel <to...@databricks.com>
AuthorDate: Tue Sep 29 13:05:33 2020 +0200
[SPARK-32901][CORE] Do not allocate memory while spilling UnsafeExternalSorter
### What changes were proposed in this pull request?
This PR changes `UnsafeExternalSorter` so that it no longer allocates any memory while spilling. In particular, it removes the allocation of a new pointer array in `UnsafeInMemorySorter` during a spill. Instead, the new pointer array is allocated when the next record is inserted into the sorter.
### Why are the changes needed?
Without this change the `UnsafeExternalSorter` could throw an OOM while spilling. The following sequence of events would have triggered an OOM:
1. `UnsafeExternalSorter` runs out of space in its pointer array and attempts to allocate a new large array to replace the old one.
2. `TaskMemoryManager` tries to allocate the memory backing the new large array using `MemoryManager`, but `MemoryManager` is only willing to return most but not all of the memory requested.
3. `TaskMemoryManager` asks `UnsafeExternalSorter` to spill, which causes `UnsafeExternalSorter` to spill the current run to disk, to free its record pages and to reset its `UnsafeInMemorySorter`.
4. `UnsafeInMemorySorter` frees the old pointer array, and tries to allocate a new small pointer array.
5. `TaskMemoryManager` tries to allocate the memory backing the small array using `MemoryManager`, but `MemoryManager` is unwilling to give it any memory, as the `TaskMemoryManager` is still holding on to the memory it got for the new large array.
6. `TaskMemoryManager` again asks `UnsafeExternalSorter` to spill, but this time there is nothing to spill.
7. `UnsafeInMemorySorter` receives less memory than it requested, and causes a `SparkOutOfMemoryError` to be thrown, which causes the current task to fail.
With the changes in the PR the following will happen instead:
1. `UnsafeExternalSorter` runs out of space in its pointer array and attempts to allocate a new large array to replace the old one.
2. `TaskMemoryManager` tries to allocate the memory backing the new large array using `MemoryManager`, but `MemoryManager` is only willing to return most but not all of the memory requested.
3. `TaskMemoryManager` asks `UnsafeExternalSorter` to spill, which causes `UnsafeExternalSorter` to spill the current run to disk, to free its record pages and to reset its `UnsafeInMemorySorter`.
4. `UnsafeInMemorySorter` frees the old pointer array.
5. `TaskMemoryManager` returns control to `UnsafeExternalSorter.growPointerArrayIfNecessary` (either by returning the new large array or by throwing a `SparkOutOfMemoryError`).
6. `UnsafeExternalSorter` either frees the new large array or it ignores the `SparkOutOfMemoryError` depending on what happened in the previous step.
7. `UnsafeExternalSorter` successfully allocates a new small pointer array and operation continues as normal.
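The deferred allocation described above can be illustrated with a small, self-contained toy model. This is only a sketch under stated assumptions, not Spark's implementation: `LazyPointerArraySorter`, its `LongPredicate` allocator hook, and the tiny `INITIAL_SIZE` are hypothetical names invented for illustration, and the sorter calls `spill()` itself instead of being asked to spill by a memory manager. The property it shows is the one this PR relies on: `spill()` only releases the pointer array, and the replacement array is allocated on the next insert.

```java
import java.util.Arrays;
import java.util.function.LongPredicate;

/** Toy model (hypothetical, not Spark code) of a sorter that never allocates while spilling. */
final class LazyPointerArraySorter {
  private static final int INITIAL_SIZE = 4;

  /** Hypothetical allocator hook: returns true if an array of the given size may be allocated. */
  private final LongPredicate allocator;
  private long[] pointers;   // null after a spill, standing in for the freed pointer array
  private int numRecords;
  private int spillCount;

  LazyPointerArraySorter(LongPredicate allocator) {
    this.allocator = allocator;
  }

  int numRecords() { return numRecords; }
  int spillCount() { return spillCount; }
  boolean hasPointerArray() { return pointers != null; }

  /** Spilling only releases memory; it never calls the allocator. */
  void spill() {
    if (numRecords == 0) {
      return; // nothing to spill
    }
    spillCount++;
    pointers = null; // free the array instead of replacing it with a new one
    numRecords = 0;
  }

  void insert(long pointer) {
    growPointerArrayIfNecessary();
    pointers[numRecords++] = pointer;
  }

  private void growPointerArrayIfNecessary() {
    if (pointers != null && numRecords < pointers.length) {
      return; // still room for another record
    }
    if (pointers != null) {
      int doubled = pointers.length * 2;
      if (allocator.test(doubled)) {
        pointers = Arrays.copyOf(pointers, doubled);
        return;
      }
      // Growing failed, so spill. This frees the full array without allocating anything.
      spill();
    }
    // The array is absent (fresh sorter or just spilled): allocate the small one now.
    if (!allocator.test(INITIAL_SIZE)) {
      throw new IllegalStateException("cannot allocate the initial pointer array");
    }
    pointers = new long[INITIAL_SIZE];
  }

  public static void main(String[] args) {
    // The allocator refuses anything larger than the initial size, forcing a spill on growth.
    LazyPointerArraySorter sorter = new LazyPointerArraySorter(size -> size <= INITIAL_SIZE);
    for (int i = 0; i < 5; i++) {
      sorter.insert(i);
    }
    // prints "spills=1, records=1"
    System.out.println("spills=" + sorter.spillCount() + ", records=" + sorter.numRecords());
  }
}
```

In this model a spill can succeed even when the allocator refuses every request, because the spill path contains no allocation at all; only the following `insert` needs memory.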
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tests were added in `UnsafeExternalSorterSuite` and `UnsafeInMemorySorterSuite`.
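The new tests starve the task of memory, spill, and then lift the starvation again. A minimal sketch of that fault-injection idea follows. It is an assumption-laden illustration rather than the actual `TestMemoryManager`: the class `FaultInjectingMemory` and its method names are hypothetical, and the behavior (a failing request is granted 0 bytes and consumes one pending failure) is assumed for the sake of the example.

```java
/** Toy fault-injecting memory source (hypothetical, for illustration only). */
final class FaultInjectingMemory {
  private long available;
  private int pendingFailures;

  FaultInjectingMemory(long available) {
    this.available = available;
  }

  /** Makes the next {@code n} calls to acquire() return 0 bytes. */
  synchronized void markConsecutiveFailures(int n) {
    if (n < 0) {
      throw new IllegalArgumentException("n must be non-negative");
    }
    // Saturate instead of overflowing when failures are marked repeatedly.
    pendingFailures = (int) Math.min((long) pendingFailures + n, Integer.MAX_VALUE);
  }

  /** Clears all pending failures so that acquire() can succeed again. */
  synchronized void resetFailures() {
    pendingFailures = 0;
  }

  /** Returns the number of bytes granted, which may be fewer than requested. */
  synchronized long acquire(long bytes) {
    if (pendingFailures > 0) {
      pendingFailures--;
      return 0L;
    }
    long granted = Math.min(bytes, available);
    available -= granted;
    return granted;
  }

  synchronized void release(long bytes) {
    available += bytes;
  }
}
```

A test built on such a helper would mark `Integer.MAX_VALUE` failures, run the operation that must not allocate (here, the spill), reset the failures, and then check that normal operation resumes.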
Closes #29785 from tomvanbussel/SPARK-32901.
Authored-by: Tom van Bussel <to...@databricks.com>
Signed-off-by: herman <he...@databricks.com>
(cherry picked from commit f167002522d50eefb261c8ba2d66a23b781a38c4)
Signed-off-by: herman <he...@databricks.com>
---
.../unsafe/sort/UnsafeExternalSorter.java | 96 ++++++++++++++++------
.../unsafe/sort/UnsafeInMemorySorter.java | 55 +++++++------
.../unsafe/sort/UnsafeExternalSorterSuite.java | 46 ++++-------
.../unsafe/sort/UnsafeInMemorySorterSuite.java | 40 ++++-----
.../apache/spark/memory/TestMemoryManager.scala | 8 ++
5 files changed, 143 insertions(+), 102 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 71b9a5b..2096453 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -203,6 +203,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
if (inMemSorter == null || inMemSorter.numRecords() <= 0) {
+ // There could still be some memory allocated when there are no records in the in-memory
+ // sorter. We will not spill it however, to ensure that we can always process at least one
+ // record before spilling. See the comments in `allocateMemoryForRecordIfNecessary` for why
+ // this is necessary.
return 0L;
}
@@ -224,7 +228,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// Note that this is more-or-less going to be a multiple of the page size, so wasted space in
// pages will currently be counted as memory spilled even though that space isn't actually
// written to disk. This also counts the space needed to store the sorter's pointer array.
- inMemSorter.reset();
+ inMemSorter.freeMemory();
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
// records. Otherwise, if the task is over allocated memory, then without freeing the memory
// pages, we might not be able to get memory for the pointer array.
@@ -325,7 +329,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
deleteSpillFiles();
freeMemory();
if (inMemSorter != null) {
- inMemSorter.free();
+ inMemSorter.freeMemory();
inMemSorter = null;
}
}
@@ -339,40 +343,53 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
+ if (inMemSorter.numRecords() <= 0) {
+ // Spilling was triggered just before this method was called. The pointer array was freed
+ // during the spill, so a new pointer array needs to be allocated here.
+ LongArray array = allocateArray(inMemSorter.getInitialSize());
+ inMemSorter.expandPointerArray(array);
+ return;
+ }
+
long used = inMemSorter.getMemoryUsage();
- LongArray array;
+ LongArray array = null;
try {
// could trigger spilling
array = allocateArray(used / 8 * 2);
} catch (TooLargePageException e) {
// The pointer array is too big to fit in a single page, spill.
spill();
- return;
} catch (SparkOutOfMemoryError e) {
- // should have trigger spilling
- if (!inMemSorter.hasSpaceForAnotherRecord()) {
+ if (inMemSorter.numRecords() > 0) {
logger.error("Unable to grow the pointer array");
throw e;
}
- return;
+ // The new array could not be allocated, but that is not an issue as it is no longer needed,
+ // as all records were spilled.
}
- // check if spilling is triggered or not
- if (inMemSorter.hasSpaceForAnotherRecord()) {
- freeArray(array);
- } else {
- inMemSorter.expandPointerArray(array);
+
+ if (inMemSorter.numRecords() <= 0) {
+ // Spilling was triggered while trying to allocate the new array.
+ if (array != null) {
+ // We succeeded in allocating the new array, but, since all records were spilled, a
+ // smaller array would also suffice.
+ freeArray(array);
+ }
+ // The pointer array was freed during the spill, so a new pointer array needs to be
+ // allocated here.
+ array = allocateArray(inMemSorter.getInitialSize());
}
+ inMemSorter.expandPointerArray(array);
}
}
/**
- * Allocates more memory in order to insert an additional record. This will request additional
- * memory from the memory manager and spill if the requested memory can not be obtained.
+ * Allocates an additional page in order to insert an additional record. This will request
+ * additional memory from the memory manager and spill if the requested memory can not be
+ * obtained.
*
* @param required the required space in the data page, in bytes, including space for storing
- * the record size. This must be less than or equal to the page size (records
- * that exceed the page size are handled via a different code path which uses
- * special overflow pages).
+ * the record size.
*/
private void acquireNewPageIfNecessary(int required) {
if (currentPage == null ||
@@ -385,6 +402,37 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
/**
+ * Allocates more memory in order to insert an additional record. This will request additional
+ * memory from the memory manager and spill if the requested memory can not be obtained.
+ *
+ * @param required the required space in the data page, in bytes, including space for storing
+ * the record size.
+ */
+ private void allocateMemoryForRecordIfNecessary(int required) throws IOException {
+ // Step 1:
+ // Ensure that the pointer array has space for another record. This may cause a spill.
+ growPointerArrayIfNecessary();
+ // Step 2:
+ // Ensure that the last page has space for another record. This may cause a spill.
+ acquireNewPageIfNecessary(required);
+ // Step 3:
+ // The allocation in step 2 could have caused a spill, which would have freed the pointer
+ // array allocated in step 1. Therefore we need to check again whether we have to allocate
+ // a new pointer array.
+ //
+ // If the allocation in this step causes a spill event then it will not cause the page
+ // allocated in the previous step to be freed. The function `spill` only frees memory if at
+ // least one record has been inserted in the in-memory sorter. This will not be the case if
+ // we have spilled in the previous step.
+ //
+ // If we did not spill in the previous step then `growPointerArrayIfNecessary` will be a
+ // no-op that does not allocate any memory, and therefore can't cause a spill event.
+ //
+ // Thus there is no need to call `acquireNewPageIfNecessary` again after this step.
+ growPointerArrayIfNecessary();
+ }
+
+ /**
* Write a record to the sorter.
*/
public void insertRecord(
@@ -398,11 +446,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
spill();
}
- growPointerArrayIfNecessary();
- int uaoSize = UnsafeAlignedOffset.getUaoSize();
+ final int uaoSize = UnsafeAlignedOffset.getUaoSize();
// Need 4 or 8 bytes to store the record length.
final int required = length + uaoSize;
- acquireNewPageIfNecessary(required);
+ allocateMemoryForRecordIfNecessary(required);
final Object base = currentPage.getBaseObject();
final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
@@ -425,10 +472,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
Object valueBase, long valueOffset, int valueLen, long prefix, boolean prefixIsNull)
throws IOException {
- growPointerArrayIfNecessary();
- int uaoSize = UnsafeAlignedOffset.getUaoSize();
+ final int uaoSize = UnsafeAlignedOffset.getUaoSize();
final int required = keyLen + valueLen + (2 * uaoSize);
- acquireNewPageIfNecessary(required);
+ allocateMemoryForRecordIfNecessary(required);
final Object base = currentPage.getBaseObject();
final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
@@ -561,7 +607,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
assert(inMemSorter != null);
released += inMemSorter.getMemoryUsage();
totalSortTimeNanos += inMemSorter.getSortTimeNanos();
- inMemSorter.free();
+ inMemSorter.freeMemory();
inMemSorter = null;
taskContext.taskMetrics().incMemoryBytesSpilled(released);
taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
@@ -656,7 +702,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
i += spillWriter.recordsSpilled();
}
- if (inMemSorter != null) {
+ if (inMemSorter != null && inMemSorter.numRecords() > 0) {
UnsafeSorterIterator iter = inMemSorter.getSortedIterator();
moveOver(iter, startIndex - i);
queue.add(iter);
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index ff641a2..33be899 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -159,32 +159,26 @@ public final class UnsafeInMemorySorter {
return (int) (array.size() / (radixSortSupport != null ? 2 : 1.5));
}
+ public long getInitialSize() {
+ return initialSize;
+ }
+
/**
* Free the memory used by pointer array.
*/
- public void free() {
+ public void freeMemory() {
if (consumer != null) {
if (array != null) {
consumer.freeArray(array);
}
- array = null;
- }
- }
- public void reset() {
- if (consumer != null) {
- consumer.freeArray(array);
- // the call to consumer.allocateArray may trigger a spill which in turn access this instance
- // and eventually re-enter this method and try to free the array again. by setting the array
- // to null and its length to 0 we effectively make the spill code-path a no-op. setting the
- // array to null also indicates that it has already been de-allocated which prevents a double
- // de-allocation in free().
+ // Set the array to null instead of allocating a new array. Allocating an array could have
+ // triggered another spill and this method already is called from UnsafeExternalSorter when
+ // spilling. Attempting to allocate while spilling is dangerous, as we could be holding onto
+ // a large partially complete allocation, which may prevent other memory from being allocated.
+ // Instead we will allocate the new array when it is necessary.
array = null;
usableCapacity = 0;
- pos = 0;
- nullBoundaryPos = 0;
- array = consumer.allocateArray(initialSize);
- usableCapacity = getUsableCapacity();
}
pos = 0;
nullBoundaryPos = 0;
@@ -217,18 +211,20 @@ public final class UnsafeInMemorySorter {
}
public void expandPointerArray(LongArray newArray) {
- if (newArray.size() < array.size()) {
- // checkstyle.off: RegexpSinglelineJava
- throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
- // checkstyle.on: RegexpSinglelineJava
+ if (array != null) {
+ if (newArray.size() < array.size()) {
+ // checkstyle.off: RegexpSinglelineJava
+ throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
+ // checkstyle.on: RegexpSinglelineJava
+ }
+ Platform.copyMemory(
+ array.getBaseObject(),
+ array.getBaseOffset(),
+ newArray.getBaseObject(),
+ newArray.getBaseOffset(),
+ pos * 8L);
+ consumer.freeArray(array);
}
- Platform.copyMemory(
- array.getBaseObject(),
- array.getBaseOffset(),
- newArray.getBaseObject(),
- newArray.getBaseOffset(),
- pos * 8L);
- consumer.freeArray(array);
array = newArray;
usableCapacity = getUsableCapacity();
}
@@ -347,6 +343,11 @@ public final class UnsafeInMemorySorter {
* {@code next()} will return the same mutable object.
*/
public UnsafeSorterIterator getSortedIterator() {
+ if (numRecords() == 0) {
+ // `array` might be null, so make sure that it is not accessed by returning early.
+ return new SortedIterator(0, 0);
+ }
+
int offset = 0;
long start = System.nanoTime();
if (sortComparator != null) {
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 087d090..9904a0a 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.UUID;
-import org.hamcrest.Matchers;
import scala.Tuple2$;
import org.junit.After;
@@ -38,7 +37,6 @@ import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.internal.config.package$;
import org.apache.spark.memory.TestMemoryManager;
-import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.JavaSerializer;
import org.apache.spark.serializer.SerializerInstance;
@@ -551,40 +549,28 @@ public class UnsafeExternalSorterSuite {
}
@Test
- public void testOOMDuringSpill() throws Exception {
+ public void testNoOOMDuringSpill() throws Exception {
final UnsafeExternalSorter sorter = newSorter();
- // we assume that given default configuration,
- // the size of the data we insert to the sorter (ints)
- // and assuming we shouldn't spill before pointers array is exhausted
- // (memory manager is not configured to throw at this point)
- // - so this loop runs a reasonable number of iterations (<2000).
- // test indeed completed within <30ms (on a quad i7 laptop).
- for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+ for (int i = 0; i < 100; i++) {
insertNumber(sorter, i);
}
- // we expect the next insert to attempt growing the pointerssArray first
- // allocation is expected to fail, then a spill is triggered which
- // attempts another allocation which also fails and we expect to see this
- // OOM here. the original code messed with a released array within the
- // spill code and ended up with a failed assertion. we also expect the
- // location of the OOM to be
- // org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
- memoryManager.markconsequentOOM(2);
- try {
- insertNumber(sorter, 1024);
- fail("expected OutOfMmoryError but it seems operation surprisingly succeeded");
- }
- // we expect an SparkOutOfMemoryError here, anything else (i.e the original NPE is a failure)
- catch (SparkOutOfMemoryError oom){
- String oomStackTrace = Utils.exceptionString(oom);
- assertThat("expected SparkOutOfMemoryError in " +
- "org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset",
- oomStackTrace,
- Matchers.containsString(
- "org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset"));
+
+ // Check that spilling still succeeds when the task is starved for memory.
+ memoryManager.markconsequentOOM(Integer.MAX_VALUE);
+ sorter.spill();
+ memoryManager.resetConsequentOOM();
+
+ // Ensure that records can be appended after spilling, i.e. check that the sorter will allocate
+ // the new pointer array that it could not allocate while spilling.
+ for (int i = 0; i < 100; ++i) {
+ insertNumber(sorter, i);
}
+
+ sorter.cleanupResources();
+ assertSpillFilesWereCleanedUp();
}
+
private void verifyIntIterator(UnsafeSorterIterator iter, int start, int end)
throws IOException {
for (int i = start; i < end; i++) {
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index 2b8a060..9d4909d 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection.unsafe.sort;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import org.apache.spark.unsafe.array.LongArray;
import org.junit.Assert;
import org.junit.Test;
@@ -27,7 +28,6 @@ import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.memory.TestMemoryConsumer;
import org.apache.spark.memory.TestMemoryManager;
-import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
@@ -37,7 +37,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.isIn;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
public class UnsafeInMemorySorterSuite {
@@ -147,7 +146,7 @@ public class UnsafeInMemorySorterSuite {
}
@Test
- public void freeAfterOOM() {
+ public void testNoOOMDuringReset() {
final SparkConf sparkConf = new SparkConf();
sparkConf.set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false);
@@ -156,12 +155,7 @@ public class UnsafeInMemorySorterSuite {
final TaskMemoryManager memoryManager = new TaskMemoryManager(
testMemoryManager, 0);
final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
- final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
- final Object baseObject = dataPage.getBaseObject();
- // Write the records into the data page:
- long position = dataPage.getBaseOffset();
- final HashPartitioner hashPartitioner = new HashPartitioner(4);
// Use integer comparison for comparing prefixes (which are partition ids, in this case)
final PrefixComparator prefixComparator = PrefixComparators.LONG;
final RecordComparator recordComparator = new RecordComparator() {
@@ -179,18 +173,24 @@ public class UnsafeInMemorySorterSuite {
UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
recordComparator, prefixComparator, 100, shouldUseRadixSort());
- testMemoryManager.markExecutionAsOutOfMemoryOnce();
- try {
- sorter.reset();
- fail("expected SparkOutOfMemoryError but it seems operation surprisingly succeeded");
- } catch (SparkOutOfMemoryError oom) {
- // as expected
- }
- // [SPARK-21907] this failed on NPE at
- // org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108)
- sorter.free();
- // simulate a 'back to back' free.
- sorter.free();
+ // Ensure that the sorter does not OOM while freeing its memory.
+ testMemoryManager.markconsequentOOM(Integer.MAX_VALUE);
+ sorter.freeMemory();
+ testMemoryManager.resetConsequentOOM();
+ Assert.assertFalse(sorter.hasSpaceForAnotherRecord());
+
+ // Get the sorter in a usable state again by allocating a new pointer array.
+ LongArray array = consumer.allocateArray(1000);
+ sorter.expandPointerArray(array);
+
+ // Ensure that it is safe to call freeMemory() multiple times.
+ testMemoryManager.markconsequentOOM(Integer.MAX_VALUE);
+ sorter.freeMemory();
+ sorter.freeMemory();
+ testMemoryManager.resetConsequentOOM();
+ Assert.assertFalse(sorter.hasSpaceForAnotherRecord());
+
+ assertEquals(0L, memoryManager.cleanUpAllAllocatedMemory());
}
}
diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
index 60f6769..987f383 100644
--- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
+++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
@@ -119,6 +119,14 @@ class TestMemoryManager(conf: SparkConf)
consequentOOM += n
}
+ /**
+ * Undoes the effects of [[markExecutionAsOutOfMemoryOnce]] and [[markconsequentOOM]] and lets
+ * calls to [[acquireExecutionMemory()]] succeed (if there is enough memory available).
+ */
+ def resetConsequentOOM(): Unit = synchronized {
+ consequentOOM = 0
+ }
+
def limit(avail: Long): Unit = synchronized {
require(avail >= 0)
available = avail