Posted to commits@spark.apache.org by yh...@apache.org on 2016/04/19 18:37:05 UTC

spark git commit: [SPARK-13681][SPARK-14458][SPARK-14566][SQL] Add back once removed CommitFailureTestRelationSuite and SimpleTextHadoopFsRelationSuite

Repository: spark
Updated Branches:
  refs/heads/master 3d46d796a -> 5e360c93b


[SPARK-13681][SPARK-14458][SPARK-14566][SQL] Add back once removed CommitFailureTestRelationSuite and SimpleTextHadoopFsRelationSuite

## What changes were proposed in this pull request?

These test suites were removed while refactoring `HadoopFsRelation` related API. This PR brings them back.

This PR also fixes two regressions:

- SPARK-14458, which causes a runtime error when saving partitioned tables using `FileFormat` data sources that cannot infer their own schemata. This bug wasn't caught by any built-in data source because all of them happen to support schema inference. (A sketch of the fix follows this list.)

- SPARK-14566, which happens to be covered by the SPARK-14458 fix and causes wrong query results or runtime errors when
  - appending a Dataset `ds` to a persisted partitioned data source relation `t`, and
  - the partition columns in `ds` don't all appear after the data columns
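
For reference, the core of the SPARK-14458 fix (see the `DataSource.scala` hunk below) is to drop partition columns from a user-specified schema before using it as the data schema. A minimal standalone sketch of that idea, with a hypothetical helper name:

```scala
import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution}
import org.apache.spark.sql.types.StructType

// Sketch only (not the PR's exact code): keep just the data columns of a
// user-specified schema by filtering out the partition columns, honouring
// the session's case-sensitivity setting.
def dataSchemaFrom(
    userSpecifiedSchema: StructType,
    partitionColumns: Seq[String],
    caseSensitive: Boolean): StructType = {
  val equality = if (caseSensitive) caseSensitiveResolution else caseInsensitiveResolution
  StructType(userSpecifiedSchema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
}
```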

## How was this patch tested?

`CommitFailureTestRelationSuite` uses a testing relation that always fails when committing write tasks, in order to test write job cleanup.

`SimpleTextHadoopFsRelationSuite` uses a testing relation to exercise the general `HadoopFsRelation` and `FileFormat` interfaces.

The two regressions are both covered by existing test cases.
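
For illustration only, the SPARK-14566 scenario exercised by those tests looks roughly like this (the table and column names are hypothetical, not taken from the actual test cases, and a `sqlContext` is assumed to be in scope, e.g. in spark-shell):

```scala
// Hypothetical example: the partition column `p` comes before the data column
// `value` in the appended Dataset, i.e. partition columns don't all trail the data columns.
val ds = sqlContext.range(10).selectExpr("id % 2 AS p", "id AS value")
ds.write.partitionBy("p").saveAsTable("t")   // persist a partitioned table
ds.write.mode("append").saveAsTable("t")     // append; before the fix this could yield
                                             // wrong results or fail at runtime
```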

Author: Cheng Lian <li...@databricks.com>

Closes #12179 from liancheng/spark-13681-commit-failure-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5e360c93
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5e360c93
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5e360c93

Branch: refs/heads/master
Commit: 5e360c93bed9d4f9250cf79bbcebd8552557f548
Parents: 3d46d79
Author: Cheng Lian <li...@databricks.com>
Authored: Tue Apr 19 09:37:00 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Apr 19 09:37:00 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  |  14 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  11 +-
 .../spark/sql/hive/execution/commands.scala     |   2 +-
 .../CommitFailureTestRelationSuite.scala        |  82 +++++++++
 .../sql/sources/CommitFailureTestSource.scala   |  67 ++++++++
 .../SimpleTextHadoopFsRelationSuite.scala       |  68 ++++++++
 .../spark/sql/sources/SimpleTextRelation.scala  | 167 +++++++++++++++++++
 7 files changed, 403 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5e360c93/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 10fde15..23a7071 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -310,7 +310,17 @@ case class DataSource(
 
         val fileCatalog: FileCatalog =
           new HDFSFileCatalog(sqlContext, options, globbedPaths, partitionSchema)
-        val dataSchema = userSpecifiedSchema.orElse {
+
+        val dataSchema = userSpecifiedSchema.map { schema =>
+          val equality =
+            if (sqlContext.conf.caseSensitiveAnalysis) {
+              org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
+            } else {
+              org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
+            }
+
+          StructType(schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
+        }.orElse {
           format.inferSchema(
             sqlContext,
             caseInsensitiveOptions,
@@ -318,7 +328,7 @@ case class DataSource(
         }.getOrElse {
           throw new AnalysisException(
             s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " +
-            "It must be specified manually")
+              "It must be specified manually")
         }
 
         val enrichedOptions =

http://git-wip-us.apache.org/repos/asf/spark/blob/5e360c93/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 697cf71..79fe23b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -504,11 +504,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
     }
   }
 
-  private def convertToLogicalRelation(metastoreRelation: MetastoreRelation,
-                                       options: Map[String, String],
-                                       defaultSource: FileFormat,
-                                       fileFormatClass: Class[_ <: FileFormat],
-                                       fileType: String): LogicalRelation = {
+  private def convertToLogicalRelation(
+      metastoreRelation: MetastoreRelation,
+      options: Map[String, String],
+      defaultSource: FileFormat,
+      fileFormatClass: Class[_ <: FileFormat],
+      fileType: String): LogicalRelation = {
     val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
     val tableIdentifier =
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)

http://git-wip-us.apache.org/repos/asf/spark/blob/5e360c93/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 5ef502a..8f7c4e8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -300,7 +300,7 @@ case class CreateMetastoreDataSourceAsSelect(
     val data = Dataset.ofRows(hiveContext, query)
     val df = existingSchema match {
       // If we are inserting into an existing table, just use the existing schema.
-      case Some(s) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, s)
+      case Some(s) => data.selectExpr(s.fieldNames: _*)
       case None => data
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5e360c93/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
new file mode 100644
index 0000000..08e83b7
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class CommitFailureTestRelationSuite extends SQLTestUtils with TestHiveSingleton {
+  // When committing a task, `CommitFailureTestSource` throws an exception for testing purposes.
+  val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName
+
+  test("SPARK-7684: commitTask() failure should fallback to abortTask()") {
+    withTempPath { file =>
+      // Here we coalesce to a single partition to ensure that only a single task is issued.  This
+      // prevents the race condition that happens when FileOutputCommitter tries to remove the
+      // `_temporary` directory while committing/aborting the job.  See SPARK-8513 for more details.
+      val df = sqlContext.range(0, 10).coalesce(1)
+      intercept[SparkException] {
+        df.write.format(dataSourceName).save(file.getCanonicalPath)
+      }
+
+      val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf)
+      assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary")))
+    }
+  }
+
+  test("call failure callbacks before close writer - default") {
+    SimpleTextRelation.failCommitter = false
+    withTempPath { file =>
+      // fail the job in the middle of writing
+      val divideByZero = udf((x: Int) => { x / (x - 1)})
+      val df = sqlContext.range(0, 10).coalesce(1).select(divideByZero(col("id")))
+
+      SimpleTextRelation.callbackCalled = false
+      intercept[SparkException] {
+        df.write.format(dataSourceName).save(file.getCanonicalPath)
+      }
+      assert(SimpleTextRelation.callbackCalled, "failure callback should be called")
+
+      val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf)
+      assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary")))
+    }
+  }
+
+  test("call failure callbacks before close writer - partitioned") {
+    SimpleTextRelation.failCommitter = false
+    withTempPath { file =>
+      // fail the job in the middle of writing
+      val df = sqlContext.range(0, 10).coalesce(1).select(col("id").mod(2).as("key"), col("id"))
+
+      SimpleTextRelation.callbackCalled = false
+      SimpleTextRelation.failWriter = true
+      intercept[SparkException] {
+        df.write.format(dataSourceName).partitionBy("key").save(file.getCanonicalPath)
+      }
+      assert(SimpleTextRelation.callbackCalled, "failure callback should be called")
+
+      val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf)
+      assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary")))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/5e360c93/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
new file mode 100644
index 0000000..6d7e7b7
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.types.StructType
+
+class CommitFailureTestSource extends SimpleTextSource {
+  /**
+   * Prepares a write job and returns an [[OutputWriterFactory]].  Client-side job preparation can
+   * be put here.  For example, a user-defined output committer can be configured here by setting
+   * the output committer class via the spark.sql.sources.outputCommitterClass configuration key.
+   */
+  override def prepareWrite(
+      sqlContext: SQLContext,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory =
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          bucketId: Option[Int],
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new SimpleTextOutputWriter(path, context) {
+          var failed = false
+          TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
+            failed = true
+            SimpleTextRelation.callbackCalled = true
+          }
+
+          override def write(row: Row): Unit = {
+            if (SimpleTextRelation.failWriter) {
+              sys.error("Intentional task writer failure for testing purpose.")
+
+            }
+            super.write(row)
+          }
+
+          override def close(): Unit = {
+            super.close()
+            sys.error("Intentional task commitment failure for testing purpose.")
+          }
+        }
+      }
+    }
+
+  override def shortName(): String = "commit-failure-test"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/5e360c93/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
new file mode 100644
index 0000000..71e3457
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.types._
+
+class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with PredicateHelper {
+  override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
+
+  // We support only a very limited number of types here since this is just a
+  // test relation and we do only very basic testing here.
+  override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: BinaryType => false
+    // We are using a random data generator and the generated strings are not really valid strings.
+    case _: StringType => false
+    case _: BooleanType => false // see https://issues.apache.org/jira/browse/SPARK-10442
+    case _: CalendarIntervalType => false
+    case _: DateType => false
+    case _: TimestampType => false
+    case _: ArrayType => false
+    case _: MapType => false
+    case _: StructType => false
+    case _: UserDefinedType[_] => false
+    case _ => true
+  }
+
+  test("save()/load() - partitioned table - simple queries - partition columns in data") {
+    withTempDir { file =>
+      val basePath = new Path(file.getCanonicalPath)
+      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+      val qualifiedBasePath = fs.makeQualified(basePath)
+
+      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        sparkContext
+          .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
+          .saveAsTextFile(partitionDir.toString)
+      }
+
+      val dataSchemaWithPartition =
+        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+      checkQueries(
+        hiveContext.read.format(dataSourceName)
+          .option("dataSchema", dataSchemaWithPartition.json)
+          .load(file.getCanonicalPath))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/5e360c93/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
new file mode 100644
index 0000000..113b124
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.text.NumberFormat
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{NullWritable, Text}
+import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
+
+import org.apache.spark.sql.{sources, Row, SQLContext}
+import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GenericInternalRow, InterpretedPredicate, InterpretedProjection, JoinedRow, Literal}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+class SimpleTextSource extends FileFormat with DataSourceRegister {
+  override def shortName(): String = "test"
+
+  override def inferSchema(
+      sqlContext: SQLContext,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    Some(DataType.fromJson(options("dataSchema")).asInstanceOf[StructType])
+  }
+
+  override def prepareWrite(
+      sqlContext: SQLContext,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory {
+    override private[sql] def newInstance(
+        path: String,
+        bucketId: Option[Int],
+        dataSchema: StructType,
+        context: TaskAttemptContext): OutputWriter = {
+      new SimpleTextOutputWriter(path, context)
+    }
+  }
+
+  override def buildReader(
+      sqlContext: SQLContext,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+
+    SimpleTextRelation.requiredColumns = requiredSchema.fieldNames
+    SimpleTextRelation.pushedFilters = filters.toSet
+
+    val fieldTypes = dataSchema.map(_.dataType)
+    val inputAttributes = dataSchema.toAttributes
+    val outputAttributes = requiredSchema.flatMap { field =>
+      inputAttributes.find(_.name == field.name)
+    }
+
+    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val broadcastedConf =
+      sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+
+    (file: PartitionedFile) => {
+      val predicate = {
+        val filterCondition: Expression = filters.collect {
+          // According to `unhandledFilters`, `SimpleTextRelation` only handles the `GreaterThan` filter
+          case sources.GreaterThan(column, value) =>
+            val dataType = dataSchema(column).dataType
+            val literal = Literal.create(value, dataType)
+            val attribute = inputAttributes.find(_.name == column).get
+            expressions.GreaterThan(attribute, literal)
+        }.reduceOption(expressions.And).getOrElse(Literal(true))
+        InterpretedPredicate.create(filterCondition, inputAttributes)
+      }
+
+      // Uses a simple projection to simulate column pruning
+      val projection = new InterpretedProjection(outputAttributes, inputAttributes)
+
+      val unsafeRowIterator =
+        new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line =>
+          val record = line.toString
+          new GenericInternalRow(record.split(",", -1).zip(fieldTypes).map {
+            case (v, dataType) =>
+              val value = if (v == "") null else v
+              // `Cast`ed values are always of internal types (e.g. UTF8String instead of String)
+              Cast(Literal(value), dataType).eval()
+          })
+        }.filter(predicate).map(projection)
+
+      // Appends partition values
+      val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+      val joinedRow = new JoinedRow()
+      val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
+
+      unsafeRowIterator.map { dataRow =>
+        appendPartitionColumns(joinedRow(dataRow, file.partitionValues))
+      }
+    }
+  }
+}
+
+class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
+  private val recordWriter: RecordWriter[NullWritable, Text] =
+    new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
+
+  override def write(row: Row): Unit = {
+    val serialized = row.toSeq.map { v =>
+      if (v == null) "" else v.toString
+    }.mkString(",")
+    recordWriter.write(null, new Text(serialized))
+  }
+
+  override def close(): Unit = {
+    recordWriter.close(context)
+  }
+}
+
+class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullWritable, Text] {
+  val numberFormat = NumberFormat.getInstance()
+
+  numberFormat.setMinimumIntegerDigits(5)
+  numberFormat.setGroupingUsed(false)
+
+  override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+    val configuration = context.getConfiguration
+    val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+    val taskAttemptId = context.getTaskAttemptID
+    val split = taskAttemptId.getTaskID.getId
+    val name = FileOutputFormat.getOutputName(context)
+    new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
+  }
+}
+
+object SimpleTextRelation {
+  // Used to test column pruning
+  var requiredColumns: Seq[String] = Nil
+
+  // Used to test filter push-down
+  var pushedFilters: Set[Filter] = Set.empty
+
+  // Used to test failed committer
+  var failCommitter = false
+
+  // Used to test failed writer
+  var failWriter = false
+
+  // Used to test failure callback
+  var callbackCalled = false
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org