You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2015/06/22 19:04:02 UTC

spark git commit: [SPARK-8406] [SQL] Adding UUID to output file name to avoid accidental overwriting

Repository: spark
Updated Branches:
  refs/heads/master 47c1d5629 -> 0818fdec3


[SPARK-8406] [SQL] Adding UUID to output file name to avoid accidental overwriting

This PR fixes a Parquet output file name collision bug which may cause data loss.  Changes made:

1.  Identify each write job issued by `InsertIntoHadoopFsRelation` with a UUID

    All concrete data sources which extend `HadoopFsRelation` (Parquet and ORC for now) must use this UUID to generate task output file path to avoid name collision.

2.  Make `TestHive` use a local mode `SparkContext` with 32 threads to increase parallelism

    The major reason for this is that, the original parallelism of 2 is too low to reproduce the data loss issue.  Also, higher concurrency may potentially caught more concurrency bugs during testing phase. (It did help us spotted SPARK-8501.)

3. `OrcSourceSuite` was updated to workaround SPARK-8501, which we detected along the way.

NOTE: This PR is made a little bit more complicated than expected because we hit two other bugs on the way and have to work them around. See [SPARK-8501] [1] and [SPARK-8513] [2].

[1]: https://github.com/liancheng/spark/tree/spark-8501
[2]: https://github.com/liancheng/spark/tree/spark-8513

----

Some background and a summary of offline discussion with yhuai about this issue for better understanding:

In 1.4.0, we added `HadoopFsRelation` to abstract partition support of all data sources that are based on Hadoop `FileSystem` interface.  Specifically, this makes partition discovery, partition pruning, and writing dynamic partitions for data sources much easier.

To support appending, the Parquet data source tries to find out the max part number of part-files in the destination directory (i.e., `<id>` in output file name `part-r-<id>.gz.parquet`) at the beginning of the write job.  In 1.3.0, this step happens on driver side before any files are written.  However, in 1.4.0, this is moved to task side.  Unfortunately, for tasks scheduled later, they may see wrong max part number generated of files newly written by other finished tasks within the same job.  This actually causes a race condition.  In most cases, this only causes nonconsecutive part numbers in output file names.  But when the DataFrame contains thousands of RDD partitions, it's likely that two tasks may choose the same part number, then one of them gets overwritten by the other.

Before `HadoopFsRelation`, Spark SQL already supports appending data to Hive tables.  From a user's perspective, these two look similar.  However, they differ a lot internally.  When data are inserted into Hive tables via Spark SQL, `InsertIntoHiveTable` simulates Hive's behaviors:

1.  Write data to a temporary location

2.  Move data in the temporary location to the final destination location using

    -   `Hive.loadTable()` for non-partitioned table
    -   `Hive.loadPartition()` for static partitions
    -   `Hive.loadDynamicPartitions()` for dynamic partitions

The important part is that, `Hive.copyFiles()` is invoked in step 2 to move the data to the destination directory (I found the name is kinda confusing since no "copying" occurs here, we are just moving and renaming stuff).  If a file in the source directory and another file in the destination directory happen to have the same name, say `part-r-00001.parquet`, the former is moved to the destination directory and renamed with a `_copy_N` postfix (`part-r-00001_copy_1.parquet`).  That's how Hive handles appending and avoids name collision between different write jobs.

Some alternatives fixes considered for this issue:

1.  Use a similar approach as Hive

    This approach is not preferred in Spark 1.4.0 mainly because file metadata operations in S3 tend to be slow, especially for tables with lots of file and/or partitions.  That's why `InsertIntoHadoopFsRelation` just inserts to destination directory directly, and is often used together with `DirectParquetOutputCommitter` to reduce latency when working with S3.  This means, we don't have the chance to do renaming, and must avoid name collision from the very beginning.

2.  Same as 1.3, just move max part number detection back to driver side

    This isn't doable because unlike 1.3, 1.4 also takes dynamic partitioning into account.  When inserting into dynamic partitions, we don't know which partition directories will be touched on driver side before issuing the write job.  Checking all partition directories is simply too expensive for tables with thousands of partitions.

3.  Add extra component to output file names to avoid name collision

    This seems to be the only reasonable solution for now.  To be more specific, we need a JOB level unique identifier to identify all write jobs issued by `InsertIntoHadoopFile`.  Notice that TASK level unique identifiers can NOT be used.  Because in this way a speculative task will write to a different output file from the original task.  If both tasks succeed, duplicate output will be left behind.  Currently, the ORC data source adds `System.currentTimeMillis` to the output file name for uniqueness.  This doesn't work because of exactly the same reason.

    That's why this PR adds a job level random UUID in `BaseWriterContainer` (which is used by `InsertIntoHadoopFsRelation` to issue write jobs).  The drawback is that record order is not preserved any more (output files of a later job may be listed before those of a earlier job).  However, we never promise to preserve record order when writing data, and Hive doesn't promise this either because the `_copy_N` trick breaks the order.

Author: Cheng Lian <li...@databricks.com>

Closes #6864 from liancheng/spark-8406 and squashes the following commits:

db7a46a [Cheng Lian] More comments
f5c1133 [Cheng Lian] Addresses comments
85c478e [Cheng Lian] Workarounds SPARK-8513
088c76c [Cheng Lian] Adds comment about SPARK-8501
99a5e7e [Cheng Lian] Uses job level UUID in SimpleTextRelation and avoids double task abortion
4088226 [Cheng Lian] Works around SPARK-8501
1d7d206 [Cheng Lian] Adds more logs
8966bbb [Cheng Lian] Fixes Scala style issue
18b7003 [Cheng Lian] Uses job level UUID to take speculative tasks into account
3806190 [Cheng Lian] Lets TestHive use all cores by default
748dbd7 [Cheng Lian] Adding UUID to output file name to avoid accidental overwriting


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0818fdec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0818fdec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0818fdec

Branch: refs/heads/master
Commit: 0818fdec3733ec5c0a9caa48a9c0f2cd25f84d13
Parents: 47c1d56
Author: Cheng Lian <li...@databricks.com>
Authored: Mon Jun 22 10:03:57 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Jun 22 10:03:57 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/newParquet.scala   | 43 +++----------
 .../org/apache/spark/sql/sources/commands.scala | 64 ++++++++++++++++----
 .../spark/sql/hive/orc/OrcFileOperator.scala    |  9 +--
 .../apache/spark/sql/hive/orc/OrcRelation.scala |  5 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |  2 +-
 .../spark/sql/hive/orc/OrcSourceSuite.scala     | 28 +++++----
 .../spark/sql/sources/SimpleTextRelation.scala  |  4 +-
 .../sql/sources/hadoopFsRelationSuites.scala    | 37 +++++++++--
 8 files changed, 120 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index c9de45e..e049d54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.{Logging, SparkException, Partition => SparkPartition}
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
 
 private[sql] class DefaultSource extends HadoopFsRelationProvider {
   override def createRelation(
@@ -60,50 +60,21 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
   extends OutputWriter {
 
   private val recordWriter: RecordWriter[Void, InternalRow] = {
-    val conf = context.getConfiguration
     val outputFormat = {
-      // When appending new Parquet files to an existing Parquet file directory, to avoid
-      // overwriting existing data files, we need to find out the max task ID encoded in these data
-      // file names.
-      // TODO Make this snippet a utility function for other data source developers
-      val maxExistingTaskId = {
-        // Note that `path` may point to a temporary location.  Here we retrieve the real
-        // destination path from the configuration
-        val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
-        val fs = outputPath.getFileSystem(conf)
-
-        if (fs.exists(outputPath)) {
-          // Pattern used to match task ID in part file names, e.g.:
-          //
-          //   part-r-00001.gz.parquet
-          //          ^~~~~
-          val partFilePattern = """part-.-(\d{1,}).*""".r
-
-          fs.listStatus(outputPath).map(_.getPath.getName).map {
-            case partFilePattern(id) => id.toInt
-            case name if name.startsWith("_") => 0
-            case name if name.startsWith(".") => 0
-            case name => throw new AnalysisException(
-              s"Trying to write Parquet files to directory $outputPath, " +
-                s"but found items with illegal name '$name'.")
-          }.reduceOption(_ max _).getOrElse(0)
-        } else {
-          0
-        }
-      }
-
       new ParquetOutputFormat[InternalRow]() {
         // Here we override `getDefaultWorkFile` for two reasons:
         //
-        //  1. To allow appending.  We need to generate output file name based on the max available
-        //     task ID computed above.
+        //  1. To allow appending.  We need to generate unique output file names to avoid
+        //     overwriting existing files (either exist before the write job, or are just written
+        //     by other tasks within the same write job).
         //
         //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` uses
         //     `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
         //     partitions in the case of dynamic partitioning.
         override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-          val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
-          new Path(path, f"part-r-$split%05d$extension")
+          val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
+          val split = context.getTaskAttemptID.getTaskID.getId
+          new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index c16bd9a..215e53c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.sql.sources
 
-import java.util.Date
+import java.util.{Date, UUID}
 
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter => MapReduceFileOutputCommitter}
-import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
 
 import org.apache.spark._
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
@@ -59,6 +58,28 @@ private[sql] case class InsertIntoDataSource(
   }
 }
 
+/**
+ * A command for writing data to a [[HadoopFsRelation]].  Supports both overwriting and appending.
+ * Writing to dynamic partitions is also supported.  Each [[InsertIntoHadoopFsRelation]] issues a
+ * single write job, and owns a UUID that identifies this job.  Each concrete implementation of
+ * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for
+ * each task output file.  This UUID is passed to executor side via a property named
+ * `spark.sql.sources.writeJobUUID`.
+ *
+ * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
+ * are used to write to normal tables and tables with dynamic partitions.
+ *
+ * Basic work flow of this command is:
+ *
+ *   1. Driver side setup, including output committer initialization and data source specific
+ *      preparation work for the write job to be issued.
+ *   2. Issues a write job consists of one or more executor side tasks, each of which writes all
+ *      rows within an RDD partition.
+ *   3. If no exception is thrown in a task, commits that task, otherwise aborts that task;  If any
+ *      exception is thrown during task commitment, also aborts that task.
+ *   4. If all tasks are committed, commit the job, otherwise aborts the job;  If any exception is
+ *      thrown during job commitment, also aborts the job.
+ */
 private[sql] case class InsertIntoHadoopFsRelation(
     @transient relation: HadoopFsRelation,
     @transient query: LogicalPlan,
@@ -261,7 +282,14 @@ private[sql] abstract class BaseWriterContainer(
   with Logging
   with Serializable {
 
-  protected val serializableConf = new SerializableConfiguration(ContextUtil.getConfiguration(job))
+  protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
+
+  // This UUID is used to avoid output file name collision between different appending write jobs.
+  // These jobs may belong to different SparkContext instances. Concrete data source implementations
+  // may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
+  //  The reason why this ID is used to identify a job rather than a single task output file is
+  // that, speculative tasks must generate the same output file name as the original task.
+  private val uniqueWriteJobId = UUID.randomUUID()
 
   // This is only used on driver side.
   @transient private val jobContext: JobContext = job
@@ -290,6 +318,11 @@ private[sql] abstract class BaseWriterContainer(
     setupIDs(0, 0, 0)
     setupConf()
 
+    // This UUID is sent to executor side together with the serialized `Configuration` object within
+    // the `Job` instance.  `OutputWriters` on the executor side should use this UUID to generate
+    // unique task output files.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
+
     // Order of the following two lines is important.  For Hadoop 1, TaskAttemptContext constructor
     // clones the Configuration object passed in.  If we initialize the TaskAttemptContext first,
     // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
@@ -417,15 +450,16 @@ private[sql] class DefaultWriterContainer(
       assert(writer != null, "OutputWriter instance should have been initialized")
       writer.close()
       super.commitTask()
-    } catch {
-      case cause: Throwable =>
-        super.abortTask()
-        throw new RuntimeException("Failed to commit task", cause)
+    } catch { case cause: Throwable =>
+      // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will
+      // cause `abortTask()` to be invoked.
+      throw new RuntimeException("Failed to commit task", cause)
     }
   }
 
   override def abortTask(): Unit = {
     try {
+      // It's possible that the task fails before `writer` gets initialized
       if (writer != null) {
         writer.close()
       }
@@ -469,21 +503,25 @@ private[sql] class DynamicPartitionWriterContainer(
     })
   }
 
-  override def commitTask(): Unit = {
-    try {
+  private def clearOutputWriters(): Unit = {
+    if (outputWriters.nonEmpty) {
       outputWriters.values.foreach(_.close())
       outputWriters.clear()
+    }
+  }
+
+  override def commitTask(): Unit = {
+    try {
+      clearOutputWriters()
       super.commitTask()
     } catch { case cause: Throwable =>
-      super.abortTask()
       throw new RuntimeException("Failed to commit task", cause)
     }
   }
 
   override def abortTask(): Unit = {
     try {
-      outputWriters.values.foreach(_.close())
-      outputWriters.clear()
+      clearOutputWriters()
     } finally {
       super.abortTask()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 1e51173..e3ab944 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -27,13 +27,13 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.hive.HiveMetastoreTypes
 import org.apache.spark.sql.types.StructType
 
-private[orc] object OrcFileOperator extends Logging{
+private[orc] object OrcFileOperator extends Logging {
   def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = {
     val conf = config.getOrElse(new Configuration)
     val fspath = new Path(pathStr)
     val fs = fspath.getFileSystem(conf)
     val orcFiles = listOrcFiles(pathStr, conf)
-
+    logDebug(s"Creating ORC Reader from ${orcFiles.head}")
     // TODO Need to consider all files when schema evolution is taken into account.
     OrcFile.createReader(fs, orcFiles.head)
   }
@@ -42,6 +42,7 @@ private[orc] object OrcFileOperator extends Logging{
     val reader = getFileReader(path, conf)
     val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
     val schema = readerInspector.getTypeName
+    logDebug(s"Reading schema from file $path, got Hive schema string: $schema")
     HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
   }
 
@@ -52,14 +53,14 @@ private[orc] object OrcFileOperator extends Logging{
   def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
     val origPath = new Path(pathStr)
     val fs = origPath.getFileSystem(conf)
-    val path = origPath.makeQualified(fs)
+    val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
     val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
       .filterNot(_.isDir)
       .map(_.getPath)
       .filterNot(_.getName.startsWith("_"))
       .filterNot(_.getName.startsWith("."))
 
-    if (paths == null || paths.size == 0) {
+    if (paths == null || paths.isEmpty) {
       throw new IllegalArgumentException(
         s"orcFileOperator: path $path does not have valid orc files matching the pattern")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index dbce39f..705f48f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, Reco
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
+import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.{HadoopRDD, RDD}
@@ -39,7 +40,6 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreType
 import org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.{Logging}
 import org.apache.spark.util.SerializableConfiguration
 
 /* Implicit conversions */
@@ -105,8 +105,9 @@ private[orc] class OrcOutputWriter(
     recordWriterInstantiated = true
 
     val conf = context.getConfiguration
+    val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
     val partition = context.getTaskAttemptID.getTaskID.getId
-    val filename = f"part-r-$partition%05d-${System.currentTimeMillis}%015d.orc"
+    val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
 
     new OrcOutputFormat().getRecordWriter(
       new Path(path, filename).getFileSystem(conf),

http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index f901bd8..ea325cc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -49,7 +49,7 @@ import scala.collection.JavaConversions._
 object TestHive
   extends TestHiveContext(
     new SparkContext(
-      System.getProperty("spark.sql.test.master", "local[2]"),
+      System.getProperty("spark.sql.test.master", "local[32]"),
       "TestSQLContext",
       new SparkConf()
         .set("spark.sql.test", "")

http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 82e08ca..a0cdd0d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -43,8 +43,14 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
     orcTableDir.mkdir()
     import org.apache.spark.sql.hive.test.TestHive.implicits._
 
+    // Originally we were using a 10-row RDD for testing.  However, when default parallelism is
+    // greater than 10 (e.g., running on a node with 32 cores), this RDD contains empty partitions,
+    // which result in empty ORC files.  Unfortunately, ORC doesn't handle empty files properly and
+    // causes build failure on Jenkins, which happens to have 32 cores. Please refer to SPARK-8501
+    // for more details.  To workaround this issue before fixing SPARK-8501, we simply increase row
+    // number in this RDD to avoid empty partitions.
     sparkContext
-      .makeRDD(1 to 10)
+      .makeRDD(1 to 100)
       .map(i => OrcData(i, s"part-$i"))
       .toDF()
       .registerTempTable(s"orc_temp_table")
@@ -70,35 +76,35 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("create temporary orc table") {
-    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 10).map(i => Row(i, s"part-$i")))
+      (1 to 100).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source where intField > 5"),
-      (6 to 10).map(i => Row(i, s"part-$i")))
+      (6 to 100).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
-      (1 to 10).map(i => Row(1, s"part-$i")))
+      (1 to 100).map(i => Row(1, s"part-$i")))
   }
 
   test("create temporary orc table as") {
-    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 10).map(i => Row(i, s"part-$i")))
+      (1 to 100).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
-      (6 to 10).map(i => Row(i, s"part-$i")))
+      (6 to 100).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
-      (1 to 10).map(i => Row(1, s"part-$i")))
+      (1 to 100).map(i => Row(1, s"part-$i")))
   }
 
   test("appending insert") {
@@ -106,7 +112,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
+      (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i =>
         Seq.fill(2)(Row(i, s"part-$i"))
       })
   }
@@ -119,7 +125,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_as_source"),
-      (6 to 10).map(i => Row(i, s"part-$i")))
+      (6 to 100).map(i => Row(i, s"part-$i")))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 0f959b3..5d7cd16 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -53,9 +53,10 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
   numberFormat.setGroupingUsed(false)
 
   override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+    val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
     val split = context.getTaskAttemptID.getTaskID.getId
     val name = FileOutputFormat.getOutputName(context)
-    new Path(outputFile, s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}")
+    new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
   }
 }
 
@@ -156,6 +157,7 @@ class CommitFailureTestRelation(
         context: TaskAttemptContext): OutputWriter = {
       new SimpleTextOutputWriter(path, context) {
         override def close(): Unit = {
+          super.close()
           sys.error("Intentional task commitment failure for testing purpose.")
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 76469d7..e0d8277 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -35,7 +35,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
   import sqlContext.sql
   import sqlContext.implicits._
 
-  val dataSourceName = classOf[SimpleTextSource].getCanonicalName
+  val dataSourceName: String
 
   val dataSchema =
     StructType(
@@ -470,6 +470,33 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       checkAnswer(sqlContext.table("t"), df.select('b, 'c, 'a).collect())
     }
   }
+
+  // NOTE: This test suite is not super deterministic.  On nodes with only relatively few cores
+  // (4 or even 1), it's hard to reproduce the data loss issue.  But on nodes with for example 8 or
+  // more cores, the issue can be reproduced steadily.  Fortunately our Jenkins builder meets this
+  // requirement.  We probably want to move this test case to spark-integration-tests or spark-perf
+  // later.
+  test("SPARK-8406: Avoids name collision while writing Parquet files") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      sqlContext
+        .range(10000)
+        .repartition(250)
+        .write
+        .mode(SaveMode.Overwrite)
+        .format(dataSourceName)
+        .save(path)
+
+      assertResult(10000) {
+        sqlContext
+          .read
+          .format(dataSourceName)
+          .option("dataSchema", StructType(StructField("id", LongType) :: Nil).json)
+          .load(path)
+          .count()
+      }
+    }
+  }
 }
 
 class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -502,15 +529,17 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
 }
 
 class CommitFailureTestRelationSuite extends SparkFunSuite with SQLTestUtils {
-  import TestHive.implicits._
-
   override val sqlContext = TestHive
 
+  // When committing a task, `CommitFailureTestSource` throws an exception for testing purpose.
   val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName
 
   test("SPARK-7684: commitTask() failure should fallback to abortTask()") {
     withTempPath { file =>
-      val df = (1 to 3).map(i => i -> s"val_$i").toDF("a", "b")
+      // Here we coalesce partition number to 1 to ensure that only a single task is issued.  This
+      // prevents race condition happened when FileOutputCommitter tries to remove the `_temporary`
+      // directory while committing/aborting the job.  See SPARK-8513 for more details.
+      val df = sqlContext.range(0, 10).coalesce(1)
       intercept[SparkException] {
         df.write.format(dataSourceName).save(file.getCanonicalPath)
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org