Posted to commits@spark.apache.org by yh...@apache.org on 2015/06/22 19:04:33 UTC

spark git commit: [SPARK-8406] [SQL] Backports SPARK-8406 and PR #6864 to branch-1.4

Repository: spark
Updated Branches:
  refs/heads/branch-1.4 b836bac3f -> 451c8722a


[SPARK-8406] [SQL] Backports SPARK-8406 and PR #6864 to branch-1.4

Author: Cheng Lian <li...@databricks.com>

Closes #6932 from liancheng/spark-8406-for-1.4 and squashes the following commits:

a0168fe [Cheng Lian] Backports SPARK-8406 and PR #6864 to branch-1.4


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/451c8722
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/451c8722
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/451c8722

Branch: refs/heads/branch-1.4
Commit: 451c8722afea83e8e8f11c438469eea10e5acf4c
Parents: b836bac
Author: Cheng Lian <li...@databricks.com>
Authored: Mon Jun 22 10:04:29 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Jun 22 10:04:29 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/newParquet.scala   | 41 ++------------
 .../org/apache/spark/sql/sources/commands.scala | 59 ++++++++++++++++----
 .../spark/sql/hive/orc/OrcFileOperator.scala    |  7 ++-
 .../apache/spark/sql/hive/orc/OrcRelation.scala |  3 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |  2 +-
 .../spark/sql/hive/orc/OrcSourceSuite.scala     | 22 ++++----
 .../spark/sql/sources/SimpleTextRelation.scala  |  4 +-
 .../sql/sources/hadoopFsRelationSuites.scala    | 39 +++++++++++--
 8 files changed, 110 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 3328e6f..abf9614 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -61,50 +61,21 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
   extends OutputWriter {
 
   private val recordWriter: RecordWriter[Void, Row] = {
-    val conf = context.getConfiguration
     val outputFormat = {
-      // When appending new Parquet files to an existing Parquet file directory, to avoid
-      // overwriting existing data files, we need to find out the max task ID encoded in these data
-      // file names.
-      // TODO Make this snippet a utility function for other data source developers
-      val maxExistingTaskId = {
-        // Note that `path` may point to a temporary location.  Here we retrieve the real
-        // destination path from the configuration
-        val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
-        val fs = outputPath.getFileSystem(conf)
-
-        if (fs.exists(outputPath)) {
-          // Pattern used to match task ID in part file names, e.g.:
-          //
-          //   part-r-00001.gz.parquet
-          //          ^~~~~
-          val partFilePattern = """part-.-(\d{1,}).*""".r
-
-          fs.listStatus(outputPath).map(_.getPath.getName).map {
-            case partFilePattern(id) => id.toInt
-            case name if name.startsWith("_") => 0
-            case name if name.startsWith(".") => 0
-            case name => throw new AnalysisException(
-              s"Trying to write Parquet files to directory $outputPath, " +
-                s"but found items with illegal name '$name'.")
-          }.reduceOption(_ max _).getOrElse(0)
-        } else {
-          0
-        }
-      }
-
       new ParquetOutputFormat[Row]() {
         // Here we override `getDefaultWorkFile` for two reasons:
         //
-        //  1. To allow appending.  We need to generate output file name based on the max available
-        //     task ID computed above.
+        //  1. To allow appending.  We need to generate unique output file names to avoid
+        //     overwriting existing files, which either existed before the write job or were just
+        //     written by other tasks within the same write job.
         //
         //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` uses
         //     `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
         //     partitions in the case of dynamic partitioning.
         override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-          val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
-          new Path(path, f"part-r-$split%05d$extension")
+          val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
+          val split = context.getTaskAttemptID.getTaskID.getId
+          new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
         }
       }
     }
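
The naming scheme introduced here can be reused by other OutputFormat-based data sources. Below is a minimal sketch, not part of the patch: the TextOutputFormat subclass and its name are placeholders, and only the `spark.sql.sources.writeJobUUID` property and the `part-r-<task-id>-<job-uuid>` pattern come from this change.

  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.{NullWritable, Text}
  import org.apache.hadoop.mapreduce.TaskAttemptContext
  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

  // Hypothetical OutputFormat illustrating the naming scheme used by this patch:
  // part-r-<task-id>-<write-job-uuid><extension>.  The UUID is set once per write
  // job on the driver side (see BaseWriterContainer in the next file), so all
  // tasks of one job share it, while the task ID distinguishes files written by
  // different tasks of that job.
  class UniquelyNamedOutputFormat(outputDir: Path)
    extends TextOutputFormat[NullWritable, Text] {

    override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
      val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
      val split = context.getTaskAttemptID.getTaskID.getId
      new Path(outputDir, f"part-r-$split%05d-$uniqueWriteJobId$extension")
    }
  }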

http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index eb4e8f8..9a75dd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.sources
 
-import java.util.Date
+import java.util.{UUID, Date}
 
 import scala.collection.mutable
 
@@ -59,6 +59,28 @@ private[sql] case class InsertIntoDataSource(
   }
 }
 
+/**
+ * A command for writing data to a [[HadoopFsRelation]].  Supports both overwriting and appending.
+ * Writing to dynamic partitions is also supported.  Each [[InsertIntoHadoopFsRelation]] issues a
+ * single write job, and owns a UUID that identifies this job.  Each concrete implementation of
+ * [[HadoopFsRelation]] should use this UUID together with the task ID to generate a unique file
+ * path for each task output file.  This UUID is passed to the executor side via a property named
+ * `spark.sql.sources.writeJobUUID`.
+ *
+ * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]],
+ * are used to write to normal tables and to tables with dynamic partitions, respectively.
+ *
+ * The basic workflow of this command is:
+ *
+ *   1. Driver side setup, including output committer initialization and data source specific
+ *      preparation work for the write job to be issued.
+ *   2. Issues a write job consisting of one or more executor-side tasks, each of which writes all
+ *      rows within an RDD partition.
+ *   3. If no exception is thrown in a task, commits that task; otherwise aborts it.  If any
+ *      exception is thrown while committing the task, that task is also aborted.
+ *   4. If all tasks are committed, commits the job; otherwise aborts it.  If any exception is
+ *      thrown while committing the job, the job is also aborted.
+ */
 private[sql] case class InsertIntoHadoopFsRelation(
     @transient relation: HadoopFsRelation,
     @transient query: LogicalPlan,
@@ -271,6 +293,13 @@ private[sql] abstract class BaseWriterContainer(
 
   protected val serializableConf = new SerializableWritable(ContextUtil.getConfiguration(job))
 
+  // This UUID is used to avoid output file name collisions between different appending write jobs.
+  // These jobs may belong to different SparkContext instances.  Concrete data source implementations
+  // may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
+  // The reason this ID identifies a whole write job rather than a single task output file is that
+  // speculative tasks must generate the same output file name as the original task.
+  private val uniqueWriteJobId = UUID.randomUUID()
+
   // This is only used on driver side.
   @transient private val jobContext: JobContext = job
 
@@ -298,6 +327,11 @@ private[sql] abstract class BaseWriterContainer(
     setupIDs(0, 0, 0)
     setupConf()
 
+    // This UUID is sent to the executor side together with the serialized `Configuration` object
+    // within the `Job` instance.  `OutputWriters` on the executor side should use this UUID to
+    // generate unique task output file names.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
+
     // Order of the following two lines is important.  For Hadoop 1, TaskAttemptContext constructor
     // clones the Configuration object passed in.  If we initialize the TaskAttemptContext first,
     // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
@@ -425,15 +459,16 @@ private[sql] class DefaultWriterContainer(
       assert(writer != null, "OutputWriter instance should have been initialized")
       writer.close()
       super.commitTask()
-    } catch {
-      case cause: Throwable =>
-        super.abortTask()
-        throw new RuntimeException("Failed to commit task", cause)
+    } catch { case cause: Throwable =>
+      // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will
+      // cause `abortTask()` to be invoked.
+      throw new RuntimeException("Failed to commit task", cause)
     }
   }
 
   override def abortTask(): Unit = {
     try {
+      // It's possible that the task fails before `writer` gets initialized
       if (writer != null) {
         writer.close()
       }
@@ -477,21 +512,25 @@ private[sql] class DynamicPartitionWriterContainer(
     })
   }
 
-  override def commitTask(): Unit = {
-    try {
+  private def clearOutputWriters(): Unit = {
+    if (outputWriters.nonEmpty) {
       outputWriters.values.foreach(_.close())
       outputWriters.clear()
+    }
+  }
+
+  override def commitTask(): Unit = {
+    try {
+      clearOutputWriters()
       super.commitTask()
     } catch { case cause: Throwable =>
-      super.abortTask()
       throw new RuntimeException("Failed to commit task", cause)
     }
   }
 
   override def abortTask(): Unit = {
     try {
-      outputWriters.values.foreach(_.close())
-      outputWriters.clear()
+      clearOutputWriters()
     } finally {
       super.abortTask()
     }
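
The scaladoc above describes two moving parts: the driver publishes a per-job UUID through the Hadoop configuration, and each executor-side task follows a commit-or-abort protocol. Below is a minimal sketch of both, with hypothetical method names; only the `spark.sql.sources.writeJobUUID` property key and the overall flow come from this patch.

  import java.util.UUID

  import org.apache.hadoop.mapreduce.Job

  object WriteJobSketch {
    // Driver side: generate one UUID per write job and publish it through the
    // Hadoop Configuration that ships to the executors with the job.
    def setupWriteJob(job: Job): Unit = {
      val uniqueWriteJobId = UUID.randomUUID()
      job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
    }

    // Executor side: per-task control flow matching steps 2 and 3 of the
    // scaladoc above.  writeRows, commitTask and abortTask stand in for the
    // real writer-container callbacks.
    def runTask(writeRows: () => Unit, commitTask: () => Unit, abortTask: () => Unit): Unit = {
      try {
        writeRows()
        commitTask()   // a failure here is treated like any other task failure ...
      } catch {
        case cause: Throwable =>
          abortTask()  // ... and also causes abortTask() to be invoked
          throw new RuntimeException("Task failed while writing rows", cause)
      }
    }
  }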

http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 1e51173..e5e92e6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -33,7 +33,7 @@ private[orc] object OrcFileOperator extends Logging{
     val fspath = new Path(pathStr)
     val fs = fspath.getFileSystem(conf)
     val orcFiles = listOrcFiles(pathStr, conf)
-
+    logDebug(s"Creating ORC Reader from ${orcFiles.head}")
     // TODO Need to consider all files when schema evolution is taken into account.
     OrcFile.createReader(fs, orcFiles.head)
   }
@@ -42,6 +42,7 @@ private[orc] object OrcFileOperator extends Logging{
     val reader = getFileReader(path, conf)
     val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
     val schema = readerInspector.getTypeName
+    logDebug(s"Reading schema from file $path, got Hive schema string: $schema")
     HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
   }
 
@@ -52,14 +53,14 @@ private[orc] object OrcFileOperator extends Logging{
   def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
     val origPath = new Path(pathStr)
     val fs = origPath.getFileSystem(conf)
-    val path = origPath.makeQualified(fs)
+    val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
     val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
       .filterNot(_.isDir)
       .map(_.getPath)
       .filterNot(_.getName.startsWith("_"))
       .filterNot(_.getName.startsWith("."))
 
-    if (paths == null || paths.size == 0) {
+    if (paths == null || paths.isEmpty) {
       throw new IllegalArgumentException(
         s"orcFileOperator: path $path does not have valid orc files matching the pattern")
     }
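
The listing logic above boils down to qualifying the input path and filtering out metadata files. A standalone sketch under the assumption that a single directory level is enough (the real code uses SparkHadoopUtil.listLeafStatuses to recurse):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  object OrcListingSketch {
    // Qualify the path with the two-argument makeQualified (the one-argument
    // variant used before is deprecated) and skip Spark/Hadoop metadata files
    // such as _SUCCESS, _metadata and hidden .crc files.
    def listDataFiles(pathStr: String, conf: Configuration): Seq[Path] = {
      val origPath = new Path(pathStr)
      val fs: FileSystem = origPath.getFileSystem(conf)
      val qualified = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)

      fs.listStatus(qualified)
        .filterNot(_.isDir)                     // keep plain files only
        .map(_.getPath)
        .filterNot(_.getName.startsWith("_"))   // _SUCCESS, _metadata, ...
        .filterNot(_.getName.startsWith("."))   // .crc and other hidden files
        .toSeq
    }
  }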

http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 4509428..101f2ff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -104,8 +104,9 @@ private[orc] class OrcOutputWriter(
     recordWriterInstantiated = true
 
     val conf = context.getConfiguration
+    val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
     val partition = context.getTaskAttemptID.getTaskID.getId
-    val filename = f"part-r-$partition%05d-${System.currentTimeMillis}%015d.orc"
+    val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
 
     new OrcOutputFormat().getRecordWriter(
       new Path(path, filename).getFileSystem(conf),

http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 7c7afc8..e337744 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -49,7 +49,7 @@ import scala.collection.JavaConversions._
 object TestHive
   extends TestHiveContext(
     new SparkContext(
-      "local[2]",
+      "local[32]",
       "TestSQLContext",
       new SparkConf()
         .set("spark.sql.test", "")

http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 82e08ca..433ea9b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -44,7 +44,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
     import org.apache.spark.sql.hive.test.TestHive.implicits._
 
     sparkContext
-      .makeRDD(1 to 10)
+      .makeRDD(1 to 100)
       .map(i => OrcData(i, s"part-$i"))
       .toDF()
       .registerTempTable(s"orc_temp_table")
@@ -70,35 +70,35 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("create temporary orc table") {
-    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 10).map(i => Row(i, s"part-$i")))
+      (1 to 100).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source where intField > 5"),
-      (6 to 10).map(i => Row(i, s"part-$i")))
+      (6 to 100).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
-      (1 to 10).map(i => Row(1, s"part-$i")))
+      (1 to 100).map(i => Row(1, s"part-$i")))
   }
 
   test("create temporary orc table as") {
-    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 10).map(i => Row(i, s"part-$i")))
+      (1 to 100).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
-      (6 to 10).map(i => Row(i, s"part-$i")))
+      (6 to 100).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
-      (1 to 10).map(i => Row(1, s"part-$i")))
+      (1 to 100).map(i => Row(1, s"part-$i")))
   }
 
   test("appending insert") {
@@ -106,7 +106,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
+      (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i =>
         Seq.fill(2)(Row(i, s"part-$i"))
       })
   }
@@ -119,7 +119,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_as_source"),
-      (6 to 10).map(i => Row(i, s"part-$i")))
+      (6 to 100).map(i => Row(i, s"part-$i")))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 0f959b3..5d7cd16 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -53,9 +53,10 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
   numberFormat.setGroupingUsed(false)
 
   override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+    val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
     val split = context.getTaskAttemptID.getTaskID.getId
     val name = FileOutputFormat.getOutputName(context)
-    new Path(outputFile, s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}")
+    new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
   }
 }
 
@@ -156,6 +157,7 @@ class CommitFailureTestRelation(
         context: TaskAttemptContext): OutputWriter = {
       new SimpleTextOutputWriter(path, context) {
         override def close(): Unit = {
+          super.close()
           sys.error("Intentional task commitment failure for testing purpose.")
         }
       }
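
The change above replaces a per-attempt `UUID.randomUUID()` with the job-level UUID read from the configuration. As the BaseWriterContainer comment earlier in this commit notes, the file name must be a pure function of the job and the task, so that a speculative attempt produces the same name as the original attempt. A tiny illustration; the `fileName` helper is hypothetical, not part of the patch.

  import java.text.NumberFormat

  object DeterministicNaming {
    // The output file name is a pure function of the write-job UUID and the
    // task ID.  A speculative attempt of task N therefore produces exactly the
    // same name as the original attempt, so it can only overwrite that file
    // rather than leave a duplicate behind, which a per-attempt
    // UUID.randomUUID() cannot guarantee.
    def fileName(writeJobUUID: String, taskId: Int): String = {
      val numberFormat = NumberFormat.getInstance()
      numberFormat.setMinimumIntegerDigits(5)
      numberFormat.setGroupingUsed(false)
      s"part-${numberFormat.format(taskId.toLong)}-$writeJobUUID"
    }
  }

  // fileName(jobUUID, 3) == fileName(jobUUID, 3)   // always true for a given jobUUID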

http://git-wip-us.apache.org/repos/asf/spark/blob/451c8722/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 57c0525..5b5ed5a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -22,12 +22,12 @@ import java.io.File
 import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.{SparkException, SparkFunSuite}
 
 abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
   override val sqlContext: SQLContext = TestHive
@@ -35,7 +35,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
   import sqlContext._
   import sqlContext.implicits._
 
-  val dataSourceName = classOf[SimpleTextSource].getCanonicalName
+  val dataSourceName: String
 
   val dataSchema =
     StructType(
@@ -470,6 +470,33 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
     }
   }
+
+  // NOTE: This test case is not fully deterministic.  On nodes with relatively few cores
+  // (4 or even 1), the data loss issue is hard to reproduce.  But on nodes with, say, 8 or
+  // more cores, the issue can be reproduced reliably.  Fortunately our Jenkins builder meets this
+  // requirement.  We probably want to move this test case to spark-integration-tests or spark-perf
+  // later.
+  test("SPARK-8406: Avoids name collision while writing Parquet files") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      sqlContext
+        .range(10000)
+        .repartition(250)
+        .write
+        .mode(SaveMode.Overwrite)
+        .format(dataSourceName)
+        .save(path)
+
+      assertResult(10000) {
+        sqlContext
+          .read
+          .format(dataSourceName)
+          .option("dataSchema", StructType(StructField("id", LongType) :: Nil).json)
+          .load(path)
+          .count()
+      }
+    }
+  }
 }
 
 class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -502,15 +529,17 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
 }
 
 class CommitFailureTestRelationSuite extends SparkFunSuite with SQLTestUtils {
-  import TestHive.implicits._
-
   override val sqlContext = TestHive
 
+  // When committing a task, `CommitFailureTestSource` throws an exception for testing purposes.
   val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName
 
   test("SPARK-7684: commitTask() failure should fallback to abortTask()") {
     withTempPath { file =>
-      val df = (1 to 3).map(i => i -> s"val_$i").toDF("a", "b")
+      // Here we coalesce the number of partitions to 1 to ensure that only a single task is issued.
+      // This prevents the race condition that happens when FileOutputCommitter tries to remove the
+      // `_temporary` directory while committing/aborting the job.  See SPARK-8513 for more details.
+      val df = sqlContext.range(0, 10).coalesce(1)
       intercept[SparkException] {
         df.write.format(dataSourceName).save(file.getCanonicalPath)
       }

