You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2016/11/07 20:47:57 UTC
spark git commit: [SPARK-14914][CORE] Fix Resource not closed after
using, mostly for unit tests
Repository: spark
Updated Branches:
refs/heads/master 0d95662e7 -> 8f0ea011a
[SPARK-14914][CORE] Fix Resource not closed after using, mostly for unit tests
## What changes were proposed in this pull request?
Close `FileStreams`, `ZipFiles` etc to release the resources after using. Not closing the resources will cause IO Exception to be raised while deleting temp files.
## How was this patch tested?
Existing tests
Author: U-FAREAST\tl <tl...@microsoft.com>
Author: hyukjinkwon <gu...@gmail.com>
Author: Tao LI <tl...@microsoft.com>
Closes #15618 from HyukjinKwon/SPARK-14914-1.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f0ea011
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f0ea011
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f0ea011
Branch: refs/heads/master
Commit: 8f0ea011a7294679ec4275b2fef349ef45b6eb81
Parents: 0d95662
Author: Hyukjin Kwon <gu...@gmail.com>
Authored: Mon Nov 7 12:47:39 2016 -0800
Committer: Mridul Muralidharan <mr...@gmail.com>
Committed: Mon Nov 7 12:47:39 2016 -0800
----------------------------------------------------------------------
.../spark/rdd/ReliableCheckpointRDD.scala | 13 +++++---
.../test/scala/org/apache/spark/FileSuite.scala | 13 +++++---
.../spark/deploy/RPackageUtilsSuite.scala | 35 ++++++++++++--------
.../deploy/history/FsHistoryProviderSuite.scala | 8 ++++-
.../scheduler/EventLoggingListenerSuite.scala | 26 +++++++++------
.../spark/scheduler/TaskResultGetterSuite.scala | 7 ++--
.../apache/spark/mllib/util/MLUtilsSuite.scala | 16 +++++----
.../apache/spark/streaming/JavaAPISuite.java | 1 +
.../spark/streaming/CheckpointSuite.scala | 16 +++++----
.../spark/streaming/MasterFailureTest.scala | 1 +
.../streaming/ReceivedBlockTrackerSuite.scala | 5 +++
.../streaming/util/WriteAheadLogSuite.scala | 1 +
12 files changed, 93 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index eac901d..9f800e3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -239,12 +239,17 @@ private[spark] object ReliableCheckpointRDD extends Logging {
val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
val fileInputStream = fs.open(partitionerFilePath, bufferSize)
val serializer = SparkEnv.get.serializer.newInstance()
- val deserializeStream = serializer.deserializeStream(fileInputStream)
- val partitioner = Utils.tryWithSafeFinally[Partitioner] {
- deserializeStream.readObject[Partitioner]
+ val partitioner = Utils.tryWithSafeFinally {
+ val deserializeStream = serializer.deserializeStream(fileInputStream)
+ Utils.tryWithSafeFinally {
+ deserializeStream.readObject[Partitioner]
+ } {
+ deserializeStream.close()
+ }
} {
- deserializeStream.close()
+ fileInputStream.close()
}
+
logDebug(s"Read partitioner from $partitionerFilePath")
Some(partitioner)
} catch {
http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index cc52bb1..89f0b1c 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -58,10 +58,15 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
nums.saveAsTextFile(outputDir)
// Read the plain text file and check it's OK
val outputFile = new File(outputDir, "part-00000")
- val content = Source.fromFile(outputFile).mkString
- assert(content === "1\n2\n3\n4\n")
- // Also try reading it in as a text file RDD
- assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
+ val bufferSrc = Source.fromFile(outputFile)
+ Utils.tryWithSafeFinally {
+ val content = bufferSrc.mkString
+ assert(content === "1\n2\n3\n4\n")
+ // Also try reading it in as a text file RDD
+ assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
+ } {
+ bufferSrc.close()
+ }
}
test("text files (compressed)") {
http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
index 13cba94..0055870 100644
--- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -33,7 +33,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkFunSuite
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{ResetSystemProperties, Utils}
class RPackageUtilsSuite
extends SparkFunSuite
@@ -74,9 +74,13 @@ class RPackageUtilsSuite
val deps = Seq(dep1, dep2).mkString(",")
IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
val jars = Seq(main, dep1, dep2).map(c => new JarFile(getJarPath(c, new File(new URI(repo)))))
- assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
- assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
- assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
+ Utils.tryWithSafeFinally {
+ assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
+ assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
+ assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
+ } {
+ jars.foreach(_.close())
+ }
}
}
@@ -131,7 +135,7 @@ class RPackageUtilsSuite
test("SparkR zipping works properly") {
val tempDir = Files.createTempDir()
- try {
+ Utils.tryWithSafeFinally {
IvyTestUtils.writeFile(tempDir, "test.R", "abc")
val fakeSparkRDir = new File(tempDir, "SparkR")
assert(fakeSparkRDir.mkdirs())
@@ -144,14 +148,19 @@ class RPackageUtilsSuite
IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
assert(finalZip.exists())
- val entries = new ZipFile(finalZip).entries().asScala.map(_.getName).toSeq
- assert(entries.contains("/test.R"))
- assert(entries.contains("/SparkR/abc.R"))
- assert(entries.contains("/SparkR/DESCRIPTION"))
- assert(!entries.contains("/package.zip"))
- assert(entries.contains("/packageTest/def.R"))
- assert(entries.contains("/packageTest/DESCRIPTION"))
- } finally {
+ val zipFile = new ZipFile(finalZip)
+ Utils.tryWithSafeFinally {
+ val entries = zipFile.entries().asScala.map(_.getName).toSeq
+ assert(entries.contains("/test.R"))
+ assert(entries.contains("/SparkR/abc.R"))
+ assert(entries.contains("/SparkR/DESCRIPTION"))
+ assert(!entries.contains("/package.zip"))
+ assert(entries.contains("/packageTest/def.R"))
+ assert(entries.contains("/packageTest/DESCRIPTION"))
+ } {
+ zipFile.close()
+ }
+ } {
FileUtils.deleteDirectory(tempDir)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index a5eda7b..2c41c43 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -449,8 +449,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
val bstream = new BufferedOutputStream(cstream)
if (isNewFormat) {
- EventLoggingListener.initEventLog(new FileOutputStream(file))
+ val newFormatStream = new FileOutputStream(file)
+ Utils.tryWithSafeFinally {
+ EventLoggingListener.initEventLog(newFormatStream)
+ } {
+ newFormatStream.close()
+ }
}
+
val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8)
Utils.tryWithSafeFinally {
events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))
http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 7f48592..8a5ec37 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -202,8 +202,6 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
// Make sure expected events exist in the log file.
val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
- val logStart = SparkListenerLogStart(SPARK_VERSION)
- val lines = readLines(logData)
val eventSet = mutable.Set(
SparkListenerApplicationStart,
SparkListenerBlockManagerAdded,
@@ -216,19 +214,25 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
SparkListenerTaskStart,
SparkListenerTaskEnd,
SparkListenerApplicationEnd).map(Utils.getFormattedClassName)
- lines.foreach { line =>
- eventSet.foreach { event =>
- if (line.contains(event)) {
- val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
- val eventType = Utils.getFormattedClassName(parsedEvent)
- if (eventType == event) {
- eventSet.remove(event)
+ Utils.tryWithSafeFinally {
+ val logStart = SparkListenerLogStart(SPARK_VERSION)
+ val lines = readLines(logData)
+ lines.foreach { line =>
+ eventSet.foreach { event =>
+ if (line.contains(event)) {
+ val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
+ val eventType = Utils.getFormattedClassName(parsedEvent)
+ if (eventType == event) {
+ eventSet.remove(event)
+ }
}
}
}
+ assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+ assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
+ } {
+ logData.close()
}
- assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
- assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
}
private def readLines(in: InputStream): Seq[String] = {
http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 9e472f9..ee95e4f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -183,9 +183,9 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
// ensure we reset the classloader after the test completes
val originalClassLoader = Thread.currentThread.getContextClassLoader
- try {
+ val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)
+ Utils.tryWithSafeFinally {
// load the exception from the jar
- val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)
loader.addURL(jarFile.toURI.toURL)
Thread.currentThread().setContextClassLoader(loader)
val excClass: Class[_] = Utils.classForName("repro.MyException")
@@ -209,8 +209,9 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
assert(expectedFailure.findFirstMatchIn(exceptionMessage).isDefined)
assert(unknownFailure.findFirstMatchIn(exceptionMessage).isEmpty)
- } finally {
+ } {
Thread.currentThread.setContextClassLoader(originalClassLoader)
+ loader.close()
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index e4e9be3..665708a 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -155,13 +155,17 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
val tempDir = Utils.createTempDir()
val outputDir = new File(tempDir, "output")
MLUtils.saveAsLibSVMFile(examples, outputDir.toURI.toString)
- val lines = outputDir.listFiles()
+ val sources = outputDir.listFiles()
.filter(_.getName.startsWith("part-"))
- .flatMap(Source.fromFile(_).getLines())
- .toSet
- val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03")
- assert(lines === expected)
- Utils.deleteRecursively(tempDir)
+ .map(Source.fromFile)
+ Utils.tryWithSafeFinally {
+ val lines = sources.flatMap(_.getLines()).toSet
+ val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03")
+ assert(lines === expected)
+ } {
+ sources.foreach(_.close())
+ Utils.deleteRecursively(tempDir)
+ }
}
test("appendBias") {
http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 3d54abd..648a5ab 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1805,6 +1805,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// will be re-processed after recovery
List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
+ ssc.stop();
Utils.deleteRecursively(tempDir);
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index b79cc65..41f16bf 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -642,16 +642,18 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
val fileStream = ssc.textFileStream(testDir.toString)
// Make value 3 take a large time to process, to ensure that the driver
// shuts down in the middle of processing the 3rd batch
- CheckpointSuite.batchThreeShouldBlockIndefinitely = true
- val mappedStream = fileStream.map(s => {
+ CheckpointSuite.batchThreeShouldBlockALongTime = true
+ val mappedStream = fileStream.map { s =>
val i = s.toInt
if (i == 3) {
- while (CheckpointSuite.batchThreeShouldBlockIndefinitely) {
- Thread.sleep(Long.MaxValue)
+ if (CheckpointSuite.batchThreeShouldBlockALongTime) {
+ // It's not a good idea to let the thread run forever
+ // as resource won't be correctly released
+ Thread.sleep(6000)
}
}
i
- })
+ }
// Reducing over a large window to ensure that recovery from driver failure
// requires reprocessing of all the files seen before the failure
@@ -691,7 +693,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
}
// The original StreamingContext has now been stopped.
- CheckpointSuite.batchThreeShouldBlockIndefinitely = false
+ CheckpointSuite.batchThreeShouldBlockALongTime = false
// Create files while the streaming driver is down
for (i <- Seq(4, 5, 6)) {
@@ -928,5 +930,5 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
}
private object CheckpointSuite extends Serializable {
- var batchThreeShouldBlockIndefinitely: Boolean = true
+ var batchThreeShouldBlockALongTime: Boolean = true
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 60c8e70..fff2d6f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -164,6 +164,7 @@ object MasterFailureTest extends Logging {
val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun)
fileGeneratingThread.join()
+ ssc.stop()
fs.delete(checkpointDir, true)
fs.delete(testDir, true)
logInfo("Finished test after " + killCount + " failures")
http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 851013b..107c3f5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -134,6 +134,7 @@ class ReceivedBlockTrackerSuite
val expectedWrittenData1 = blockInfos1.map(BlockAdditionEvent)
getWrittenLogData() shouldEqual expectedWrittenData1
getWriteAheadLogFiles() should have size 1
+ tracker1.stop()
incrementTime()
@@ -141,6 +142,7 @@ class ReceivedBlockTrackerSuite
val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false)
tracker1_.getUnallocatedBlocks(streamId) shouldBe empty
tracker1_.hasUnallocatedReceivedBlocks should be (false)
+ tracker1_.stop()
// Restart tracker and verify recovered list of unallocated blocks
val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
@@ -163,6 +165,7 @@ class ReceivedBlockTrackerSuite
val blockInfos2 = addBlockInfos(tracker2)
tracker2.allocateBlocksToBatch(batchTime2)
tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
+ tracker2.stop()
// Verify whether log has correct contents
val expectedWrittenData2 = expectedWrittenData1 ++
@@ -192,6 +195,7 @@ class ReceivedBlockTrackerSuite
getWriteAheadLogFiles() should not contain oldestLogFile
}
printLogFiles("After clean")
+ tracker3.stop()
// Restart tracker and verify recovered state, specifically whether info about the first
// batch has been removed, but not the second batch
@@ -200,6 +204,7 @@ class ReceivedBlockTrackerSuite
tracker4.getUnallocatedBlocks(streamId) shouldBe empty
tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned
tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
+ tracker4.stop()
}
test("disable write ahead log when checkpoint directory is not set") {
http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 24cb5af..4bec52b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -139,6 +139,7 @@ abstract class CommonWriteAheadLogTests(
assert(getLogFilesInDirectory(testDir).size < logFiles.size)
}
}
+ writeAheadLog.close()
}
test(testPrefix + "handling file errors while reading rotating logs") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org