Posted to commits@spark.apache.org by li...@apache.org on 2018/01/03 14:18:39 UTC
spark git commit: [SPARK-20236][SQL] dynamic partition overwrite
Repository: spark
Updated Branches:
refs/heads/master 1a87a1609 -> a66fe36ce
[SPARK-20236][SQL] dynamic partition overwrite
## What changes were proposed in this pull request?
When overwriting a partitioned table with dynamic partition columns, the behavior differs between data source tables and Hive tables.
Data source tables: delete all partition directories that match the static partition values provided in the insert statement.
Hive tables: only delete partition directories that actually have data written into them.
This PR adds a new config that lets users opt in to Hive's behavior.
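For example, with dynamic mode enabled, an INSERT OVERWRITE only replaces the partitions that receive new rows. A hedged sketch in the style of the new tests below (the table name and values are illustrative only, and a SparkSession is assumed to be in scope):

```scala
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// Assuming a table t(i int, part int) partitioned by part, with existing
// partitions part=1 and part=2: only part=1 is rewritten here, while
// part=2 keeps its old data. In static mode (the default), every
// partition matching PARTITION(part) would be deleted before the write.
sql("insert overwrite table t partition(part) select 9, 1")
```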
## How was this patch tested?
new tests
Author: Wenchen Fan <we...@databricks.com>
Closes #18714 from cloud-fan/overwrite-partition.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a66fe36c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a66fe36c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a66fe36c
Branch: refs/heads/master
Commit: a66fe36cee9363b01ee70e469f1c968f633c5713
Parents: 1a87a16
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Jan 3 22:18:13 2018 +0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Jan 3 22:18:13 2018 +0800
----------------------------------------------------------------------
.../spark/internal/io/FileCommitProtocol.scala | 25 +++++--
.../io/HadoopMapReduceCommitProtocol.scala | 75 +++++++++++++++----
.../org/apache/spark/sql/internal/SQLConf.scala | 21 ++++++
.../InsertIntoHadoopFsRelationCommand.scala | 20 ++++-
.../SQLHadoopMapReduceCommitProtocol.scala | 10 ++-
.../apache/spark/sql/sources/InsertSuite.scala | 78 ++++++++++++++++++++
6 files changed, 200 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a66fe36c/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index 50f51e1..6d0059b 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -28,8 +28,9 @@ import org.apache.spark.util.Utils
*
* 1. Implementations must be serializable, as the committer instance instantiated on the driver
* will be used for tasks on executors.
- * 2. Implementations should have a constructor with 2 arguments:
- * (jobId: String, path: String)
+ * 2. Implementations should have a constructor with 2 or 3 arguments:
+ * (jobId: String, path: String) or
+ * (jobId: String, path: String, dynamicPartitionOverwrite: Boolean)
* 3. A committer should not be reused across multiple Spark jobs.
*
* The proper call sequence is:
@@ -139,10 +140,22 @@ object FileCommitProtocol {
/**
* Instantiates a FileCommitProtocol using the given className.
*/
- def instantiate(className: String, jobId: String, outputPath: String)
- : FileCommitProtocol = {
+ def instantiate(
+ className: String,
+ jobId: String,
+ outputPath: String,
+ dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {
val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
- val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
- ctor.newInstance(jobId, outputPath)
+ // First try the constructor with arguments (jobId: String, outputPath: String,
+ // dynamicPartitionOverwrite: Boolean).
+ // If that doesn't exist, try the one with (jobId: String, outputPath: String).
+ try {
+ val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
+ ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
+ } catch {
+ case _: NoSuchMethodException =>
+ val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
+ ctor.newInstance(jobId, outputPath)
+ }
}
}
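The reflective lookup in instantiate means a third-party committer opts in simply by declaring the extra constructor parameter. A minimal sketch (the class name is hypothetical, not part of this commit):

```scala
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical custom committer: because it declares the 3-argument
// constructor, FileCommitProtocol.instantiate finds it on the first try
// and threads the dynamicPartitionOverwrite flag through. A committer
// that only declares (jobId, path) still loads via the
// NoSuchMethodException fallback, with the flag silently dropped.
class MyCommitProtocol(
    jobId: String,
    path: String,
    dynamicPartitionOverwrite: Boolean = false)
  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
```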
http://git-wip-us.apache.org/repos/asf/spark/blob/a66fe36c/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 95c99d2..6d20ef1 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -39,8 +39,19 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
*
* @param jobId the job's or stage's id
* @param path the job's output path, or null if committer acts as a noop
+ * @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
+ *                                  dynamically, i.e., we first write files under a staging
+ *                                  directory with the partition path, e.g.
+ *                                  /path/to/staging/a=1/b=1/xxx.parquet. When committing the job,
+ *                                  we first clean up the corresponding partition directories under
+ *                                  the destination path, e.g. /path/to/destination/a=1/b=1, and
+ *                                  move files from the staging directory to the corresponding
+ *                                  partition directories under the destination path.
*/
-class HadoopMapReduceCommitProtocol(jobId: String, path: String)
+class HadoopMapReduceCommitProtocol(
+ jobId: String,
+ path: String,
+ dynamicPartitionOverwrite: Boolean = false)
extends FileCommitProtocol with Serializable with Logging {
import FileCommitProtocol._
@@ -67,9 +78,17 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
@transient private var addedAbsPathFiles: mutable.Map[String, String] = null
/**
- * The staging directory for all files committed with absolute output paths.
+ * Tracks partitions with default paths that have new files written into them by this task,
+ * e.g. a=1/b=2. Files under these partitions will be saved into the staging directory and moved
+ * to the destination directory at the end, if `dynamicPartitionOverwrite` is true.
*/
- private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)
+ @transient private var partitionPaths: mutable.Set[String] = null
+
+ /**
+ * The staging directory of this write job. Spark uses it to deal with files with absolute output
+ * paths, or with writing data into partitioned directories when dynamicPartitionOverwrite=true.
+ */
+ private def stagingDir = new Path(path, ".spark-staging-" + jobId)
protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
val format = context.getOutputFormatClass.newInstance()
@@ -85,11 +104,16 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
val filename = getFilename(taskContext, ext)
- val stagingDir: String = committer match {
+ val stagingDir: Path = committer match {
+ case _ if dynamicPartitionOverwrite =>
+ assert(dir.isDefined,
+ "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
+ partitionPaths += dir.get
+ this.stagingDir
// For FileOutputCommitter it has its own staging path called "work path".
case f: FileOutputCommitter =>
- Option(f.getWorkPath).map(_.toString).getOrElse(path)
- case _ => path
+ new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
+ case _ => new Path(path)
}
dir.map { d =>
@@ -106,8 +130,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
// Include a UUID here to prevent file collisions for one task writing to different dirs.
// In principle we could include hash(absoluteDir) instead but this is simpler.
- val tmpOutputPath = new Path(
- absPathStagingDir, UUID.randomUUID().toString() + "-" + filename).toString
+ val tmpOutputPath = new Path(stagingDir, UUID.randomUUID().toString() + "-" + filename).toString
addedAbsPathFiles(tmpOutputPath) = absOutputPath
tmpOutputPath
@@ -141,23 +164,42 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
committer.commitJob(jobContext)
- val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
- .foldLeft(Map[String, String]())(_ ++ _)
- logDebug(s"Committing files staged for absolute locations $filesToMove")
+
if (hasValidPath) {
- val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+ val (allAbsPathFiles, allPartitionPaths) =
+ taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
+ val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+
+ val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
+ logDebug(s"Committing files staged for absolute locations $filesToMove")
+ if (dynamicPartitionOverwrite) {
+ val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
+ logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
+ absPartitionPaths.foreach(fs.delete(_, true))
+ }
for ((src, dst) <- filesToMove) {
fs.rename(new Path(src), new Path(dst))
}
- fs.delete(absPathStagingDir, true)
+
+ if (dynamicPartitionOverwrite) {
+ val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
+ logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
+ for (part <- partitionPaths) {
+ val finalPartPath = new Path(path, part)
+ fs.delete(finalPartPath, true)
+ fs.rename(new Path(stagingDir, part), finalPartPath)
+ }
+ }
+
+ fs.delete(stagingDir, true)
}
}
override def abortJob(jobContext: JobContext): Unit = {
committer.abortJob(jobContext, JobStatus.State.FAILED)
if (hasValidPath) {
- val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
- fs.delete(absPathStagingDir, true)
+ val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+ fs.delete(stagingDir, true)
}
}
@@ -165,13 +207,14 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
committer = setupCommitter(taskContext)
committer.setupTask(taskContext)
addedAbsPathFiles = mutable.Map[String, String]()
+ partitionPaths = mutable.Set[String]()
}
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
val attemptId = taskContext.getTaskAttemptID
SparkHadoopMapRedUtil.commitTask(
committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
- new TaskCommitMessage(addedAbsPathFiles.toMap)
+ new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
}
override def abortTask(taskContext: TaskAttemptContext): Unit = {
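Taken together, commitJob above boils down to a delete-then-rename per touched partition. A simplified standalone replay of that sequence against the Hadoop FileSystem API (the paths, job id, and partition set are made up for illustration):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object DynamicOverwriteCommitSketch {
  def main(args: Array[String]): Unit = {
    // Tasks wrote their output under .spark-staging-<jobId>/<partition>;
    // at job commit each touched partition is wiped and replaced wholesale.
    val destination = new Path("/path/to/destination")
    val stagingDir = new Path(destination, ".spark-staging-job-1234")
    val fs: FileSystem = stagingDir.getFileSystem(new Configuration())

    val touchedPartitions = Set("a=1/b=1", "a=1/b=2") // collected from task commits
    for (part <- touchedPartitions) {
      val finalPartPath = new Path(destination, part)
      fs.delete(finalPartPath, true)                       // drop the old partition data
      fs.rename(new Path(stagingDir, part), finalPartPath) // move in the new files
    }
    fs.delete(stagingDir, true) // finally, remove the staging directory
  }
}
```

Partitions that receive no new rows never enter the touched set, which is exactly why their data survives the overwrite.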
http://git-wip-us.apache.org/repos/asf/spark/blob/a66fe36c/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 80cdc61..5d6edf6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1068,6 +1068,24 @@ object SQLConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(100)
+ object PartitionOverwriteMode extends Enumeration {
+ val STATIC, DYNAMIC = Value
+ }
+
+ val PARTITION_OVERWRITE_MODE =
+ buildConf("spark.sql.sources.partitionOverwriteMode")
+ .doc("When INSERT OVERWRITE a partitioned data source table, we currently support 2 modes: " +
+ "static and dynamic. In static mode, Spark deletes all the partitions that match the " +
+ "partition specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before " +
+ "overwriting. In dynamic mode, Spark doesn't delete partitions ahead, and only overwrite " +
+ "those partitions that have data written into it at runtime. By default we use static " +
+ "mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't " +
+ "affect Hive serde tables, as they are always overwritten with dynamic mode.")
+ .stringConf
+ .transform(_.toUpperCase(Locale.ROOT))
+ .checkValues(PartitionOverwriteMode.values.map(_.toString))
+ .createWithDefault(PartitionOverwriteMode.STATIC.toString)
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
@@ -1394,6 +1412,9 @@ class SQLConf extends Serializable with Logging {
def concatBinaryAsString: Boolean = getConf(CONCAT_BINARY_AS_STRING)
+ def partitionOverwriteMode: PartitionOverwriteMode.Value =
+ PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
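A small usage sketch for the new entry (session setup elided; the key and values are exactly as defined in this patch):

```scala
// Switch the mode for the current session.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// Values are case-insensitive thanks to the toUpperCase transform above,
// so "DYNAMIC" works too; anything outside {STATIC, DYNAMIC} is rejected
// by checkValues with an IllegalArgumentException at set time.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
```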
http://git-wip-us.apache.org/repos/asf/spark/blob/a66fe36c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index ad24e28..dd7ef0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.util.SchemaUtils
/**
@@ -89,13 +90,19 @@ case class InsertIntoHadoopFsRelationCommand(
}
val pathExists = fs.exists(qualifiedOutputPath)
- // If we are appending data to an existing dir.
- val isAppend = pathExists && (mode == SaveMode.Append)
+
+ val enableDynamicOverwrite =
+ sparkSession.sessionState.conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+ // This config only makes sense when we are overwriting a partitioned dataset with dynamic
+ // partition columns.
+ val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
+ staticPartitions.size < partitionColumns.length
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
- outputPath = outputPath.toString)
+ outputPath = outputPath.toString,
+ dynamicPartitionOverwrite = dynamicPartitionOverwrite)
val doInsertion = (mode, pathExists) match {
case (SaveMode.ErrorIfExists, true) =>
@@ -103,6 +110,9 @@ case class InsertIntoHadoopFsRelationCommand(
case (SaveMode.Overwrite, true) =>
if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
false
+ } else if (dynamicPartitionOverwrite) {
+ // For dynamic partition overwrite, do not delete partition directories ahead.
+ true
} else {
deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
true
@@ -126,7 +136,9 @@ case class InsertIntoHadoopFsRelationCommand(
catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
ifNotExists = true).run(sparkSession)
}
- if (mode == SaveMode.Overwrite) {
+ // For dynamic partition overwrite, we never remove partitions but only update existing
+ // ones.
+ if (mode == SaveMode.Overwrite && !dynamicPartitionOverwrite) {
val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
if (deletedPartitions.nonEmpty) {
AlterTableDropPartitionCommand(
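Stepping back from the diff: the dynamicPartitionOverwrite flag computed above only turns on when three conditions line up. A standalone paraphrase of that check (the object, function, and parameter names are mine, for illustration):

```scala
object DynamicOverwriteDecision {
  // Paraphrase of the decision in InsertIntoHadoopFsRelationCommand:
  // dynamic overwrite applies only when (1) the session conf requests it,
  // (2) the save mode is Overwrite, and (3) at least one partition column
  // is dynamic, i.e. not pinned to a static value in the INSERT statement.
  def useDynamicPartitionOverwrite(
      confMode: String,          // spark.sql.sources.partitionOverwriteMode
      isOverwrite: Boolean,      // mode == SaveMode.Overwrite
      staticPartitionCount: Int, // e.g. 1 for PARTITION(a=1, b)
      partitionColumnCount: Int  // e.g. 2 for a table partitioned by (a, b)
    ): Boolean = {
    confMode.equalsIgnoreCase("dynamic") && isOverwrite &&
      staticPartitionCount < partitionColumnCount
  }
}
```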
http://git-wip-us.apache.org/repos/asf/spark/blob/a66fe36c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
index 40825a1..39c594a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
@@ -29,11 +29,15 @@ import org.apache.spark.sql.internal.SQLConf
* A variant of [[HadoopMapReduceCommitProtocol]] that allows specifying the actual
* Hadoop output committer using an option specified in SQLConf.
*/
-class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String)
- extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging {
+class SQLHadoopMapReduceCommitProtocol(
+ jobId: String,
+ path: String,
+ dynamicPartitionOverwrite: Boolean = false)
+ extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
+ with Serializable with Logging {
override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
- var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+ var committer = super.setupCommitter(context)
val configuration = context.getConfiguration
val clazz =
http://git-wip-us.apache.org/repos/asf/spark/blob/a66fe36c/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 8b7e2e5..fef01c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -21,6 +21,8 @@ import java.io.File
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
@@ -442,4 +444,80 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
assert(e.contains("Only Data Sources providing FileFormat are supported"))
}
}
+
+ test("SPARK-20236: dynamic partition overwrite without catalog table") {
+ withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+ withTempPath { path =>
+ Seq((1, 1, 1)).toDF("i", "part1", "part2")
+ .write.partitionBy("part1", "part2").parquet(path.getAbsolutePath)
+ checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(1, 1, 1))
+
+ Seq((2, 1, 1)).toDF("i", "part1", "part2")
+ .write.partitionBy("part1", "part2").mode("overwrite").parquet(path.getAbsolutePath)
+ checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(2, 1, 1))
+
+ Seq((2, 2, 2)).toDF("i", "part1", "part2")
+ .write.partitionBy("part1", "part2").mode("overwrite").parquet(path.getAbsolutePath)
+ checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)
+ }
+ }
+ }
+
+ test("SPARK-20236: dynamic partition overwrite") {
+ withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+ withTable("t") {
+ sql(
+ """
+ |create table t(i int, part1 int, part2 int) using parquet
+ |partitioned by (part1, part2)
+ """.stripMargin)
+
+ sql("insert into t partition(part1=1, part2=1) select 1")
+ checkAnswer(spark.table("t"), Row(1, 1, 1))
+
+ sql("insert overwrite table t partition(part1=1, part2=1) select 2")
+ checkAnswer(spark.table("t"), Row(2, 1, 1))
+
+ sql("insert overwrite table t partition(part1=2, part2) select 2, 2")
+ checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)
+
+ sql("insert overwrite table t partition(part1=1, part2=2) select 3")
+ checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
+
+ sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
+ checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
+ }
+ }
+ }
+
+ test("SPARK-20236: dynamic partition overwrite with customer partition path") {
+ withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+ withTable("t") {
+ sql(
+ """
+ |create table t(i int, part1 int, part2 int) using parquet
+ |partitioned by (part1, part2)
+ """.stripMargin)
+
+ val path1 = Utils.createTempDir()
+ sql(s"alter table t add partition(part1=1, part2=1) location '$path1'")
+ sql(s"insert into t partition(part1=1, part2=1) select 1")
+ checkAnswer(spark.table("t"), Row(1, 1, 1))
+
+ sql("insert overwrite table t partition(part1=1, part2=1) select 2")
+ checkAnswer(spark.table("t"), Row(2, 1, 1))
+
+ sql("insert overwrite table t partition(part1=2, part2) select 2, 2")
+ checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)
+
+ val path2 = Utils.createTempDir()
+ sql(s"alter table t add partition(part1=1, part2=2) location '$path2'")
+ sql("insert overwrite table t partition(part1=1, part2=2) select 3")
+ checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
+
+ sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
+ checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
+ }
+ }
+ }
}